Start working to eliminate VFM dependence on static variables.
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "command.h"
25 #include "error.h"
26 #include "file-handle.h"
27 #include "lexer.h"
28 #include "misc.h"
29 #include "pool.h"
30 #include "settings.h"
31 #include "sfm.h"
32 #include "sort.h"
33 #include "stats.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #include "debug-print.h"
40
41 /* Specifies how to make an aggregate variable. */
42 struct agr_var
43   {
44     struct agr_var *next;               /* Next in list. */
45
46     /* Collected during parsing. */
47     struct variable *src;       /* Source variable. */
48     struct variable *dest;      /* Target variable. */
49     int function;               /* Function. */
50     int include_missing;        /* 1=Include user-missing values. */
51     union value arg[2];         /* Arguments. */
52
53     /* Accumulated during AGGREGATE execution. */
54     double dbl[3];
55     int int1, int2;
56     char *string;
57     int missing;
58   };
59
60 /* Aggregation functions. */
61 enum
62   {
63     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
64     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
65     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
66     FUNC = 0x1f, /* Function mask. */
67     FSTRING = 1<<5, /* String function bit. */
68   };
69
70 /* Attributes of an aggregation function. */
71 struct agr_func
72   {
73     const char *name;           /* Aggregation function name. */
74     int n_args;                 /* Number of arguments. */
75     int alpha_type;             /* When given ALPHA arguments, output type. */
76     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
77   };
78
79 /* Attributes of aggregation functions. */
80 static const struct agr_func agr_func_tab[] = 
81   {
82     {"<NONE>",  0, -1,      {0, 0, 0}},
83     {"SUM",     0, -1,      {FMT_F, 8, 2}},
84     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
85     {"SD",      0, -1,      {FMT_F, 8, 2}},
86     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
87     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
88     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
89     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
90     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
91     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
92     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
93     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
94     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
95     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
96     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
97     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
98     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
100     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
101     {"LAST",    0, ALPHA,   {-1, -1, -1}},
102     {NULL,      0, -1,      {-1, -1, -1}},
103     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
104     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
105   };
106
107 /* Output file, or NULL for the active file. */
108 static struct file_handle *outfile;
109
110 /* Missing value types. */
111 enum
112   {
113     ITEMWISE,           /* Missing values item by item. */
114     COLUMNWISE          /* Missing values column by column. */
115   };
116
117 /* ITEMWISE or COLUMNWISE. */
118 static int missing;
119
120 /* Sort program. */
121 static struct sort_cases_pgm *sort;
122
123 /* Aggregate variables. */
124 static struct agr_var *agr_first, *agr_next;
125
126 /* Aggregate dictionary. */
127 static struct dictionary *agr_dict;
128
129 /* Number of cases passed through aggregation. */
130 static int case_count;
131
132 /* Last values of the break variables. */
133 static union value *prev_case;
134
135 /* Buffers for use by the 10x transformation. */
136 static flt64 *buf64_1xx;
137 static struct ccase *buf_1xx;
138
139 static void initialize_aggregate_info (void);
140
141 /* Prototypes. */
142 static int parse_aggregate_functions (void);
143 static void free_aggregate_functions (void);
144 static int aggregate_single_case (struct ccase *input, struct ccase *output);
145 static int create_sysfile (void);
146
147 static trns_proc_func agr_00x_trns_proc, agr_10x_trns_proc;
148 static trns_free_func agr_10x_trns_free;
149 static void agr_00x_end_func (void *aux);
150 static void agr_10x_end_func (void *);
151 static int agr_11x_func (write_case_data);
152
153 #if DEBUGGING
154 static void debug_print (int flags);
155 #endif
156 \f
157 /* Parsing. */
158
159 /* Parses and executes the AGGREGATE procedure. */
160 int
161 cmd_aggregate (void)
162 {
163   /* Have we seen these subcommands? */
164   unsigned seen = 0;
165
166   outfile = NULL;
167   missing = ITEMWISE;
168   sort = NULL;
169   prev_case = NULL;
170   
171   agr_dict = dict_create ();
172   dict_set_label (agr_dict, dict_get_label (default_dict));
173   dict_set_documents (agr_dict, dict_get_documents (default_dict));
174   
175   lex_match_id ("AGGREGATE");
176
177   /* Read most of the subcommands. */
178   for (;;)
179     {
180       lex_match('/');
181       
182       if (lex_match_id ("OUTFILE"))
183         {
184           if (seen & 1)
185             {
186               destroy_sort_cases_pgm (sort);
187               dict_destroy (agr_dict);
188               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
189               return CMD_FAILURE;
190             }
191           seen |= 1;
192               
193           lex_match ('=');
194           if (lex_match ('*'))
195             outfile = NULL;
196           else 
197             {
198               outfile = fh_parse_file_handle ();
199               if (outfile == NULL)
200                 {
201                   destroy_sort_cases_pgm (sort);
202                   dict_destroy (agr_dict);
203                   return CMD_FAILURE;
204                 }
205             }
206         }
207       else if (lex_match_id ("MISSING"))
208         {
209           lex_match ('=');
210           if (!lex_match_id ("COLUMNWISE"))
211             {
212               destroy_sort_cases_pgm (sort);
213               dict_destroy (agr_dict);
214               lex_error (_("while expecting COLUMNWISE"));
215               return CMD_FAILURE;
216             }
217           missing = COLUMNWISE;
218         }
219       else if (lex_match_id ("DOCUMENT"))
220         seen |= 2;
221       else if (lex_match_id ("PRESORTED"))
222         seen |= 4;
223       else if (lex_match_id ("BREAK"))
224         {
225           if (seen & 8)
226             {
227               destroy_sort_cases_pgm (sort);
228               dict_destroy (agr_dict);
229               msg (SE, _("%s subcommand given multiple times."),"BREAK");
230               return CMD_FAILURE;
231             }
232           seen |= 8;
233
234           lex_match ('=');
235           sort = parse_sort ();
236           if (sort == NULL)
237             {
238               dict_destroy (agr_dict);
239               return CMD_FAILURE;
240             }
241           
242           {
243             int i;
244             
245             for (i = 0; i < sort->var_cnt; i++)
246               {
247                 struct variable *v;
248               
249                 v = dict_clone_var (agr_dict, sort->vars[i], sort->vars[i]->name);
250                 assert (v != NULL);
251               }
252           }
253         }
254       else break;
255     }
256
257   /* Check for proper syntax. */
258   if (!(seen & 8))
259     msg (SW, _("BREAK subcommand not specified."));
260       
261   /* Read in the aggregate functions. */
262   if (!parse_aggregate_functions ())
263     {
264       free_aggregate_functions ();
265       destroy_sort_cases_pgm (sort);
266       return CMD_FAILURE;
267     }
268
269   /* Delete documents. */
270   if (!(seen & 2))
271     dict_set_documents (agr_dict, NULL);
272
273   /* Cancel SPLIT FILE. */
274   dict_set_split_vars (agr_dict, NULL, 0);
275   
276 #if DEBUGGING
277   debug_print (seen);
278 #endif
279
280   /* Initialize. */
281   case_count = 0;
282   initialize_aggregate_info ();
283
284   /* How to implement all this... There are three important variables:
285      whether output is going to the active file (0) or a separate file
286      (1); whether the input data is presorted (0) or needs sorting
287      (1); whether there is a temporary transformation (1) or not (0).
288      The eight cases are as follows:
289
290      000 (0): Pass it through an aggregate transformation that
291      modifies the data.
292
293      001 (1): Cancel the temporary transformation and handle as 000.
294
295      010 (2): Set up a SORT CASES and aggregate the output, writing
296      the results to the active file.
297      
298      011 (3): Cancel the temporary transformation and handle as 010.
299
300      100 (4): Pass it through an aggregate transformation that doesn't
301      modify the data but merely writes it to the output file.
302
303      101 (5): Handled as 100.
304
305      110 (6): Set up a SORT CASES and capture the output, aggregate
306      it, write it to the output file without modifying the active
307      file.
308
309      111 (7): Handled as 110. */
310   
311   {
312     unsigned type = 0;
313
314     if (outfile != NULL)
315       type |= 4;
316     if (sort != NULL && (seen & 4) == 0)
317       type |= 2;
318     if (temporary)
319       type |= 1;
320
321     switch (type)
322       {
323       case 3:
324         cancel_temporary ();
325         /* fall through */
326       case 2:
327         sort_cases (sort, 0);
328         goto case0;
329           
330       case 1:
331         cancel_temporary ();
332         /* fall through */
333       case 0:
334       case0:
335         {
336           struct trns_header *t = xmalloc (sizeof *t);
337           t->proc = agr_00x_trns_proc;
338           t->free = NULL;
339           add_transformation (t);
340           
341           temporary = 2;
342           temp_dict = agr_dict;
343           temp_trns = n_trns;
344           
345           agr_dict = NULL;
346
347           procedure (NULL, NULL, agr_00x_end_func, NULL);
348           break;
349         }
350
351       case 4:
352       case 5:
353         {
354           if (!create_sysfile ())
355             goto lossage;
356           
357           {
358             struct trns_header *t = xmalloc (sizeof *t);
359             t->proc = agr_10x_trns_proc;
360             t->free = agr_10x_trns_free;
361             add_transformation (t);
362
363             procedure (NULL, NULL, agr_10x_end_func, NULL);
364           }
365           
366           break;
367         }
368           
369       case 6:
370       case 7:
371         sort_cases (sort, 1);
372         
373         if (!create_sysfile ())
374           goto lossage;
375         read_sort_output (sort, agr_11x_func, NULL);
376         
377         {
378           struct ccase *save_temp_case = temp_case;
379           temp_case = NULL;
380           agr_11x_func (NULL);
381           temp_case = save_temp_case;
382         }
383         
384         break;
385
386       default:
387         assert (0);
388       }
389   }
390   
391   free (buf64_1xx);
392   free (buf_1xx);
393   
394   /* Clean up. */
395   destroy_sort_cases_pgm (sort);
396   free_aggregate_functions ();
397   free (prev_case);
398   
399   return CMD_SUCCESS;
400
401 lossage:
402   /* Clean up. */
403   destroy_sort_cases_pgm (sort);
404   free_aggregate_functions ();
405   free (prev_case);
406
407   return CMD_FAILURE;
408 }
409
410 /* Create a system file for use in aggregation to an external file,
411    and allocate temporary buffers for writing out cases. */
412 static int
413 create_sysfile (void)
414 {
415   struct sfm_write_info w;
416   w.h = outfile;
417   w.dict = agr_dict;
418   w.compress = set_scompression;
419   if (!sfm_write_dictionary (&w))
420     {
421       free_aggregate_functions ();
422       destroy_sort_cases_pgm (sort);
423       dict_destroy (agr_dict);
424       return 0;
425     }
426     
427   buf64_1xx = xmalloc (sizeof *buf64_1xx * w.case_size);
428   buf_1xx = xmalloc (dict_get_case_size (agr_dict));
429
430   return 1;
431 }
432
433 /* Parse all the aggregate functions. */
434 static int
435 parse_aggregate_functions (void)
436 {
437   agr_first = agr_next = NULL;
438
439   /* Parse everything. */
440   for (;;)
441     {
442       char **dest;
443       char **dest_label;
444       int n_dest;
445
446       int include_missing;
447       const struct agr_func *function;
448       int func_index;
449
450       union value arg[2];
451
452       struct variable **src;
453       int n_src;
454
455       int i;
456
457       dest = NULL;
458       dest_label = NULL;
459       n_dest = 0;
460       src = NULL;
461       n_src = 0;
462       arg[0].c = NULL;
463       arg[1].c = NULL;
464
465       /* Parse the list of target variables. */
466       while (!lex_match ('='))
467         {
468           int n_dest_prev = n_dest;
469           
470           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
471             goto lossage;
472
473           /* Assign empty labels. */
474           {
475             int j;
476
477             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
478             for (j = n_dest_prev; j < n_dest; j++)
479               dest_label[j] = NULL;
480           }
481           
482           if (token == T_STRING)
483             {
484               ds_truncate (&tokstr, 120);
485               dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
486               lex_get ();
487             }
488         }
489
490       /* Get the name of the aggregation function. */
491       if (token != T_ID)
492         {
493           lex_error (_("expecting aggregation function"));
494           goto lossage;
495         }
496
497       include_missing = 0;
498       if (tokid[strlen (tokid) - 1] == '.')
499         {
500           include_missing = 1;
501           tokid[strlen (tokid) - 1] = 0;
502         }
503       
504       for (function = agr_func_tab; function->name; function++)
505         if (!strcmp (function->name, tokid))
506           break;
507       if (NULL == function->name)
508         {
509           msg (SE, _("Unknown aggregation function %s."), tokid);
510           goto lossage;
511         }
512       func_index = function - agr_func_tab;
513       lex_get ();
514
515       /* Check for leading lparen. */
516       if (!lex_match ('('))
517         {
518           if (func_index == N)
519             func_index = N_NO_VARS;
520           else if (func_index == NU)
521             func_index = NU_NO_VARS;
522           else
523             {
524               lex_error (_("expecting `('"));
525               goto lossage;
526             }
527         } else {
528           /* Parse list of source variables. */
529           {
530             int pv_opts = PV_NO_SCRATCH;
531
532             if (func_index == SUM || func_index == MEAN || func_index == SD)
533               pv_opts |= PV_NUMERIC;
534             else if (function->n_args)
535               pv_opts |= PV_SAME_TYPE;
536
537             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
538               goto lossage;
539           }
540
541           /* Parse function arguments, for those functions that
542              require arguments. */
543           if (function->n_args != 0)
544             for (i = 0; i < function->n_args; i++)
545               {
546                 int type;
547             
548                 lex_match (',');
549                 if (token == T_STRING)
550                   {
551                     arg[i].c = xstrdup (ds_value (&tokstr));
552                     type = ALPHA;
553                   }
554                 else if (token == T_NUM)
555                   {
556                     arg[i].f = tokval;
557                     type = NUMERIC;
558                   } else {
559                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
560                     goto lossage;
561                   }
562             
563                 lex_get ();
564
565                 if (type != src[0]->type)
566                   {
567                     msg (SE, _("Arguments to %s must be of same type as "
568                                "source variables."),
569                          function->name);
570                     goto lossage;
571                   }
572               }
573
574           /* Trailing rparen. */
575           if (!lex_match(')'))
576             {
577               lex_error (_("expecting `)'"));
578               goto lossage;
579             }
580           
581           /* Now check that the number of source variables match the
582              number of target variables.  Do this here because if we
583              do it earlier then the user can get very misleading error
584              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
585              error message when a proper message would be more like
586              `unknown variable t'. */
587           if (n_src != n_dest)
588             {
589               msg (SE, _("Number of source variables (%d) does not match "
590                          "number of target variables (%d)."),
591                    n_src, n_dest);
592               goto lossage;
593             }
594         }
595         
596       /* Finally add these to the linked list of aggregation
597          variables. */
598       for (i = 0; i < n_dest; i++)
599         {
600           struct agr_var *v = xmalloc (sizeof *v);
601
602           /* Add variable to chain. */
603           if (agr_first)
604             agr_next = agr_next->next = v;
605           else
606             agr_first = agr_next = v;
607           agr_next->next = NULL;
608           
609           /* Create the target variable in the aggregate
610              dictionary. */
611           {
612             struct variable *destvar;
613             
614             agr_next->function = func_index;
615
616             if (src)
617               {
618                 int output_width;
619
620                 agr_next->src = src[i];
621                 
622                 if (src[i]->type == ALPHA)
623                   {
624                     agr_next->function |= FSTRING;
625                     agr_next->string = xmalloc (src[i]->width);
626                   }
627                 
628                 if (agr_next->src->type == NUMERIC || function->alpha_type == NUMERIC)
629                   output_width = 0;
630                 else
631                   output_width = agr_next->src->width;
632
633                 if (function->alpha_type == ALPHA)
634                   destvar = dict_clone_var (agr_dict, agr_next->src, dest[i]);
635                 else
636                   {
637                     destvar = dict_create_var (agr_dict, dest[i], output_width);
638                     if (output_width == 0)
639                       destvar->print = destvar->write = function->format;
640                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
641                         && (func_index == N || func_index == N_NO_VARS
642                             || func_index == NU || func_index == NU_NO_VARS))
643                       {
644                         struct fmt_spec f = {FMT_F, 8, 2};
645                       
646                         destvar->print = destvar->write = f;
647                       }
648                   }
649               } else {
650                 agr_next->src = NULL;
651                 destvar = dict_create_var (agr_dict, dest[i], 0);
652               }
653           
654             if (!destvar)
655               {
656                 msg (SE, _("Variable name %s is not unique within the "
657                            "aggregate file dictionary, which contains "
658                            "the aggregate variables and the break "
659                            "variables."),
660                      dest[i]);
661                 free (dest[i]);
662                 goto lossage;
663               }
664
665             free (dest[i]);
666             destvar->init = 0;
667             if (dest_label[i])
668               {
669                 destvar->label = dest_label[i];
670                 dest_label[i] = NULL;
671               }
672             else if (function->alpha_type == ALPHA)
673               destvar->print = destvar->write = function->format;
674
675             agr_next->dest = destvar;
676           }
677           
678           agr_next->include_missing = include_missing;
679
680           if (agr_next->src != NULL)
681             {
682               int j;
683
684               if (agr_next->src->type == NUMERIC)
685                 for (j = 0; j < function->n_args; j++)
686                   agr_next->arg[j].f = arg[j].f;
687               else
688                 for (j = 0; j < function->n_args; j++)
689                   agr_next->arg[j].c = xstrdup (arg[j].c);
690             }
691         }
692       
693       if (src != NULL && src[0]->type == ALPHA)
694         for (i = 0; i < function->n_args; i++)
695           {
696             free (arg[i].c);
697             arg[i].c = NULL;
698           }
699
700       free (src);
701       free (dest);
702       free (dest_label);
703
704       if (!lex_match ('/'))
705         {
706           if (token == '.')
707             return 1;
708
709           lex_error ("expecting end of command");
710           return 0;
711         }
712       continue;
713       
714     lossage:
715       for (i = 0; i < n_dest; i++)
716         {
717           free (dest[i]);
718           free (dest_label[i]);
719         }
720       free (dest);
721       free (dest_label);
722       free (arg[0].c);
723       free (arg[1].c);
724       if (src && n_src && src[0]->type == ALPHA)
725         for (i = 0; i < function->n_args; i++)
726           {
727             free(arg[i].c);
728             arg[i].c = NULL;
729           }
730       free (src);
731         
732       return 0;
733     }
734 }
735
736 /* Frees all the state for the AGGREGATE procedure. */
737 static void
738 free_aggregate_functions (void)
739 {
740   struct agr_var *iter, *next;
741
742   if (agr_dict)
743     dict_destroy (agr_dict);
744   for (iter = agr_first; iter; iter = next)
745     {
746       next = iter->next;
747
748       if (iter->function & FSTRING)
749         {
750           int n_args;
751           int i;
752
753           n_args = agr_func_tab[iter->function & FUNC].n_args;
754           for (i = 0; i < n_args; i++)
755             free (iter->arg[i].c);
756           free (iter->string);
757         }
758       free (iter);
759     }
760 }
761 \f
762 /* Execution. */
763
764 static void accumulate_aggregate_info (struct ccase *input);
765 static void dump_aggregate_info (struct ccase *output);
766
767 /* Processes a single case INPUT for aggregation.  If output is
768    warranted, it is written to case OUTPUT, which may be (but need not
769    be) an alias to INPUT.  Returns -1 when output is performed, -2
770    otherwise. */
771 /* The code in this function has an eerie similarity to
772    vfm.c:SPLIT_FILE_procfunc()... */
773 static int
774 aggregate_single_case (struct ccase *input, struct ccase *output)
775 {
776   /* The first case always begins a new break group.  We also need to
777      preserve the values of the case for later comparison. */
778   if (case_count++ == 0)
779     {
780       int n_elem = 0;
781       
782       {
783         int i;
784
785         for (i = 0; i < sort->var_cnt; i++)
786           n_elem += sort->vars[i]->nv;
787       }
788       
789       prev_case = xmalloc (sizeof *prev_case * n_elem);
790
791       /* Copy INPUT into prev_case. */
792       {
793         union value *iter = prev_case;
794         int i;
795
796         for (i = 0; i < sort->var_cnt; i++)
797           {
798             struct variable *v = sort->vars[i];
799             
800             if (v->type == NUMERIC)
801               (iter++)->f = input->data[v->fv].f;
802             else
803               {
804                 memcpy (iter->s, input->data[v->fv].s, v->width);
805                 iter += v->nv;
806               }
807           }
808       }
809             
810       accumulate_aggregate_info (input);
811         
812       return -2;
813     }
814       
815   /* Compare the value of each break variable to the values on the
816      previous case. */
817   {
818     union value *iter = prev_case;
819     int i;
820     
821     for (i = 0; i < sort->var_cnt; i++)
822       {
823         struct variable *v = sort->vars[i];
824       
825         switch (v->type)
826           {
827           case NUMERIC:
828             if (input->data[v->fv].f != iter->f)
829               goto not_equal;
830             iter++;
831             break;
832           case ALPHA:
833             if (memcmp (input->data[v->fv].s, iter->s, v->width))
834               goto not_equal;
835             iter += v->nv;
836             break;
837           default:
838             assert (0);
839           }
840       }
841   }
842
843   accumulate_aggregate_info (input);
844
845   return -2;
846   
847 not_equal:
848   /* The values of the break variable are different from the values on
849      the previous case.  That means that it's time to dump aggregate
850      info. */
851   dump_aggregate_info (output);
852   initialize_aggregate_info ();
853   accumulate_aggregate_info (input);
854
855   /* Copy INPUT into prev_case. */
856   {
857     union value *iter = prev_case;
858     int i;
859
860     for (i = 0; i < sort->var_cnt; i++)
861       {
862         struct variable *v = sort->vars[i];
863             
864         if (v->type == NUMERIC)
865           (iter++)->f = input->data[v->fv].f;
866         else
867           {
868             memcpy (iter->s, input->data[v->fv].s, v->width);
869             iter += v->nv;
870           }
871       }
872   }
873   
874   return -1;
875 }
876
877 /* Accumulates aggregation data from the case INPUT. */
878 static void 
879 accumulate_aggregate_info (struct ccase *input)
880 {
881   struct agr_var *iter;
882   double weight;
883
884   weight = dict_get_case_weight (default_dict, input);
885
886   for (iter = agr_first; iter; iter = iter->next)
887     if (iter->src)
888       {
889         union value *v = &input->data[iter->src->fv];
890
891         if ((!iter->include_missing && is_missing (v, iter->src))
892             || (iter->include_missing && iter->src->type == NUMERIC
893                 && v->f == SYSMIS))
894           {
895             switch (iter->function)
896               {
897               case NMISS:
898                 iter->dbl[0] += weight;
899                 break;
900               case NUMISS:
901                 iter->int1++;
902                 break;
903               }
904             iter->missing = 1;
905             continue;
906           }
907         
908         /* This is horrible.  There are too many possibilities. */
909         switch (iter->function)
910           {
911           case SUM:
912             iter->dbl[0] += v->f;
913             break;
914           case MEAN:
915             iter->dbl[0] += v->f * weight;
916             iter->dbl[1] += weight;
917             break;
918           case SD: 
919             {
920               double product = v->f * weight;
921               iter->dbl[0] += product;
922               iter->dbl[1] += product * v->f;
923               iter->dbl[2] += weight;
924               break; 
925             }
926           case MAX:
927             iter->dbl[0] = max (iter->dbl[0], v->f);
928             iter->int1 = 1;
929             break;
930           case MAX | FSTRING:
931             if (memcmp (iter->string, v->s, iter->src->width) < 0)
932               memcpy (iter->string, v->s, iter->src->width);
933             iter->int1 = 1;
934             break;
935           case MIN:
936             iter->dbl[0] = min (iter->dbl[0], v->f);
937             iter->int1 = 1;
938             break;
939           case MIN | FSTRING:
940             if (memcmp (iter->string, v->s, iter->src->width) > 0)
941               memcpy (iter->string, v->s, iter->src->width);
942             iter->int1 = 1;
943             break;
944           case FGT:
945           case PGT:
946             if (v->f > iter->arg[0].f)
947               iter->dbl[0] += weight;
948             iter->dbl[1] += weight;
949             break;
950           case FGT | FSTRING:
951           case PGT | FSTRING:
952             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
953               iter->dbl[0] += weight;
954             iter->dbl[1] += weight;
955             break;
956           case FLT:
957           case PLT:
958             if (v->f < iter->arg[0].f)
959               iter->dbl[0] += weight;
960             iter->dbl[1] += weight;
961             break;
962           case FLT | FSTRING:
963           case PLT | FSTRING:
964             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
965               iter->dbl[0] += weight;
966             iter->dbl[1] += weight;
967             break;
968           case FIN:
969           case PIN:
970             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
971               iter->dbl[0] += weight;
972             iter->dbl[1] += weight;
973             break;
974           case FIN | FSTRING:
975           case PIN | FSTRING:
976             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
977                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
978               iter->dbl[0] += weight;
979             iter->dbl[1] += weight;
980             break;
981           case FOUT:
982           case POUT:
983             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
984               iter->dbl[0] += weight;
985             iter->dbl[1] += weight;
986             break;
987           case FOUT | FSTRING:
988           case POUT | FSTRING:
989             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
990                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
991               iter->dbl[0] += weight;
992             iter->dbl[1] += weight;
993             break;
994           case N:
995             iter->dbl[0] += weight;
996             break;
997           case NU:
998             iter->int1++;
999             break;
1000           case FIRST:
1001             if (iter->int1 == 0)
1002               {
1003                 iter->dbl[0] = v->f;
1004                 iter->int1 = 1;
1005               }
1006             break;
1007           case FIRST | FSTRING:
1008             if (iter->int1 == 0)
1009               {
1010                 memcpy (iter->string, v->s, iter->src->width);
1011                 iter->int1 = 1;
1012               }
1013             break;
1014           case LAST:
1015             iter->dbl[0] = v->f;
1016             iter->int1 = 1;
1017             break;
1018           case LAST | FSTRING:
1019             memcpy (iter->string, v->s, iter->src->width);
1020             iter->int1 = 1;
1021             break;
1022           default:
1023             assert (0);
1024           }
1025     } else {
1026       switch (iter->function)
1027         {
1028         case N_NO_VARS:
1029           iter->dbl[0] += weight;
1030           break;
1031         case NU_NO_VARS:
1032           iter->int1++;
1033           break;
1034         default:
1035           assert (0);
1036         }
1037     }
1038 }
1039
1040 /* We've come to a record that differs from the previous in one or
1041    more of the break variables.  Make an output record from the
1042    accumulated statistics in the OUTPUT case. */
1043 static void 
1044 dump_aggregate_info (struct ccase *output)
1045 {
1046   debug_printf (("(dumping "));
1047   
1048   {
1049     int n_elem = 0;
1050     
1051     {
1052       int i;
1053
1054       for (i = 0; i < sort->var_cnt; i++)
1055         n_elem += sort->vars[i]->nv;
1056     }
1057     debug_printf (("n_elem=%d:", n_elem));
1058     memcpy (output->data, prev_case, sizeof (union value) * n_elem);
1059   }
1060   
1061   {
1062     struct agr_var *i;
1063   
1064     for (i = agr_first; i; i = i->next)
1065       {
1066         union value *v = &output->data[i->dest->fv];
1067
1068         debug_printf ((" %d,%d", i->dest->fv, i->dest->nv));
1069
1070         if (missing == COLUMNWISE && i->missing != 0
1071             && (i->function & FUNC) != N && (i->function & FUNC) != NU
1072             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
1073           {
1074             if (i->function & FSTRING)
1075               memset (v->s, ' ', i->dest->width);
1076             else
1077               v->f = SYSMIS;
1078             continue;
1079           }
1080         
1081         switch (i->function)
1082           {
1083           case SUM:
1084             v->f = i->dbl[0];
1085             break;
1086           case MEAN:
1087             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1088             break;
1089           case SD:
1090             v->f = ((i->dbl[2] > 1.0)
1091                     ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
1092                     : SYSMIS);
1093             break;
1094           case MAX:
1095           case MIN:
1096             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1097             break;
1098           case MAX | FSTRING:
1099           case MIN | FSTRING:
1100             if (i->int1)
1101               memcpy (v->s, i->string, i->dest->width);
1102             else
1103               memset (v->s, ' ', i->dest->width);
1104             break;
1105           case FGT | FSTRING:
1106           case FLT | FSTRING:
1107           case FIN | FSTRING:
1108           case FOUT | FSTRING:
1109             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1110             break;
1111           case FGT:
1112           case FLT:
1113           case FIN:
1114           case FOUT:
1115             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1116             break;
1117           case PGT:
1118           case PGT | FSTRING:
1119           case PLT:
1120           case PLT | FSTRING:
1121           case PIN:
1122           case PIN | FSTRING:
1123           case POUT:
1124           case POUT | FSTRING:
1125             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1126             break;
1127           case N:
1128             v->f = i->dbl[0];
1129             break;
1130           case NU:
1131             v->f = i->int1;
1132             break;
1133           case FIRST:
1134           case LAST:
1135             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1136             break;
1137           case FIRST | FSTRING:
1138           case LAST | FSTRING:
1139             if (i->int1)
1140               memcpy (v->s, i->string, i->dest->width);
1141             else
1142               memset (v->s, ' ', i->dest->width);
1143             break;
1144           case N_NO_VARS:
1145             v->f = i->dbl[0];
1146             break;
1147           case NU_NO_VARS:
1148             v->f = i->int1;
1149             break;
1150           case NMISS:
1151             v->f = i->dbl[0];
1152             break;
1153           case NUMISS:
1154             v->f = i->int1;
1155             break;
1156           default:
1157             assert (0);
1158           }
1159       }
1160   }
1161   debug_printf ((") "));
1162 }
1163
1164 /* Resets the state for all the aggregate functions. */
1165 static void
1166 initialize_aggregate_info (void)
1167 {
1168   struct agr_var *iter;
1169
1170   for (iter = agr_first; iter; iter = iter->next)
1171     {
1172       iter->missing = 0;
1173       switch (iter->function)
1174         {
1175         case MIN:
1176           iter->dbl[0] = DBL_MAX;
1177           break;
1178         case MIN | FSTRING:
1179           memset (iter->string, 255, iter->src->width);
1180           break;
1181         case MAX:
1182           iter->dbl[0] = -DBL_MAX;
1183           break;
1184         case MAX | FSTRING:
1185           memset (iter->string, 0, iter->src->width);
1186           break;
1187         default:
1188           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1189           iter->int1 = iter->int2 = 0;
1190           break;
1191         }
1192     }
1193 }
1194 \f
1195 /* Aggregate each case as it comes through.  Cases which aren't needed
1196    are dropped. */
1197 static int
1198 agr_00x_trns_proc (struct trns_header *h UNUSED, struct ccase *c,
1199                    int case_num UNUSED)
1200 {
1201   int code = aggregate_single_case (c, compaction_case);
1202   debug_printf (("%d ", code));
1203   return code;
1204 }
1205
1206 /* Output the last aggregate case.  It's okay to call the vfm_sink's
1207    write() method here because end_func is called so soon after all
1208    the cases have been output; very little has been cleaned up at this
1209    point. */
1210 static void
1211 agr_00x_end_func (void *aux UNUSED)
1212 {
1213   /* Ensure that info for the last break group gets written to the
1214      active file. */
1215   dump_aggregate_info (compaction_case);
1216   vfm_sink->class->write (vfm_sink, temp_case);
1217 }
1218
1219 /* Transform the aggregate case buf_1xx, in internal format, to system
1220    file format, in buf64_1xx, and write the resultant case to the
1221    system file. */
1222 static void
1223 write_case_to_sfm (void)
1224 {
1225   flt64 *p = buf64_1xx;
1226   int i;
1227
1228   for (i = 0; i < dict_get_var_cnt (agr_dict); i++)
1229     {
1230       struct variable *v = dict_get_var (agr_dict, i);
1231       
1232       if (v->type == NUMERIC)
1233         {
1234           double src = buf_1xx->data[v->fv].f;
1235           if (src == SYSMIS)
1236             *p++ = -FLT64_MAX;
1237           else
1238             *p++ = src;
1239         }
1240       else
1241         {
1242           memcpy (p, buf_1xx->data[v->fv].s, v->width);
1243           memset (&((char *) p)[v->width], ' ',
1244                   REM_RND_UP (v->width, sizeof (flt64)));
1245           p += DIV_RND_UP (v->width, sizeof (flt64));
1246         }
1247     }
1248
1249   sfm_write_case (outfile, buf64_1xx, p - buf64_1xx);
1250 }
1251
1252 /* Aggregate the current case and output it if we passed a
1253    breakpoint. */
1254 static int
1255 agr_10x_trns_proc (struct trns_header *h UNUSED, struct ccase *c,
1256                    int case_num UNUSED)
1257 {
1258   int code = aggregate_single_case (c, buf_1xx);
1259
1260   assert (code == -2 || code == -1);
1261   if (code == -1)
1262     write_case_to_sfm ();
1263   return -1;
1264 }
1265
1266 /* Close the system file now that we're done with it.  */
1267 static void
1268 agr_10x_trns_free (struct trns_header *h UNUSED)
1269 {
1270   fh_close_handle (outfile);
1271 }
1272
1273 /* Ensure that info for the last break group gets written to the
1274    system file. */
1275 static void
1276 agr_10x_end_func (void *aux UNUSED)
1277 {
1278   dump_aggregate_info (buf_1xx);
1279   write_case_to_sfm ();
1280 }
1281
1282 /* When called with temp_case non-NULL (the normal case), runs the
1283    case through the aggregater and outputs it to the system file if
1284    appropriate.  If temp_case is NULL, finishes up writing the last
1285    case if necessary. */
1286 static int
1287 agr_11x_func (write_case_data wc_data UNUSED)
1288 {
1289   if (temp_case != NULL)
1290     {
1291       int code = aggregate_single_case (temp_case, buf_1xx);
1292       
1293       assert (code == -2 || code == -1);
1294       if (code == -1)
1295         write_case_to_sfm ();
1296     }
1297   else
1298     {
1299       if (case_count)
1300         {
1301           dump_aggregate_info (buf_1xx);
1302           write_case_to_sfm ();
1303         }
1304       fh_close_handle (outfile);
1305     }
1306   return 1;
1307 }
1308 \f
1309 /* Debugging. */
1310 #if DEBUGGING
1311 /* Print out useful debugging information. */
1312 static void
1313 debug_print (int flags)
1314 {
1315   printf ("AGGREGATE\n /OUTFILE=%s\n",
1316           outfile ? fh_handle_filename (outfile) : "*");
1317
1318   if (missing == COLUMNWISE)
1319     puts (" /MISSING=COLUMNWISE");
1320
1321   if (flags & 2)
1322     puts (" /DOCUMENT");
1323   if (flags & 4)
1324     puts (" /PRESORTED");
1325   
1326   {
1327     int i;
1328
1329     printf (" /BREAK=");
1330     for (i = 0; i < sort->var_cnt; i++)
1331       printf ("%s(%c) ", sort->vars[i]->name,
1332               sort->vars[i]->p.srt.order == SRT_ASCEND ? 'A' : 'D');
1333     putc ('\n', stdout);
1334   }
1335   
1336   {
1337     struct agr_var *iter;
1338     
1339     for (iter = agr_first; iter; iter = iter->next)
1340       {
1341         struct agr_func *f = &agr_func_tab[iter->function & FUNC];
1342         
1343         printf (" /%s", iter->dest->name);
1344         if (iter->dest->label)
1345           printf ("'%s'", iter->dest->label);
1346         printf ("=%s(%s", f->name, iter->src->name);
1347         if (f->n_args)
1348           {
1349             int i;
1350             
1351             for (i = 0; i < f->n_args; i++)
1352               {
1353                 putc (',', stdout);
1354                 if (iter->src->type == NUMERIC)
1355                   printf ("%g", iter->arg[i].f);
1356                 else
1357                   printf ("%.*s", iter->src->width, iter->arg[i].c);
1358               }
1359           }
1360         printf (")\n");
1361       }
1362   }
1363 }
1364
1365 #endif /* DEBUGGING */