dbfad40f6d4b2b79b938b6a83ed3c771bfa49b8d
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "command.h"
25 #include "error.h"
26 #include "file-handle.h"
27 #include "lexer.h"
28 #include "misc.h"
29 #include "settings.h"
30 #include "sfm.h"
31 #include "sort.h"
32 #include "stats.h"
33 #include "str.h"
34 #include "var.h"
35 #include "vfm.h"
36 #include "vfmP.h"
37
38 #include "debug-print.h"
39
40 /* Specifies how to make an aggregate variable. */
41 struct agr_var
42   {
43     struct agr_var *next;               /* Next in list. */
44
45     /* Collected during parsing. */
46     struct variable *src;       /* Source variable. */
47     struct variable *dest;      /* Target variable. */
48     int function;               /* Function. */
49     int include_missing;        /* 1=Include user-missing values. */
50     union value arg[2];         /* Arguments. */
51
52     /* Accumulated during AGGREGATE execution. */
53     double dbl[3];
54     int int1, int2;
55     char *string;
56     int missing;
57   };
58
59 /* Aggregation functions. */
60 enum
61   {
62     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
63     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
64     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
65     FUNC = 0x1f, /* Function mask. */
66     FSTRING = 1<<5, /* String function bit. */
67   };
68
69 /* Attributes of an aggregation function. */
70 struct agr_func
71   {
72     const char *name;           /* Aggregation function name. */
73     int n_args;                 /* Number of arguments. */
74     int alpha_type;             /* When given ALPHA arguments, output type. */
75     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
76   };
77
78 /* Attributes of aggregation functions. */
79 static struct agr_func agr_func_tab[] = 
80   {
81     {"<NONE>",  0, -1,      {0, 0, 0}},
82     {"SUM",     0, -1,      {FMT_F, 8, 2}},
83     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
84     {"SD",      0, -1,      {FMT_F, 8, 2}},
85     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
86     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
87     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
88     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
89     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
90     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
91     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
92     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
93     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
94     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
95     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
96     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
97     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
98     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
99     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
100     {"LAST",    0, ALPHA,   {-1, -1, -1}},
101     {NULL,      0, -1,      {-1, -1, -1}},
102     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
103     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
104   };
105
106 /* Output file, or NULL for the active file. */
107 static struct file_handle *outfile;
108
109 /* Missing value types. */
110 enum
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* ITEMWISE or COLUMNWISE. */
117 static int missing;
118
119 /* Aggregate variables. */
120 static struct agr_var *agr_first, *agr_next;
121
122 /* Aggregate dictionary. */
123 static struct dictionary *agr_dict;
124
125 /* Number of cases passed through aggregation. */
126 static int case_count;
127
128 /* Last values of the break variables. */
129 static union value *prev_case;
130
131 /* Buffers for use by the 10x transformation. */
132 static flt64 *buf64_1xx;
133 static struct ccase *buf_1xx;
134
135 static void initialize_aggregate_info (void);
136
137 /* Prototypes. */
138 static int parse_aggregate_functions (void);
139 static void free_aggregate_functions (void);
140 static int aggregate_single_case (struct ccase *input, struct ccase *output);
141 static int create_sysfile (void);
142
143 static int agr_00x_trns_proc (struct trns_header *, struct ccase *);
144 static void agr_00x_end_func (void *);
145 static int agr_10x_trns_proc (struct trns_header *, struct ccase *);
146 static void agr_10x_trns_free (struct trns_header *);
147 static void agr_10x_end_func (void *);
148 static int agr_11x_func (write_case_data);
149
150 #if DEBUGGING
151 static void debug_print (int flags);
152 #endif
153 \f
154 /* Parsing. */
155
156 /* Parses and executes the AGGREGATE procedure. */
157 int
158 cmd_aggregate (void)
159 {
160   /* From sort.c. */
161   int parse_sort_variables (void);
162   
163   /* Have we seen these subcommands? */
164   unsigned seen = 0;
165
166   outfile = NULL;
167   missing = ITEMWISE;
168   v_sort = NULL;
169   prev_case = NULL;
170   
171   agr_dict = dict_create ();
172   dict_set_label (agr_dict, dict_get_label (default_dict));
173   dict_set_documents (agr_dict, dict_get_documents (default_dict));
174   
175   lex_match_id ("AGGREGATE");
176
177   /* Read most of the subcommands. */
178   for (;;)
179     {
180       lex_match('/');
181       
182       if (lex_match_id ("OUTFILE"))
183         {
184           if (seen & 1)
185             {
186               free (v_sort);
187               dict_destroy (agr_dict);
188               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
189               return CMD_FAILURE;
190             }
191           seen |= 1;
192               
193           lex_match ('=');
194           if (lex_match ('*'))
195             outfile = NULL;
196           else 
197             {
198               outfile = fh_parse_file_handle ();
199               if (outfile == NULL)
200                 {
201                   free (v_sort);
202                   dict_destroy (agr_dict);
203                   return CMD_FAILURE;
204                 }
205             }
206         }
207       else if (lex_match_id ("MISSING"))
208         {
209           lex_match ('=');
210           if (!lex_match_id ("COLUMNWISE"))
211             {
212               free (v_sort);
213               dict_destroy (agr_dict);
214               lex_error (_("while expecting COLUMNWISE"));
215               return CMD_FAILURE;
216             }
217           missing = COLUMNWISE;
218         }
219       else if (lex_match_id ("DOCUMENT"))
220         seen |= 2;
221       else if (lex_match_id ("PRESORTED"))
222         seen |= 4;
223       else if (lex_match_id ("BREAK"))
224         {
225           if (seen & 8)
226             {
227               free (v_sort);
228               dict_destroy (agr_dict);
229               msg (SE, _("%s subcommand given multiple times."),"BREAK");
230               return CMD_FAILURE;
231             }
232           seen |= 8;
233
234           lex_match ('=');
235           if (!parse_sort_variables ())
236             {
237               dict_destroy (agr_dict);
238               return CMD_FAILURE;
239             }
240           
241           {
242             int i;
243             
244             for (i = 0; i < nv_sort; i++)
245               {
246                 struct variable *v;
247               
248                 v = dict_clone_var (agr_dict, v_sort[i], v_sort[i]->name);
249                 assert (v != NULL);
250               }
251           }
252         }
253       else break;
254     }
255
256   /* Check for proper syntax. */
257   if (!(seen & 8))
258     msg (SW, _("BREAK subcommand not specified."));
259       
260   /* Read in the aggregate functions. */
261   if (!parse_aggregate_functions ())
262     {
263       free_aggregate_functions ();
264       free (v_sort);
265       return CMD_FAILURE;
266     }
267
268   /* Delete documents. */
269   if (!(seen & 2))
270     dict_set_documents (agr_dict, NULL);
271
272   /* Cancel SPLIT FILE. */
273   dict_set_split_vars (agr_dict, NULL, 0);
274   
275 #if DEBUGGING
276   debug_print (seen);
277 #endif
278
279   /* Initialize. */
280   case_count = 0;
281   initialize_aggregate_info ();
282
283   /* How to implement all this... There are three important variables:
284      whether output is going to the active file (0) or a separate file
285      (1); whether the input data is presorted (0) or needs sorting
286      (1); whether there is a temporary transformation (1) or not (0).
287      The eight cases are as follows:
288
289      000 (0): Pass it through an aggregate transformation that
290      modifies the data.
291
292      001 (1): Cancel the temporary transformation and handle as 000.
293
294      010 (2): Set up a SORT CASES and aggregate the output, writing
295      the results to the active file.
296      
297      011 (3): Cancel the temporary transformation and handle as 010.
298
299      100 (4): Pass it through an aggregate transformation that doesn't
300      modify the data but merely writes it to the output file.
301
302      101 (5): Handled as 100.
303
304      110 (6): Set up a SORT CASES and capture the output, aggregate
305      it, write it to the output file without modifying the active
306      file.
307
308      111 (7): Handled as 110. */
309   
310   {
311     unsigned type = 0;
312
313     if (outfile != NULL)
314       type |= 4;
315     if (nv_sort != 0 && (seen & 4) == 0)
316       type |= 2;
317     if (temporary)
318       type |= 1;
319
320     switch (type)
321       {
322       case 3:
323         cancel_temporary ();
324         /* fall through */
325       case 2:
326         sort_cases (0);
327         goto case0;
328           
329       case 1:
330         cancel_temporary ();
331         /* fall through */
332       case 0:
333       case0:
334         {
335           struct trns_header *t = xmalloc (sizeof *t);
336           t->proc = agr_00x_trns_proc;
337           t->free = NULL;
338           add_transformation (t);
339           
340           temporary = 2;
341           temp_dict = agr_dict;
342           temp_trns = n_trns;
343           
344           agr_dict = NULL;
345
346           procedure (NULL, NULL, agr_00x_end_func, NULL);
347           break;
348         }
349
350       case 4:
351       case 5:
352         {
353           if (!create_sysfile ())
354             goto lossage;
355           
356           {
357             struct trns_header *t = xmalloc (sizeof *t);
358             t->proc = agr_10x_trns_proc;
359             t->free = agr_10x_trns_free;
360             add_transformation (t);
361
362             procedure (NULL, NULL, agr_10x_end_func, NULL);
363           }
364           
365           break;
366         }
367           
368       case 6:
369       case 7:
370         sort_cases (1);
371         
372         if (!create_sysfile ())
373           goto lossage;
374         read_sort_output (agr_11x_func, NULL);
375         
376         {
377           struct ccase *save_temp_case = temp_case;
378           temp_case = NULL;
379           agr_11x_func (NULL);
380           temp_case = save_temp_case;
381         }
382         
383         break;
384
385       default:
386         assert (0);
387       }
388   }
389   
390   free (buf64_1xx);
391   free (buf_1xx);
392   
393   /* Clean up. */
394   free (v_sort);
395   free_aggregate_functions ();
396   free (prev_case);
397   
398   return CMD_SUCCESS;
399
400 lossage:
401   /* Clean up. */
402   free (v_sort);
403   free_aggregate_functions ();
404   free (prev_case);
405
406   return CMD_FAILURE;
407 }
408
409 /* Create a system file for use in aggregation to an external file,
410    and allocate temporary buffers for writing out cases. */
411 static int
412 create_sysfile (void)
413 {
414   struct sfm_write_info w;
415   w.h = outfile;
416   w.dict = agr_dict;
417   w.compress = set_scompression;
418   if (!sfm_write_dictionary (&w))
419     {
420       free_aggregate_functions ();
421       free (v_sort);
422       dict_destroy (agr_dict);
423       return 0;
424     }
425     
426   buf64_1xx = xmalloc (sizeof *buf64_1xx * w.case_size);
427   buf_1xx = xmalloc (dict_get_case_size (agr_dict));
428
429   return 1;
430 }
431
432 /* Parse all the aggregate functions. */
433 static int
434 parse_aggregate_functions (void)
435 {
436   agr_first = agr_next = NULL;
437
438   /* Parse everything. */
439   for (;;)
440     {
441       char **dest;
442       char **dest_label;
443       int n_dest;
444
445       int include_missing;
446       struct agr_func *function;
447       int func_index;
448
449       union value arg[2];
450
451       struct variable **src;
452       int n_src;
453
454       int i;
455
456       dest = NULL;
457       dest_label = NULL;
458       n_dest = 0;
459       src = NULL;
460       n_src = 0;
461       arg[0].c = NULL;
462       arg[1].c = NULL;
463
464       /* Parse the list of target variables. */
465       while (!lex_match ('='))
466         {
467           int n_dest_prev = n_dest;
468           
469           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
470             goto lossage;
471
472           /* Assign empty labels. */
473           {
474             int j;
475
476             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
477             for (j = n_dest_prev; j < n_dest; j++)
478               dest_label[j] = NULL;
479           }
480           
481           if (token == T_STRING)
482             {
483               ds_truncate (&tokstr, 120);
484               dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
485               lex_get ();
486             }
487         }
488
489       /* Get the name of the aggregation function. */
490       if (token != T_ID)
491         {
492           lex_error (_("expecting aggregation function"));
493           goto lossage;
494         }
495
496       include_missing = 0;
497       if (tokid[strlen (tokid) - 1] == '.')
498         {
499           include_missing = 1;
500           tokid[strlen (tokid) - 1] = 0;
501         }
502       
503       for (function = agr_func_tab; function->name; function++)
504         if (!strcmp (function->name, tokid))
505           break;
506       if (NULL == function->name)
507         {
508           msg (SE, _("Unknown aggregation function %s."), tokid);
509           goto lossage;
510         }
511       func_index = function - agr_func_tab;
512       lex_get ();
513
514       /* Check for leading lparen. */
515       if (!lex_match ('('))
516         {
517           if (func_index == N)
518             func_index = N_NO_VARS;
519           else if (func_index == NU)
520             func_index = NU_NO_VARS;
521           else
522             {
523               lex_error (_("expecting `('"));
524               goto lossage;
525             }
526         } else {
527           /* Parse list of source variables. */
528           {
529             int pv_opts = PV_NO_SCRATCH;
530
531             if (func_index == SUM || func_index == MEAN || func_index == SD)
532               pv_opts |= PV_NUMERIC;
533             else if (function->n_args)
534               pv_opts |= PV_SAME_TYPE;
535
536             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
537               goto lossage;
538           }
539
540           /* Parse function arguments, for those functions that
541              require arguments. */
542           if (function->n_args != 0)
543             for (i = 0; i < function->n_args; i++)
544               {
545                 int type;
546             
547                 lex_match (',');
548                 if (token == T_STRING)
549                   {
550                     arg[i].c = xstrdup (ds_value (&tokstr));
551                     type = ALPHA;
552                   }
553                 else if (token == T_NUM)
554                   {
555                     arg[i].f = tokval;
556                     type = NUMERIC;
557                   } else {
558                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
559                     goto lossage;
560                   }
561             
562                 lex_get ();
563
564                 if (type != src[0]->type)
565                   {
566                     msg (SE, _("Arguments to %s must be of same type as "
567                                "source variables."),
568                          function->name);
569                     goto lossage;
570                   }
571               }
572
573           /* Trailing rparen. */
574           if (!lex_match(')'))
575             {
576               lex_error (_("expecting `)'"));
577               goto lossage;
578             }
579           
580           /* Now check that the number of source variables match the
581              number of target variables.  Do this here because if we
582              do it earlier then the user can get very misleading error
583              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
584              error message when a proper message would be more like
585              `unknown variable t'. */
586           if (n_src != n_dest)
587             {
588               msg (SE, _("Number of source variables (%d) does not match "
589                          "number of target variables (%d)."),
590                    n_src, n_dest);
591               goto lossage;
592             }
593         }
594         
595       /* Finally add these to the linked list of aggregation
596          variables. */
597       for (i = 0; i < n_dest; i++)
598         {
599           struct agr_var *v = xmalloc (sizeof *v);
600
601           /* Add variable to chain. */
602           if (agr_first)
603             agr_next = agr_next->next = v;
604           else
605             agr_first = agr_next = v;
606           agr_next->next = NULL;
607           
608           /* Create the target variable in the aggregate
609              dictionary. */
610           {
611             struct variable *destvar;
612             
613             agr_next->function = func_index;
614
615             if (src)
616               {
617                 int output_width;
618
619                 agr_next->src = src[i];
620                 
621                 if (src[i]->type == ALPHA)
622                   {
623                     agr_next->function |= FSTRING;
624                     agr_next->string = xmalloc (src[i]->width);
625                   }
626                 
627                 if (agr_next->src->type == NUMERIC || function->alpha_type == NUMERIC)
628                   output_width = 0;
629                 else
630                   output_width = agr_next->src->width;
631
632                 if (function->alpha_type == ALPHA)
633                   destvar = dict_clone_var (agr_dict, agr_next->src, dest[i]);
634                 else
635                   {
636                     destvar = dict_create_var (agr_dict, dest[i], output_width);
637                     if (output_width == 0)
638                       destvar->print = destvar->write = function->format;
639                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
640                         && (func_index == N || func_index == N_NO_VARS
641                             || func_index == NU || func_index == NU_NO_VARS))
642                       {
643                         struct fmt_spec f = {FMT_F, 8, 2};
644                       
645                         destvar->print = destvar->write = f;
646                       }
647                   }
648               } else {
649                 agr_next->src = NULL;
650                 destvar = dict_create_var (agr_dict, dest[i], 0);
651               }
652           
653             if (!destvar)
654               {
655                 msg (SE, _("Variable name %s is not unique within the "
656                            "aggregate file dictionary, which contains "
657                            "the aggregate variables and the break "
658                            "variables."),
659                      dest[i]);
660                 free (dest[i]);
661                 goto lossage;
662               }
663
664             free (dest[i]);
665             destvar->init = 0;
666             if (dest_label[i])
667               {
668                 destvar->label = dest_label[i];
669                 dest_label[i] = NULL;
670               }
671             else if (function->alpha_type == ALPHA)
672               destvar->print = destvar->write = function->format;
673
674             agr_next->dest = destvar;
675           }
676           
677           agr_next->include_missing = include_missing;
678
679           if (agr_next->src != NULL)
680             {
681               int j;
682
683               if (agr_next->src->type == NUMERIC)
684                 for (j = 0; j < function->n_args; j++)
685                   agr_next->arg[j].f = arg[j].f;
686               else
687                 for (j = 0; j < function->n_args; j++)
688                   agr_next->arg[j].c = xstrdup (arg[j].c);
689             }
690         }
691       
692       if (src != NULL && src[0]->type == ALPHA)
693         for (i = 0; i < function->n_args; i++)
694           {
695             free (arg[i].c);
696             arg[i].c = NULL;
697           }
698
699       free (src);
700       free (dest);
701       free (dest_label);
702
703       if (!lex_match ('/'))
704         {
705           if (token == '.')
706             return 1;
707
708           lex_error ("expecting end of command");
709           return 0;
710         }
711       continue;
712       
713     lossage:
714       for (i = 0; i < n_dest; i++)
715         {
716           free (dest[i]);
717           free (dest_label[i]);
718         }
719       free (dest);
720       free (dest_label);
721       free (arg[0].c);
722       free (arg[1].c);
723       if (src && n_src && src[0]->type == ALPHA)
724         for (i = 0; i < function->n_args; i++)
725           {
726             free(arg[i].c);
727             arg[i].c = NULL;
728           }
729       free (src);
730         
731       return 0;
732     }
733 }
734
735 /* Frees all the state for the AGGREGATE procedure. */
736 static void
737 free_aggregate_functions (void)
738 {
739   struct agr_var *iter, *next;
740
741   if (agr_dict)
742     dict_destroy (agr_dict);
743   for (iter = agr_first; iter; iter = next)
744     {
745       next = iter->next;
746
747       if (iter->function & FSTRING)
748         {
749           int n_args;
750           int i;
751
752           n_args = agr_func_tab[iter->function & FUNC].n_args;
753           for (i = 0; i < n_args; i++)
754             free (iter->arg[i].c);
755           free (iter->string);
756         }
757       free (iter);
758     }
759 }
760 \f
761 /* Execution. */
762
763 static void accumulate_aggregate_info (struct ccase *input);
764 static void dump_aggregate_info (struct ccase *output);
765
766 /* Processes a single case INPUT for aggregation.  If output is
767    warranted, it is written to case OUTPUT, which may be (but need not
768    be) an alias to INPUT.  Returns -1 when output is performed, -2
769    otherwise. */
770 /* The code in this function has an eerie similarity to
771    vfm.c:SPLIT_FILE_procfunc()... */
772 static int
773 aggregate_single_case (struct ccase *input, struct ccase *output)
774 {
775   /* The first case always begins a new break group.  We also need to
776      preserve the values of the case for later comparison. */
777   if (case_count++ == 0)
778     {
779       int n_elem = 0;
780       
781       {
782         int i;
783
784         for (i = 0; i < nv_sort; i++)
785           n_elem += v_sort[i]->nv;
786       }
787       
788       prev_case = xmalloc (sizeof *prev_case * n_elem);
789
790       /* Copy INPUT into prev_case. */
791       {
792         union value *iter = prev_case;
793         int i;
794
795         for (i = 0; i < nv_sort; i++)
796           {
797             struct variable *v = v_sort[i];
798             
799             if (v->type == NUMERIC)
800               (iter++)->f = input->data[v->fv].f;
801             else
802               {
803                 memcpy (iter->s, input->data[v->fv].s, v->width);
804                 iter += v->nv;
805               }
806           }
807       }
808             
809       accumulate_aggregate_info (input);
810         
811       return -2;
812     }
813       
814   /* Compare the value of each break variable to the values on the
815      previous case. */
816   {
817     union value *iter = prev_case;
818     int i;
819     
820     for (i = 0; i < nv_sort; i++)
821       {
822         struct variable *v = v_sort[i];
823       
824         switch (v->type)
825           {
826           case NUMERIC:
827             if (input->data[v->fv].f != iter->f)
828               goto not_equal;
829             iter++;
830             break;
831           case ALPHA:
832             if (memcmp (input->data[v->fv].s, iter->s, v->width))
833               goto not_equal;
834             iter += v->nv;
835             break;
836           default:
837             assert (0);
838           }
839       }
840   }
841
842   accumulate_aggregate_info (input);
843
844   return -2;
845   
846 not_equal:
847   /* The values of the break variable are different from the values on
848      the previous case.  That means that it's time to dump aggregate
849      info. */
850   dump_aggregate_info (output);
851   initialize_aggregate_info ();
852   accumulate_aggregate_info (input);
853
854   /* Copy INPUT into prev_case. */
855   {
856     union value *iter = prev_case;
857     int i;
858
859     for (i = 0; i < nv_sort; i++)
860       {
861         struct variable *v = v_sort[i];
862             
863         if (v->type == NUMERIC)
864           (iter++)->f = input->data[v->fv].f;
865         else
866           {
867             memcpy (iter->s, input->data[v->fv].s, v->width);
868             iter += v->nv;
869           }
870       }
871   }
872   
873   return -1;
874 }
875
876 /* Accumulates aggregation data from the case INPUT. */
877 static void 
878 accumulate_aggregate_info (struct ccase *input)
879 {
880   struct agr_var *iter;
881   double weight;
882
883   weight = dict_get_case_weight (default_dict, input);
884
885   for (iter = agr_first; iter; iter = iter->next)
886     if (iter->src)
887       {
888         union value *v = &input->data[iter->src->fv];
889
890         if ((!iter->include_missing && is_missing (v, iter->src))
891             || (iter->include_missing && iter->src->type == NUMERIC
892                 && v->f == SYSMIS))
893           {
894             switch (iter->function)
895               {
896               case NMISS:
897                 iter->dbl[0] += weight;
898                 break;
899               case NUMISS:
900                 iter->int1++;
901                 break;
902               }
903             iter->missing = 1;
904             continue;
905           }
906         
907         /* This is horrible.  There are too many possibilities. */
908         switch (iter->function)
909           {
910           case SUM:
911             iter->dbl[0] += v->f;
912             break;
913           case MEAN:
914             iter->dbl[0] += v->f * weight;
915             iter->dbl[1] += weight;
916             break;
917           case SD: 
918             {
919               double product = v->f * weight;
920               iter->dbl[0] += product;
921               iter->dbl[1] += product * v->f;
922               iter->dbl[2] += weight;
923               break; 
924             }
925           case MAX:
926             iter->dbl[0] = max (iter->dbl[0], v->f);
927             iter->int1 = 1;
928             break;
929           case MAX | FSTRING:
930             if (memcmp (iter->string, v->s, iter->src->width) < 0)
931               memcpy (iter->string, v->s, iter->src->width);
932             iter->int1 = 1;
933             break;
934           case MIN:
935             iter->dbl[0] = min (iter->dbl[0], v->f);
936             iter->int1 = 1;
937             break;
938           case MIN | FSTRING:
939             if (memcmp (iter->string, v->s, iter->src->width) > 0)
940               memcpy (iter->string, v->s, iter->src->width);
941             iter->int1 = 1;
942             break;
943           case FGT:
944           case PGT:
945             if (v->f > iter->arg[0].f)
946               iter->dbl[0] += weight;
947             iter->dbl[1] += weight;
948             break;
949           case FGT | FSTRING:
950           case PGT | FSTRING:
951             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
952               iter->dbl[0] += weight;
953             iter->dbl[1] += weight;
954             break;
955           case FLT:
956           case PLT:
957             if (v->f < iter->arg[0].f)
958               iter->dbl[0] += weight;
959             iter->dbl[1] += weight;
960             break;
961           case FLT | FSTRING:
962           case PLT | FSTRING:
963             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
964               iter->dbl[0] += weight;
965             iter->dbl[1] += weight;
966             break;
967           case FIN:
968           case PIN:
969             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
970               iter->dbl[0] += weight;
971             iter->dbl[1] += weight;
972             break;
973           case FIN | FSTRING:
974           case PIN | FSTRING:
975             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
976                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
977               iter->dbl[0] += weight;
978             iter->dbl[1] += weight;
979             break;
980           case FOUT:
981           case POUT:
982             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
983               iter->dbl[0] += weight;
984             iter->dbl[1] += weight;
985             break;
986           case FOUT | FSTRING:
987           case POUT | FSTRING:
988             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
989                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
990               iter->dbl[0] += weight;
991             iter->dbl[1] += weight;
992             break;
993           case N:
994             iter->dbl[0] += weight;
995             break;
996           case NU:
997             iter->int1++;
998             break;
999           case FIRST:
1000             if (iter->int1 == 0)
1001               {
1002                 iter->dbl[0] = v->f;
1003                 iter->int1 = 1;
1004               }
1005             break;
1006           case FIRST | FSTRING:
1007             if (iter->int1 == 0)
1008               {
1009                 memcpy (iter->string, v->s, iter->src->width);
1010                 iter->int1 = 1;
1011               }
1012             break;
1013           case LAST:
1014             iter->dbl[0] = v->f;
1015             iter->int1 = 1;
1016             break;
1017           case LAST | FSTRING:
1018             memcpy (iter->string, v->s, iter->src->width);
1019             iter->int1 = 1;
1020             break;
1021           default:
1022             assert (0);
1023           }
1024     } else {
1025       switch (iter->function)
1026         {
1027         case N_NO_VARS:
1028           iter->dbl[0] += weight;
1029           break;
1030         case NU_NO_VARS:
1031           iter->int1++;
1032           break;
1033         default:
1034           assert (0);
1035         }
1036     }
1037 }
1038
1039 /* We've come to a record that differs from the previous in one or
1040    more of the break variables.  Make an output record from the
1041    accumulated statistics in the OUTPUT case. */
1042 static void 
1043 dump_aggregate_info (struct ccase *output)
1044 {
1045   debug_printf (("(dumping "));
1046   
1047   {
1048     int n_elem = 0;
1049     
1050     {
1051       int i;
1052
1053       for (i = 0; i < nv_sort; i++)
1054         n_elem += v_sort[i]->nv;
1055     }
1056     debug_printf (("n_elem=%d:", n_elem));
1057     memcpy (output->data, prev_case, sizeof (union value) * n_elem);
1058   }
1059   
1060   {
1061     struct agr_var *i;
1062   
1063     for (i = agr_first; i; i = i->next)
1064       {
1065         union value *v = &output->data[i->dest->fv];
1066
1067         debug_printf ((" %d,%d", i->dest->fv, i->dest->nv));
1068
1069         if (missing == COLUMNWISE && i->missing != 0
1070             && (i->function & FUNC) != N && (i->function & FUNC) != NU
1071             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
1072           {
1073             if (i->function & FSTRING)
1074               memset (v->s, ' ', i->dest->width);
1075             else
1076               v->f = SYSMIS;
1077             continue;
1078           }
1079         
1080         switch (i->function)
1081           {
1082           case SUM:
1083             v->f = i->dbl[0];
1084             break;
1085           case MEAN:
1086             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1087             break;
1088           case SD:
1089             v->f = ((i->dbl[2] > 1.0)
1090                     ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
1091                     : SYSMIS);
1092             break;
1093           case MAX:
1094           case MIN:
1095             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1096             break;
1097           case MAX | FSTRING:
1098           case MIN | FSTRING:
1099             if (i->int1)
1100               memcpy (v->s, i->string, i->dest->width);
1101             else
1102               memset (v->s, ' ', i->dest->width);
1103             break;
1104           case FGT | FSTRING:
1105           case FLT | FSTRING:
1106           case FIN | FSTRING:
1107           case FOUT | FSTRING:
1108             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1109             break;
1110           case FGT:
1111           case FLT:
1112           case FIN:
1113           case FOUT:
1114             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1115             break;
1116           case PGT:
1117           case PGT | FSTRING:
1118           case PLT:
1119           case PLT | FSTRING:
1120           case PIN:
1121           case PIN | FSTRING:
1122           case POUT:
1123           case POUT | FSTRING:
1124             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1125             break;
1126           case N:
1127             v->f = i->dbl[0];
1128             break;
1129           case NU:
1130             v->f = i->int1;
1131             break;
1132           case FIRST:
1133           case LAST:
1134             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1135             break;
1136           case FIRST | FSTRING:
1137           case LAST | FSTRING:
1138             if (i->int1)
1139               memcpy (v->s, i->string, i->dest->width);
1140             else
1141               memset (v->s, ' ', i->dest->width);
1142             break;
1143           case N_NO_VARS:
1144             v->f = i->dbl[0];
1145             break;
1146           case NU_NO_VARS:
1147             v->f = i->int1;
1148             break;
1149           case NMISS:
1150             v->f = i->dbl[0];
1151             break;
1152           case NUMISS:
1153             v->f = i->int1;
1154             break;
1155           default:
1156             assert (0);
1157           }
1158       }
1159   }
1160   debug_printf ((") "));
1161 }
1162
1163 /* Resets the state for all the aggregate functions. */
1164 static void
1165 initialize_aggregate_info (void)
1166 {
1167   struct agr_var *iter;
1168
1169   for (iter = agr_first; iter; iter = iter->next)
1170     {
1171       iter->missing = 0;
1172       switch (iter->function)
1173         {
1174         case MIN:
1175           iter->dbl[0] = DBL_MAX;
1176           break;
1177         case MIN | FSTRING:
1178           memset (iter->string, 255, iter->src->width);
1179           break;
1180         case MAX:
1181           iter->dbl[0] = -DBL_MAX;
1182           break;
1183         case MAX | FSTRING:
1184           memset (iter->string, 0, iter->src->width);
1185           break;
1186         default:
1187           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1188           iter->int1 = iter->int2 = 0;
1189           break;
1190         }
1191     }
1192 }
1193 \f
1194 /* Aggregate each case as it comes through.  Cases which aren't needed
1195    are dropped. */
1196 static int
1197 agr_00x_trns_proc (struct trns_header *h UNUSED, struct ccase *c)
1198 {
1199   int code = aggregate_single_case (c, compaction_case);
1200   debug_printf (("%d ", code));
1201   return code;
1202 }
1203
1204 /* Output the last aggregate case.  It's okay to call the vfm_sink's
1205    write() method here because end_func is called so soon after all
1206    the cases have been output; very little has been cleaned up at this
1207    point. */
1208 static void
1209 agr_00x_end_func (void *aux UNUSED)
1210 {
1211   /* Ensure that info for the last break group gets written to the
1212      active file. */
1213   dump_aggregate_info (compaction_case);
1214   vfm_sink_info.ncases++;
1215   vfm_sink->write ();
1216 }
1217
1218 /* Transform the aggregate case buf_1xx, in internal format, to system
1219    file format, in buf64_1xx, and write the resultant case to the
1220    system file. */
1221 static void
1222 write_case_to_sfm (void)
1223 {
1224   flt64 *p = buf64_1xx;
1225   int i;
1226
1227   for (i = 0; i < dict_get_var_cnt (agr_dict); i++)
1228     {
1229       struct variable *v = dict_get_var (agr_dict, i);
1230       
1231       if (v->type == NUMERIC)
1232         {
1233           double src = buf_1xx->data[v->fv].f;
1234           if (src == SYSMIS)
1235             *p++ = -FLT64_MAX;
1236           else
1237             *p++ = src;
1238         }
1239       else
1240         {
1241           memcpy (p, buf_1xx->data[v->fv].s, v->width);
1242           memset (&((char *) p)[v->width], ' ',
1243                   REM_RND_UP (v->width, sizeof (flt64)));
1244           p += DIV_RND_UP (v->width, sizeof (flt64));
1245         }
1246     }
1247
1248   sfm_write_case (outfile, buf64_1xx, p - buf64_1xx);
1249 }
1250
1251 /* Aggregate the current case and output it if we passed a
1252    breakpoint. */
1253 static int
1254 agr_10x_trns_proc (struct trns_header *h UNUSED, struct ccase *c)
1255 {
1256   int code = aggregate_single_case (c, buf_1xx);
1257
1258   assert (code == -2 || code == -1);
1259   if (code == -1)
1260     write_case_to_sfm ();
1261   return -1;
1262 }
1263
1264 /* Close the system file now that we're done with it.  */
1265 static void
1266 agr_10x_trns_free (struct trns_header *h UNUSED)
1267 {
1268   fh_close_handle (outfile);
1269 }
1270
1271 /* Ensure that info for the last break group gets written to the
1272    system file. */
1273 static void
1274 agr_10x_end_func (void *aux UNUSED)
1275 {
1276   dump_aggregate_info (buf_1xx);
1277   write_case_to_sfm ();
1278 }
1279
1280 /* When called with temp_case non-NULL (the normal case), runs the
1281    case through the aggregater and outputs it to the system file if
1282    appropriate.  If temp_case is NULL, finishes up writing the last
1283    case if necessary. */
1284 static int
1285 agr_11x_func (write_case_data wc_data UNUSED)
1286 {
1287   if (temp_case != NULL)
1288     {
1289       int code = aggregate_single_case (temp_case, buf_1xx);
1290       
1291       assert (code == -2 || code == -1);
1292       if (code == -1)
1293         write_case_to_sfm ();
1294     }
1295   else
1296     {
1297       if (case_count)
1298         {
1299           dump_aggregate_info (buf_1xx);
1300           write_case_to_sfm ();
1301         }
1302       fh_close_handle (outfile);
1303     }
1304   return 1;
1305 }
1306 \f
1307 /* Debugging. */
1308 #if DEBUGGING
1309 /* Print out useful debugging information. */
1310 static void
1311 debug_print (int flags)
1312 {
1313   printf ("AGGREGATE\n /OUTFILE=%s\n",
1314           outfile ? fh_handle_filename (outfile) : "*");
1315
1316   if (missing == COLUMNWISE)
1317     puts (" /MISSING=COLUMNWISE");
1318
1319   if (flags & 2)
1320     puts (" /DOCUMENT");
1321   if (flags & 4)
1322     puts (" /PRESORTED");
1323   
1324   {
1325     int i;
1326
1327     printf (" /BREAK=");
1328     for (i = 0; i < nv_sort; i++)
1329       printf ("%s(%c) ", v_sort[i]->name,
1330               v_sort[i]->p.srt.order == SRT_ASCEND ? 'A' : 'D');
1331     putc ('\n', stdout);
1332   }
1333   
1334   {
1335     struct agr_var *iter;
1336     
1337     for (iter = agr_first; iter; iter = iter->next)
1338       {
1339         struct agr_func *f = &agr_func_tab[iter->function & FUNC];
1340         
1341         printf (" /%s", iter->dest->name);
1342         if (iter->dest->label)
1343           printf ("'%s'", iter->dest->label);
1344         printf ("=%s(%s", f->name, iter->src->name);
1345         if (f->n_args)
1346           {
1347             int i;
1348             
1349             for (i = 0; i < f->n_args; i++)
1350               {
1351                 putc (',', stdout);
1352                 if (iter->src->type == NUMERIC)
1353                   printf ("%g", iter->arg[i].f);
1354                 else
1355                   printf ("%.*s", iter->src->width, iter->arg[i].c);
1356               }
1357           }
1358         printf (")\n");
1359       }
1360   }
1361 }
1362
1363 #endif /* DEBUGGING */