Sat Dec 27 16:16:49 2003 Ben Pfaff <blp@gnu.org>
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "command.h"
25 #include "error.h"
26 #include "file-handle.h"
27 #include "lexer.h"
28 #include "misc.h"
29 #include "settings.h"
30 #include "sfm.h"
31 #include "sort.h"
32 #include "stats.h"
33 #include "str.h"
34 #include "var.h"
35 #include "vfm.h"
36 #include "vfmP.h"
37
38 #include "debug-print.h"
39
40 /* Specifies how to make an aggregate variable. */
41 struct agr_var
42   {
43     struct agr_var *next;               /* Next in list. */
44
45     /* Collected during parsing. */
46     struct variable *src;       /* Source variable. */
47     struct variable *dest;      /* Target variable. */
48     int function;               /* Function. */
49     int include_missing;        /* 1=Include user-missing values. */
50     union value arg[2];         /* Arguments. */
51
52     /* Accumulated during AGGREGATE execution. */
53     double dbl[3];
54     int int1, int2;
55     char *string;
56     int missing;
57   };
58
59 /* Aggregation functions. */
60 enum
61   {
62     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
63     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
64     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
65     FUNC = 0x1f, /* Function mask. */
66     FSTRING = 1<<5, /* String function bit. */
67   };
68
69 /* Attributes of an aggregation function. */
70 struct agr_func
71   {
72     const char *name;           /* Aggregation function name. */
73     int n_args;                 /* Number of arguments. */
74     int alpha_type;             /* When given ALPHA arguments, output type. */
75     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
76   };
77
78 /* Attributes of aggregation functions. */
79 static struct agr_func agr_func_tab[] = 
80   {
81     {"<NONE>",  0, -1,      {0, 0, 0}},
82     {"SUM",     0, -1,      {FMT_F, 8, 2}},
83     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
84     {"SD",      0, -1,      {FMT_F, 8, 2}},
85     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
86     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
87     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
88     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
89     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
90     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
91     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
92     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
93     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
94     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
95     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
96     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
97     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
98     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
99     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
100     {"LAST",    0, ALPHA,   {-1, -1, -1}},
101     {NULL,      0, -1,      {-1, -1, -1}},
102     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
103     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
104   };
105
106 /* Output file, or NULL for the active file. */
107 static struct file_handle *outfile;
108
109 /* Missing value types. */
110 enum
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* ITEMWISE or COLUMNWISE. */
117 static int missing;
118
119 /* Aggregate variables. */
120 static struct agr_var *agr_first, *agr_next;
121
122 /* Aggregate dictionary. */
123 static struct dictionary *agr_dict;
124
125 /* Number of cases passed through aggregation. */
126 static int case_count;
127
128 /* Last values of the break variables. */
129 static union value *prev_case;
130
131 /* Buffers for use by the 10x transformation. */
132 static flt64 *buf64_1xx;
133 static struct ccase *buf_1xx;
134
135 static void initialize_aggregate_info (void);
136
137 /* Prototypes. */
138 static int parse_aggregate_functions (void);
139 static void free_aggregate_functions (void);
140 static int aggregate_single_case (struct ccase *input, struct ccase *output);
141 static int create_sysfile (void);
142
143 static int agr_00x_trns_proc (struct trns_header *, struct ccase *);
144 static void agr_00x_end_func (void);
145 static int agr_10x_trns_proc (struct trns_header *, struct ccase *);
146 static void agr_10x_trns_free (struct trns_header *);
147 static void agr_10x_end_func (void);
148 static int agr_11x_func (void);
149
150 #if DEBUGGING
151 static void debug_print (int flags);
152 #endif
153 \f
154 /* Parsing. */
155
156 /* Parses and executes the AGGREGATE procedure. */
157 int
158 cmd_aggregate (void)
159 {
160   /* From sort.c. */
161   int parse_sort_variables (void);
162   
163   /* Have we seen these subcommands? */
164   unsigned seen = 0;
165
166   outfile = NULL;
167   missing = ITEMWISE;
168   v_sort = NULL;
169   prev_case = NULL;
170   
171   agr_dict = dict_create ();
172   dict_set_label (agr_dict, dict_get_label (default_dict));
173   dict_set_documents (agr_dict, dict_get_documents (default_dict));
174   
175   lex_match_id ("AGGREGATE");
176
177   /* Read most of the subcommands. */
178   for (;;)
179     {
180       lex_match('/');
181       
182       if (lex_match_id ("OUTFILE"))
183         {
184           if (seen & 1)
185             {
186               free (v_sort);
187               dict_destroy (agr_dict);
188               msg (SE, _("OUTFILE specified multiple times."));
189               return CMD_FAILURE;
190             }
191           seen |= 1;
192               
193           lex_match ('=');
194           if (lex_match ('*'))
195             outfile = NULL;
196           else 
197             {
198               outfile = fh_parse_file_handle ();
199               if (outfile == NULL)
200                 {
201                   free (v_sort);
202                   dict_destroy (agr_dict);
203                   return CMD_FAILURE;
204                 }
205             }
206         }
207       else if (lex_match_id ("MISSING"))
208         {
209           lex_match ('=');
210           if (!lex_match_id ("COLUMNWISE"))
211             {
212               free (v_sort);
213               dict_destroy (agr_dict);
214               lex_error (_("while expecting COLUMNWISE"));
215               return CMD_FAILURE;
216             }
217           missing = COLUMNWISE;
218         }
219       else if (lex_match_id ("DOCUMENT"))
220         seen |= 2;
221       else if (lex_match_id ("PRESORTED"))
222         seen |= 4;
223       else if (lex_match_id ("BREAK"))
224         {
225           if (seen & 8)
226             {
227               free (v_sort);
228               dict_destroy (agr_dict);
229               msg (SE, _("BREAK specified multiple times."));
230               return CMD_FAILURE;
231             }
232           seen |= 8;
233
234           lex_match ('=');
235           if (!parse_sort_variables ())
236             {
237               dict_destroy (agr_dict);
238               return CMD_FAILURE;
239             }
240           
241           {
242             int i;
243             
244             for (i = 0; i < nv_sort; i++)
245               {
246                 struct variable *v;
247               
248                 v = dict_clone_var (agr_dict, v_sort[i], v_sort[i]->name);
249                 assert (v != NULL);
250               }
251           }
252         }
253       else break;
254     }
255
256   /* Check for proper syntax. */
257   if (!(seen & 8))
258     msg (SW, _("BREAK subcommand not specified."));
259       
260   /* Read in the aggregate functions. */
261   if (!parse_aggregate_functions ())
262     {
263       free_aggregate_functions ();
264       free (v_sort);
265       return CMD_FAILURE;
266     }
267
268   /* Delete documents. */
269   if (!(seen & 2))
270     dict_set_documents (agr_dict, NULL);
271
272   /* Cancel SPLIT FILE. */
273   dict_set_split_vars (agr_dict, NULL, 0);
274   
275 #if DEBUGGING
276   debug_print (seen);
277 #endif
278
279   /* Initialize. */
280   case_count = 0;
281   initialize_aggregate_info ();
282
283   /* How to implement all this... There are three important variables:
284      whether output is going to the active file (0) or a separate file
285      (1); whether the input data is presorted (0) or needs sorting
286      (1); whether there is a temporary transformation (1) or not (0).
287      The eight cases are as follows:
288
289      000 (0): Pass it through an aggregate transformation that
290      modifies the data.
291
292      001 (1): Cancel the temporary transformation and handle as 000.
293
294      010 (2): Set up a SORT CASES and aggregate the output, writing
295      the results to the active file.
296      
297      011 (3): Cancel the temporary transformation and handle as 010.
298
299      100 (4): Pass it through an aggregate transformation that doesn't
300      modify the data but merely writes it to the output file.
301
302      101 (5): Handled as 100.
303
304      110 (6): Set up a SORT CASES and capture the output, aggregate
305      it, write it to the output file without modifying the active
306      file.
307
308      111 (7): Handled as 110. */
309   
310   {
311     unsigned type = 0;
312
313     if (outfile != NULL)
314       type |= 4;
315     if (nv_sort != 0 && (seen & 4) == 0)
316       type |= 2;
317     if (temporary)
318       type |= 1;
319
320     switch (type)
321       {
322       case 3:
323         cancel_temporary ();
324         /* fall through */
325       case 2:
326         sort_cases (0);
327         goto case0;
328           
329       case 1:
330         cancel_temporary ();
331         /* fall through */
332       case 0:
333       case0:
334         {
335           struct trns_header *t = xmalloc (sizeof *t);
336           t->proc = agr_00x_trns_proc;
337           t->free = NULL;
338           add_transformation (t);
339           
340           temporary = 2;
341           temp_dict = agr_dict;
342           temp_trns = n_trns;
343           
344           agr_dict = NULL;
345
346           procedure (NULL, NULL, agr_00x_end_func);
347           break;
348         }
349
350       case 4:
351       case 5:
352         {
353           if (!create_sysfile ())
354             goto lossage;
355           
356           {
357             struct trns_header *t = xmalloc (sizeof *t);
358             t->proc = agr_10x_trns_proc;
359             t->free = agr_10x_trns_free;
360             add_transformation (t);
361
362             procedure (NULL, NULL, agr_10x_end_func);
363           }
364           
365           break;
366         }
367           
368       case 6:
369       case 7:
370         sort_cases (1);
371         
372         if (!create_sysfile ())
373           goto lossage;
374         read_sort_output (agr_11x_func);
375         
376         {
377           struct ccase *save_temp_case = temp_case;
378           temp_case = NULL;
379           agr_11x_func ();
380           temp_case = save_temp_case;
381         }
382         
383         break;
384
385       default:
386         assert (0);
387       }
388   }
389   
390   free (buf64_1xx);
391   free (buf_1xx);
392   
393   /* Clean up. */
394   free (v_sort);
395   free_aggregate_functions ();
396   free (prev_case);
397   
398   return CMD_SUCCESS;
399
400 lossage:
401   /* Clean up. */
402   free (v_sort);
403   free_aggregate_functions ();
404   free (prev_case);
405
406   return CMD_FAILURE;
407 }
408
409 /* Create a system file for use in aggregation to an external file,
410    and allocate temporary buffers for writing out cases. */
411 static int
412 create_sysfile (void)
413 {
414   struct sfm_write_info w;
415   w.h = outfile;
416   w.dict = agr_dict;
417   w.compress = set_scompression;
418   if (!sfm_write_dictionary (&w))
419     {
420       free_aggregate_functions ();
421       free (v_sort);
422       dict_destroy (agr_dict);
423       return 0;
424     }
425     
426   buf64_1xx = xmalloc (sizeof *buf64_1xx * w.case_size);
427   buf_1xx = xmalloc (sizeof (struct ccase)
428                      + (sizeof (union value)
429                         * (dict_get_value_cnt (agr_dict) - 1)));
430
431   return 1;
432 }
433
434 /* Parse all the aggregate functions. */
435 static int
436 parse_aggregate_functions (void)
437 {
438   agr_first = agr_next = NULL;
439
440   /* Parse everything. */
441   for (;;)
442     {
443       char **dest;
444       char **dest_label;
445       int n_dest;
446
447       int include_missing;
448       struct agr_func *function;
449       int func_index;
450
451       union value arg[2];
452
453       struct variable **src;
454       int n_src;
455
456       int i;
457
458       dest = NULL;
459       dest_label = NULL;
460       n_dest = 0;
461       src = NULL;
462       n_src = 0;
463       arg[0].c = NULL;
464       arg[1].c = NULL;
465
466       /* Parse the list of target variables. */
467       while (!lex_match ('='))
468         {
469           int n_dest_prev = n_dest;
470           
471           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
472             goto lossage;
473
474           /* Assign empty labels. */
475           {
476             int j;
477
478             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
479             for (j = n_dest_prev; j < n_dest; j++)
480               dest_label[j] = NULL;
481           }
482           
483           if (token == T_STRING)
484             {
485               ds_truncate (&tokstr, 120);
486               dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
487               lex_get ();
488             }
489         }
490
491       /* Get the name of the aggregation function. */
492       if (token != T_ID)
493         {
494           lex_error (_("expecting aggregation function"));
495           goto lossage;
496         }
497
498       include_missing = 0;
499       if (tokid[strlen (tokid) - 1] == '.')
500         {
501           include_missing = 1;
502           tokid[strlen (tokid) - 1] = 0;
503         }
504       
505       for (function = agr_func_tab; function->name; function++)
506         if (!strcmp (function->name, tokid))
507           break;
508       if (NULL == function->name)
509         {
510           msg (SE, _("Unknown aggregation function %s."), tokid);
511           goto lossage;
512         }
513       func_index = function - agr_func_tab;
514       lex_get ();
515
516       /* Check for leading lparen. */
517       if (!lex_match ('('))
518         {
519           if (func_index == N)
520             func_index = N_NO_VARS;
521           else if (func_index == NU)
522             func_index = NU_NO_VARS;
523           else
524             {
525               lex_error (_("expecting `('"));
526               goto lossage;
527             }
528         } else {
529           /* Parse list of source variables. */
530           {
531             int pv_opts = PV_NO_SCRATCH;
532
533             if (func_index == SUM || func_index == MEAN || func_index == SD)
534               pv_opts |= PV_NUMERIC;
535             else if (function->n_args)
536               pv_opts |= PV_SAME_TYPE;
537
538             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
539               goto lossage;
540           }
541
542           /* Parse function arguments, for those functions that
543              require arguments. */
544           if (function->n_args != 0)
545             for (i = 0; i < function->n_args; i++)
546               {
547                 int type;
548             
549                 lex_match (',');
550                 if (token == T_STRING)
551                   {
552                     arg[i].c = xstrdup (ds_value (&tokstr));
553                     type = ALPHA;
554                   }
555                 else if (token == T_NUM)
556                   {
557                     arg[i].f = tokval;
558                     type = NUMERIC;
559                   } else {
560                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
561                     goto lossage;
562                   }
563             
564                 lex_get ();
565
566                 if (type != src[0]->type)
567                   {
568                     msg (SE, _("Arguments to %s must be of same type as "
569                                "source variables."),
570                          function->name);
571                     goto lossage;
572                   }
573               }
574
575           /* Trailing rparen. */
576           if (!lex_match(')'))
577             {
578               lex_error (_("expecting `)'"));
579               goto lossage;
580             }
581           
582           /* Now check that the number of source variables match the
583              number of target variables.  Do this here because if we
584              do it earlier then the user can get very misleading error
585              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
586              error message when a proper message would be more like
587              `unknown variable t'. */
588           if (n_src != n_dest)
589             {
590               msg (SE, _("Number of source variables (%d) does not match "
591                          "number of target variables (%d)."),
592                    n_src, n_dest);
593               goto lossage;
594             }
595         }
596         
597       /* Finally add these to the linked list of aggregation
598          variables. */
599       for (i = 0; i < n_dest; i++)
600         {
601           struct agr_var *v = xmalloc (sizeof *v);
602
603           /* Add variable to chain. */
604           if (agr_first)
605             agr_next = agr_next->next = v;
606           else
607             agr_first = agr_next = v;
608           agr_next->next = NULL;
609           
610           /* Create the target variable in the aggregate
611              dictionary. */
612           {
613             struct variable *destvar;
614             
615             agr_next->function = func_index;
616
617             if (src)
618               {
619                 int output_width;
620
621                 agr_next->src = src[i];
622                 
623                 if (src[i]->type == ALPHA)
624                   {
625                     agr_next->function |= FSTRING;
626                     agr_next->string = xmalloc (src[i]->width);
627                   }
628                 
629                 if (agr_next->src->type == NUMERIC || function->alpha_type == NUMERIC)
630                   output_width = 0;
631                 else
632                   output_width = agr_next->src->width;
633
634                 if (function->alpha_type == ALPHA)
635                   destvar = dict_clone_var (agr_dict, agr_next->src, dest[i]);
636                 else
637                   {
638                     destvar = dict_create_var (agr_dict, dest[i], output_width);
639                     if (output_width == 0)
640                       destvar->print = destvar->write = function->format;
641                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
642                         && (func_index == N || func_index == N_NO_VARS
643                             || func_index == NU || func_index == NU_NO_VARS))
644                       {
645                         struct fmt_spec f = {FMT_F, 8, 2};
646                       
647                         destvar->print = destvar->write = f;
648                       }
649                   }
650               } else {
651                 agr_next->src = NULL;
652                 destvar = dict_create_var (agr_dict, dest[i], 0);
653               }
654           
655             if (!destvar)
656               {
657                 msg (SE, _("Variable name %s is not unique within the "
658                            "aggregate file dictionary, which contains "
659                            "the aggregate variables and the break "
660                            "variables."),
661                      dest[i]);
662                 free (dest[i]);
663                 goto lossage;
664               }
665
666             free (dest[i]);
667             if (dest_label[i])
668               {
669                 destvar->label = dest_label[i];
670                 dest_label[i] = NULL;
671               }
672             else if (function->alpha_type == ALPHA)
673               destvar->print = destvar->write = function->format;
674
675             agr_next->dest = destvar;
676           }
677           
678           agr_next->include_missing = include_missing;
679
680           if (agr_next->src != NULL)
681             {
682               int j;
683
684               if (agr_next->src->type == NUMERIC)
685                 for (j = 0; j < function->n_args; j++)
686                   agr_next->arg[j].f = arg[j].f;
687               else
688                 for (j = 0; j < function->n_args; j++)
689                   agr_next->arg[j].c = xstrdup (arg[j].c);
690             }
691         }
692       
693       if (src != NULL && src[0]->type == ALPHA)
694         for (i = 0; i < function->n_args; i++)
695           {
696             free (arg[i].c);
697             arg[i].c = NULL;
698           }
699
700       free (src);
701       free (dest);
702       free (dest_label);
703
704       if (!lex_match ('/'))
705         {
706           if (token == '.')
707             return 1;
708
709           lex_error ("expecting end of command");
710           return 0;
711         }
712       continue;
713       
714     lossage:
715       for (i = 0; i < n_dest; i++)
716         {
717           free (dest[i]);
718           free (dest_label[i]);
719         }
720       free (dest);
721       free (dest_label);
722       free (arg[0].c);
723       free (arg[1].c);
724       if (src && n_src && src[0]->type == ALPHA)
725         for (i = 0; i < function->n_args; i++)
726           {
727             free(arg[i].c);
728             arg[i].c = NULL;
729           }
730       free (src);
731         
732       return 0;
733     }
734 }
735
736 /* Frees all the state for the AGGREGATE procedure. */
737 static void
738 free_aggregate_functions (void)
739 {
740   struct agr_var *iter, *next;
741
742   if (agr_dict)
743     dict_destroy (agr_dict);
744   for (iter = agr_first; iter; iter = next)
745     {
746       next = iter->next;
747
748       if (iter->function & FSTRING)
749         {
750           int n_args;
751           int i;
752
753           n_args = agr_func_tab[iter->function & FUNC].n_args;
754           for (i = 0; i < n_args; i++)
755             free (iter->arg[i].c);
756           free (iter->string);
757         }
758       free (iter);
759     }
760 }
761 \f
762 /* Execution. */
763
764 static void accumulate_aggregate_info (struct ccase *input);
765 static void dump_aggregate_info (struct ccase *output);
766
767 /* Processes a single case INPUT for aggregation.  If output is
768    warranted, it is written to case OUTPUT, which may be (but need not
769    be) an alias to INPUT.  Returns -1 when output is performed, -2
770    otherwise. */
771 /* The code in this function has an eerie similarity to
772    vfm.c:SPLIT_FILE_procfunc()... */
773 static int
774 aggregate_single_case (struct ccase *input, struct ccase *output)
775 {
776   /* The first case always begins a new break group.  We also need to
777      preserve the values of the case for later comparison. */
778   if (case_count++ == 0)
779     {
780       int n_elem = 0;
781       
782       {
783         int i;
784
785         for (i = 0; i < nv_sort; i++)
786           n_elem += v_sort[i]->nv;
787       }
788       
789       prev_case = xmalloc (sizeof *prev_case * n_elem);
790
791       /* Copy INPUT into prev_case. */
792       {
793         union value *iter = prev_case;
794         int i;
795
796         for (i = 0; i < nv_sort; i++)
797           {
798             struct variable *v = v_sort[i];
799             
800             if (v->type == NUMERIC)
801               (iter++)->f = input->data[v->fv].f;
802             else
803               {
804                 memcpy (iter->s, input->data[v->fv].s, v->width);
805                 iter += v->nv;
806               }
807           }
808       }
809             
810       accumulate_aggregate_info (input);
811         
812       return -2;
813     }
814       
815   /* Compare the value of each break variable to the values on the
816      previous case. */
817   {
818     union value *iter = prev_case;
819     int i;
820     
821     for (i = 0; i < nv_sort; i++)
822       {
823         struct variable *v = v_sort[i];
824       
825         switch (v->type)
826           {
827           case NUMERIC:
828             if (input->data[v->fv].f != iter->f)
829               goto not_equal;
830             iter++;
831             break;
832           case ALPHA:
833             if (memcmp (input->data[v->fv].s, iter->s, v->width))
834               goto not_equal;
835             iter += v->nv;
836             break;
837           default:
838             assert (0);
839           }
840       }
841   }
842
843   accumulate_aggregate_info (input);
844
845   return -2;
846   
847 not_equal:
848   /* The values of the break variable are different from the values on
849      the previous case.  That means that it's time to dump aggregate
850      info. */
851   dump_aggregate_info (output);
852   initialize_aggregate_info ();
853   accumulate_aggregate_info (input);
854
855   /* Copy INPUT into prev_case. */
856   {
857     union value *iter = prev_case;
858     int i;
859
860     for (i = 0; i < nv_sort; i++)
861       {
862         struct variable *v = v_sort[i];
863             
864         if (v->type == NUMERIC)
865           (iter++)->f = input->data[v->fv].f;
866         else
867           {
868             memcpy (iter->s, input->data[v->fv].s, v->width);
869             iter += v->nv;
870           }
871       }
872   }
873   
874   return -1;
875 }
876
877 /* Accumulates aggregation data from the case INPUT. */
878 static void 
879 accumulate_aggregate_info (struct ccase *input)
880 {
881   struct agr_var *iter;
882   double weight;
883
884   weight = dict_get_case_weight (default_dict, input);
885
886   for (iter = agr_first; iter; iter = iter->next)
887     if (iter->src)
888       {
889         union value *v = &input->data[iter->src->fv];
890
891         if ((!iter->include_missing && is_missing (v, iter->src))
892             || (iter->include_missing && iter->src->type == NUMERIC
893                 && v->f == SYSMIS))
894           {
895             switch (iter->function)
896               {
897               case NMISS:
898                 iter->dbl[0] += weight;
899                 break;
900               case NUMISS:
901                 iter->int1++;
902                 break;
903               }
904             iter->missing = 1;
905             continue;
906           }
907         
908         /* This is horrible.  There are too many possibilities. */
909         switch (iter->function)
910           {
911           case SUM:
912             iter->dbl[0] += v->f;
913             break;
914           case MEAN:
915             iter->dbl[0] += v->f * weight;
916             iter->dbl[1] += weight;
917             break;
918           case SD: 
919             {
920               double product = v->f * weight;
921               iter->dbl[0] += product;
922               iter->dbl[1] += product * v->f;
923               iter->dbl[2] += weight;
924               break; 
925             }
926           case MAX:
927             iter->dbl[0] = max (iter->dbl[0], v->f);
928             iter->int1 = 1;
929             break;
930           case MAX | FSTRING:
931             if (memcmp (iter->string, v->s, iter->src->width) < 0)
932               memcpy (iter->string, v->s, iter->src->width);
933             iter->int1 = 1;
934             break;
935           case MIN:
936             iter->dbl[0] = min (iter->dbl[0], v->f);
937             iter->int1 = 1;
938             break;
939           case MIN | FSTRING:
940             if (memcmp (iter->string, v->s, iter->src->width) > 0)
941               memcpy (iter->string, v->s, iter->src->width);
942             iter->int1 = 1;
943             break;
944           case FGT:
945           case PGT:
946             if (v->f > iter->arg[0].f)
947               iter->dbl[0] += weight;
948             iter->dbl[1] += weight;
949             break;
950           case FGT | FSTRING:
951           case PGT | FSTRING:
952             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
953               iter->dbl[0] += weight;
954             iter->dbl[1] += weight;
955             break;
956           case FLT:
957           case PLT:
958             if (v->f < iter->arg[0].f)
959               iter->dbl[0] += weight;
960             iter->dbl[1] += weight;
961             break;
962           case FLT | FSTRING:
963           case PLT | FSTRING:
964             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
965               iter->dbl[0] += weight;
966             iter->dbl[1] += weight;
967             break;
968           case FIN:
969           case PIN:
970             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
971               iter->dbl[0] += weight;
972             iter->dbl[1] += weight;
973             break;
974           case FIN | FSTRING:
975           case PIN | FSTRING:
976             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
977                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
978               iter->dbl[0] += weight;
979             iter->dbl[1] += weight;
980             break;
981           case FOUT:
982           case POUT:
983             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
984               iter->dbl[0] += weight;
985             iter->dbl[1] += weight;
986             break;
987           case FOUT | FSTRING:
988           case POUT | FSTRING:
989             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
990                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
991               iter->dbl[0] += weight;
992             iter->dbl[1] += weight;
993             break;
994           case N:
995             iter->dbl[0] += weight;
996             break;
997           case NU:
998             iter->int1++;
999             break;
1000           case FIRST:
1001             if (iter->int1 == 0)
1002               {
1003                 iter->dbl[0] = v->f;
1004                 iter->int1 = 1;
1005               }
1006             break;
1007           case FIRST | FSTRING:
1008             if (iter->int1 == 0)
1009               {
1010                 memcpy (iter->string, v->s, iter->src->width);
1011                 iter->int1 = 1;
1012               }
1013             break;
1014           case LAST:
1015             iter->dbl[0] = v->f;
1016             iter->int1 = 1;
1017             break;
1018           case LAST | FSTRING:
1019             memcpy (iter->string, v->s, iter->src->width);
1020             iter->int1 = 1;
1021             break;
1022           default:
1023             assert (0);
1024           }
1025     } else {
1026       switch (iter->function)
1027         {
1028         case N_NO_VARS:
1029           iter->dbl[0] += weight;
1030           break;
1031         case NU_NO_VARS:
1032           iter->int1++;
1033           break;
1034         default:
1035           assert (0);
1036         }
1037     }
1038 }
1039
1040 /* We've come to a record that differs from the previous in one or
1041    more of the break variables.  Make an output record from the
1042    accumulated statistics in the OUTPUT case. */
1043 static void 
1044 dump_aggregate_info (struct ccase *output)
1045 {
1046   debug_printf (("(dumping "));
1047   
1048   {
1049     int n_elem = 0;
1050     
1051     {
1052       int i;
1053
1054       for (i = 0; i < nv_sort; i++)
1055         n_elem += v_sort[i]->nv;
1056     }
1057     debug_printf (("n_elem=%d:", n_elem));
1058     memcpy (output->data, prev_case, sizeof (union value) * n_elem);
1059   }
1060   
1061   {
1062     struct agr_var *i;
1063   
1064     for (i = agr_first; i; i = i->next)
1065       {
1066         union value *v = &output->data[i->dest->fv];
1067
1068         debug_printf ((" %d,%d", i->dest->fv, i->dest->nv));
1069
1070         if (missing == COLUMNWISE && i->missing != 0
1071             && (i->function & FUNC) != N && (i->function & FUNC) != NU
1072             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
1073           {
1074             if (i->function & FSTRING)
1075               memset (v->s, ' ', i->dest->width);
1076             else
1077               v->f = SYSMIS;
1078             continue;
1079           }
1080         
1081         switch (i->function)
1082           {
1083           case SUM:
1084             v->f = i->dbl[0];
1085             break;
1086           case MEAN:
1087             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1088             break;
1089           case SD:
1090             v->f = ((i->dbl[2] > 1.0)
1091                     ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
1092                     : SYSMIS);
1093             break;
1094           case MAX:
1095           case MIN:
1096             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1097             break;
1098           case MAX | FSTRING:
1099           case MIN | FSTRING:
1100             if (i->int1)
1101               memcpy (v->s, i->string, i->dest->width);
1102             else
1103               memset (v->s, ' ', i->dest->width);
1104             break;
1105           case FGT | FSTRING:
1106           case FLT | FSTRING:
1107           case FIN | FSTRING:
1108           case FOUT | FSTRING:
1109             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1110             break;
1111           case FGT:
1112           case FLT:
1113           case FIN:
1114           case FOUT:
1115             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1116             break;
1117           case PGT:
1118           case PGT | FSTRING:
1119           case PLT:
1120           case PLT | FSTRING:
1121           case PIN:
1122           case PIN | FSTRING:
1123           case POUT:
1124           case POUT | FSTRING:
1125             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1126             break;
1127           case N:
1128             v->f = i->dbl[0];
1129             break;
1130           case NU:
1131             v->f = i->int1;
1132             break;
1133           case FIRST:
1134           case LAST:
1135             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1136             break;
1137           case FIRST | FSTRING:
1138           case LAST | FSTRING:
1139             if (i->int1)
1140               memcpy (v->s, i->string, i->dest->width);
1141             else
1142               memset (v->s, ' ', i->dest->width);
1143             break;
1144           case N_NO_VARS:
1145             v->f = i->dbl[0];
1146             break;
1147           case NU_NO_VARS:
1148             v->f = i->int1;
1149             break;
1150           case NMISS:
1151             v->f = i->dbl[0];
1152             break;
1153           case NUMISS:
1154             v->f = i->int1;
1155             break;
1156           default:
1157             assert (0);
1158           }
1159       }
1160   }
1161   debug_printf ((") "));
1162 }
1163
1164 /* Resets the state for all the aggregate functions. */
1165 static void
1166 initialize_aggregate_info (void)
1167 {
1168   struct agr_var *iter;
1169
1170   for (iter = agr_first; iter; iter = iter->next)
1171     {
1172       iter->missing = 0;
1173       switch (iter->function)
1174         {
1175         case MIN:
1176           iter->dbl[0] = DBL_MAX;
1177           break;
1178         case MIN | FSTRING:
1179           memset (iter->string, 255, iter->src->width);
1180           break;
1181         case MAX:
1182           iter->dbl[0] = -DBL_MAX;
1183           break;
1184         case MAX | FSTRING:
1185           memset (iter->string, 0, iter->src->width);
1186           break;
1187         default:
1188           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1189           iter->int1 = iter->int2 = 0;
1190           break;
1191         }
1192     }
1193 }
1194 \f
1195 /* Aggregate each case as it comes through.  Cases which aren't needed
1196    are dropped. */
1197 static int
1198 agr_00x_trns_proc (struct trns_header *h unused, struct ccase *c)
1199 {
1200   int code = aggregate_single_case (c, compaction_case);
1201   debug_printf (("%d ", code));
1202   return code;
1203 }
1204
1205 /* Output the last aggregate case.  It's okay to call the vfm_sink's
1206    write() method here because end_func is called so soon after all
1207    the cases have been output; very little has been cleaned up at this
1208    point. */
1209 static void
1210 agr_00x_end_func (void)
1211 {
1212   /* Ensure that info for the last break group gets written to the
1213      active file. */
1214   dump_aggregate_info (compaction_case);
1215   vfm_sink_info.ncases++;
1216   vfm_sink->write ();
1217 }
1218
1219 /* Transform the aggregate case buf_1xx, in internal format, to system
1220    file format, in buf64_1xx, and write the resultant case to the
1221    system file. */
1222 static void
1223 write_case_to_sfm (void)
1224 {
1225   flt64 *p = buf64_1xx;
1226   int i;
1227
1228   for (i = 0; i < dict_get_var_cnt (agr_dict); i++)
1229     {
1230       struct variable *v = dict_get_var (agr_dict, i);
1231       
1232       if (v->type == NUMERIC)
1233         {
1234           double src = buf_1xx->data[v->fv].f;
1235           if (src == SYSMIS)
1236             *p++ = -FLT64_MAX;
1237           else
1238             *p++ = src;
1239         }
1240       else
1241         {
1242           memcpy (p, buf_1xx->data[v->fv].s, v->width);
1243           memset (&((char *) p)[v->width], ' ',
1244                   REM_RND_UP (v->width, sizeof (flt64)));
1245           p += DIV_RND_UP (v->width, sizeof (flt64));
1246         }
1247     }
1248
1249   sfm_write_case (outfile, buf64_1xx, p - buf64_1xx);
1250 }
1251
1252 /* Aggregate the current case and output it if we passed a
1253    breakpoint. */
1254 static int
1255 agr_10x_trns_proc (struct trns_header *h unused, struct ccase *c)
1256 {
1257   int code = aggregate_single_case (c, buf_1xx);
1258
1259   assert (code == -2 || code == -1);
1260   if (code == -1)
1261     write_case_to_sfm ();
1262   return -1;
1263 }
1264
1265 /* Close the system file now that we're done with it.  */
1266 static void
1267 agr_10x_trns_free (struct trns_header *h unused)
1268 {
1269   fh_close_handle (outfile);
1270 }
1271
1272 /* Ensure that info for the last break group gets written to the
1273    system file. */
1274 static void
1275 agr_10x_end_func (void)
1276 {
1277   dump_aggregate_info (buf_1xx);
1278   write_case_to_sfm ();
1279 }
1280
1281 /* When called with temp_case non-NULL (the normal case), runs the
1282    case through the aggregater and outputs it to the system file if
1283    appropriate.  If temp_case is NULL, finishes up writing the last
1284    case if necessary. */
1285 static int
1286 agr_11x_func (void)
1287 {
1288   if (temp_case != NULL)
1289     {
1290       int code = aggregate_single_case (temp_case, buf_1xx);
1291       
1292       assert (code == -2 || code == -1);
1293       if (code == -1)
1294         write_case_to_sfm ();
1295     }
1296   else
1297     {
1298       if (case_count)
1299         {
1300           dump_aggregate_info (buf_1xx);
1301           write_case_to_sfm ();
1302         }
1303       fh_close_handle (outfile);
1304     }
1305   return 1;
1306 }
1307 \f
1308 /* Debugging. */
1309 #if DEBUGGING
1310 /* Print out useful debugging information. */
1311 static void
1312 debug_print (int flags)
1313 {
1314   printf ("AGGREGATE\n /OUTFILE=%s\n",
1315           outfile ? fh_handle_filename (outfile) : "*");
1316
1317   if (missing == COLUMNWISE)
1318     puts (" /MISSING=COLUMNWISE");
1319
1320   if (flags & 2)
1321     puts (" /DOCUMENT");
1322   if (flags & 4)
1323     puts (" /PRESORTED");
1324   
1325   {
1326     int i;
1327
1328     printf (" /BREAK=");
1329     for (i = 0; i < nv_sort; i++)
1330       printf ("%s(%c) ", v_sort[i]->name,
1331               v_sort[i]->p.srt.order == SRT_ASCEND ? 'A' : 'D');
1332     putc ('\n', stdout);
1333   }
1334   
1335   {
1336     struct agr_var *iter;
1337     
1338     for (iter = agr_first; iter; iter = iter->next)
1339       {
1340         struct agr_func *f = &agr_func_tab[iter->function & FUNC];
1341         
1342         printf (" /%s", iter->dest->name);
1343         if (iter->dest->label)
1344           printf ("'%s'", iter->dest->label);
1345         printf ("=%s(%s", f->name, iter->src->name);
1346         if (f->n_args)
1347           {
1348             int i;
1349             
1350             for (i = 0; i < f->n_args; i++)
1351               {
1352                 putc (',', stdout);
1353                 if (iter->src->type == NUMERIC)
1354                   printf ("%g", iter->arg[i].f);
1355                 else
1356                   printf ("%.*s", iter->src->width, iter->arg[i].c);
1357               }
1358           }
1359         printf (")\n");
1360       }
1361   }
1362 }
1363
1364 #endif /* DEBUGGING */