880a713a3e6481e18c86a9298578b7816647fa84
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "command.h"
25 #include "error.h"
26 #include "file-handle.h"
27 #include "lexer.h"
28 #include "misc.h"
29 #include "pool.h"
30 #include "settings.h"
31 #include "sfm.h"
32 #include "sort.h"
33 #include "stats.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #include "debug-print.h"
40
41 /* Specifies how to make an aggregate variable. */
42 struct agr_var
43   {
44     struct agr_var *next;               /* Next in list. */
45
46     /* Collected during parsing. */
47     struct variable *src;       /* Source variable. */
48     struct variable *dest;      /* Target variable. */
49     int function;               /* Function. */
50     int include_missing;        /* 1=Include user-missing values. */
51     union value arg[2];         /* Arguments. */
52
53     /* Accumulated during AGGREGATE execution. */
54     double dbl[3];
55     int int1, int2;
56     char *string;
57     int missing;
58   };
59
60 /* Aggregation functions. */
61 enum
62   {
63     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
64     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
65     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
66     FUNC = 0x1f, /* Function mask. */
67     FSTRING = 1<<5, /* String function bit. */
68   };
69
70 /* Attributes of an aggregation function. */
71 struct agr_func
72   {
73     const char *name;           /* Aggregation function name. */
74     int n_args;                 /* Number of arguments. */
75     int alpha_type;             /* When given ALPHA arguments, output type. */
76     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
77   };
78
79 /* Attributes of aggregation functions. */
80 static const struct agr_func agr_func_tab[] = 
81   {
82     {"<NONE>",  0, -1,      {0, 0, 0}},
83     {"SUM",     0, -1,      {FMT_F, 8, 2}},
84     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
85     {"SD",      0, -1,      {FMT_F, 8, 2}},
86     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
87     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
88     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
89     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
90     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
91     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
92     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
93     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
94     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
95     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
96     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
97     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
98     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
100     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
101     {"LAST",    0, ALPHA,   {-1, -1, -1}},
102     {NULL,      0, -1,      {-1, -1, -1}},
103     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
104     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
105   };
106
107 /* Output file, or NULL for the active file. */
108 static struct file_handle *outfile;
109
110 /* Missing value types. */
111 enum
112   {
113     ITEMWISE,           /* Missing values item by item. */
114     COLUMNWISE          /* Missing values column by column. */
115   };
116
117 /* ITEMWISE or COLUMNWISE. */
118 static int missing;
119
120 /* Sort program. */
121 static struct sort_cases_pgm *sort;
122
123 /* Aggregate variables. */
124 static struct agr_var *agr_first, *agr_next;
125
126 /* Aggregate dictionary. */
127 static struct dictionary *agr_dict;
128
129 /* Number of cases passed through aggregation. */
130 static int case_count;
131
132 /* Last values of the break variables. */
133 static union value *prev_case;
134
135 /* Buffers for use by the 10x transformation. */
136 static flt64 *buf64_1xx;
137 static struct ccase *buf_1xx;
138
139 static void initialize_aggregate_info (void);
140
141 /* Prototypes. */
142 static int parse_aggregate_functions (void);
143 static void free_aggregate_functions (void);
144 static int aggregate_single_case (struct ccase *input, struct ccase *output);
145 static int create_sysfile (void);
146
147 static trns_proc_func agr_00x_trns_proc, agr_10x_trns_proc;
148 static trns_free_func agr_10x_trns_free;
149 static void agr_00x_end_func (void *aux);
150 static void agr_10x_end_func (void *);
151 static int agr_11x_func (write_case_data);
152
153 #if DEBUGGING
154 static void debug_print (int flags);
155 #endif
156 \f
157 /* Parsing. */
158
159 /* Parses and executes the AGGREGATE procedure. */
160 int
161 cmd_aggregate (void)
162 {
163   /* Have we seen these subcommands? */
164   unsigned seen = 0;
165
166   outfile = NULL;
167   missing = ITEMWISE;
168   sort = NULL;
169   prev_case = NULL;
170   
171   agr_dict = dict_create ();
172   dict_set_label (agr_dict, dict_get_label (default_dict));
173   dict_set_documents (agr_dict, dict_get_documents (default_dict));
174   
175   lex_match_id ("AGGREGATE");
176
177   /* Read most of the subcommands. */
178   for (;;)
179     {
180       lex_match('/');
181       
182       if (lex_match_id ("OUTFILE"))
183         {
184           if (seen & 1)
185             {
186               destroy_sort_cases_pgm (sort);
187               dict_destroy (agr_dict);
188               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
189               return CMD_FAILURE;
190             }
191           seen |= 1;
192               
193           lex_match ('=');
194           if (lex_match ('*'))
195             outfile = NULL;
196           else 
197             {
198               outfile = fh_parse_file_handle ();
199               if (outfile == NULL)
200                 {
201                   destroy_sort_cases_pgm (sort);
202                   dict_destroy (agr_dict);
203                   return CMD_FAILURE;
204                 }
205             }
206         }
207       else if (lex_match_id ("MISSING"))
208         {
209           lex_match ('=');
210           if (!lex_match_id ("COLUMNWISE"))
211             {
212               destroy_sort_cases_pgm (sort);
213               dict_destroy (agr_dict);
214               lex_error (_("while expecting COLUMNWISE"));
215               return CMD_FAILURE;
216             }
217           missing = COLUMNWISE;
218         }
219       else if (lex_match_id ("DOCUMENT"))
220         seen |= 2;
221       else if (lex_match_id ("PRESORTED"))
222         seen |= 4;
223       else if (lex_match_id ("BREAK"))
224         {
225           if (seen & 8)
226             {
227               destroy_sort_cases_pgm (sort);
228               dict_destroy (agr_dict);
229               msg (SE, _("%s subcommand given multiple times."),"BREAK");
230               return CMD_FAILURE;
231             }
232           seen |= 8;
233
234           lex_match ('=');
235           sort = parse_sort ();
236           if (sort == NULL)
237             {
238               dict_destroy (agr_dict);
239               return CMD_FAILURE;
240             }
241           
242           {
243             int i;
244             
245             for (i = 0; i < sort->var_cnt; i++)
246               {
247                 struct variable *v;
248               
249                 v = dict_clone_var (agr_dict, sort->vars[i], sort->vars[i]->name);
250                 assert (v != NULL);
251               }
252           }
253         }
254       else break;
255     }
256
257   /* Check for proper syntax. */
258   if (!(seen & 8))
259     msg (SW, _("BREAK subcommand not specified."));
260       
261   /* Read in the aggregate functions. */
262   if (!parse_aggregate_functions ())
263     {
264       free_aggregate_functions ();
265       destroy_sort_cases_pgm (sort);
266       return CMD_FAILURE;
267     }
268
269   /* Delete documents. */
270   if (!(seen & 2))
271     dict_set_documents (agr_dict, NULL);
272
273   /* Cancel SPLIT FILE. */
274   dict_set_split_vars (agr_dict, NULL, 0);
275   
276 #if DEBUGGING
277   debug_print (seen);
278 #endif
279
280   /* Initialize. */
281   case_count = 0;
282   initialize_aggregate_info ();
283
284   /* How to implement all this... There are three important variables:
285      whether output is going to the active file (0) or a separate file
286      (1); whether the input data is presorted (0) or needs sorting
287      (1); whether there is a temporary transformation (1) or not (0).
288      The eight cases are as follows:
289
290      000 (0): Pass it through an aggregate transformation that
291      modifies the data.
292
293      001 (1): Cancel the temporary transformation and handle as 000.
294
295      010 (2): Set up a SORT CASES and aggregate the output, writing
296      the results to the active file.
297      
298      011 (3): Cancel the temporary transformation and handle as 010.
299
300      100 (4): Pass it through an aggregate transformation that doesn't
301      modify the data but merely writes it to the output file.
302
303      101 (5): Handled as 100.
304
305      110 (6): Set up a SORT CASES and capture the output, aggregate
306      it, write it to the output file without modifying the active
307      file.
308
309      111 (7): Handled as 110. */
310   
311   {
312     unsigned type = 0;
313
314     if (outfile != NULL)
315       type |= 4;
316     if (sort != NULL && (seen & 4) == 0)
317       type |= 2;
318     if (temporary)
319       type |= 1;
320
321     switch (type)
322       {
323       case 3:
324         cancel_temporary ();
325         /* fall through */
326       case 2:
327         sort_cases (sort, 0);
328         goto case0;
329           
330       case 1:
331         cancel_temporary ();
332         /* fall through */
333       case 0:
334       case0:
335         {
336           struct trns_header *t = xmalloc (sizeof *t);
337           t->proc = agr_00x_trns_proc;
338           t->free = NULL;
339           add_transformation (t);
340           
341           temporary = 2;
342           temp_dict = agr_dict;
343           temp_trns = n_trns;
344           
345           agr_dict = NULL;
346
347           procedure (NULL, NULL, agr_00x_end_func, NULL);
348           break;
349         }
350
351       case 4:
352       case 5:
353         {
354           if (!create_sysfile ())
355             goto lossage;
356           
357           {
358             struct trns_header *t = xmalloc (sizeof *t);
359             t->proc = agr_10x_trns_proc;
360             t->free = agr_10x_trns_free;
361             add_transformation (t);
362
363             procedure (NULL, NULL, agr_10x_end_func, NULL);
364           }
365           
366           break;
367         }
368           
369       case 6:
370       case 7:
371         sort_cases (sort, 1);
372         
373         if (!create_sysfile ())
374           goto lossage;
375         read_sort_output (sort, agr_11x_func, NULL);
376         
377         {
378           struct ccase *save_temp_case = temp_case;
379           temp_case = NULL;
380           agr_11x_func (NULL);
381           temp_case = save_temp_case;
382         }
383         
384         break;
385
386       default:
387         assert (0);
388       }
389   }
390   
391   free (buf64_1xx);
392   free (buf_1xx);
393   
394   /* Clean up. */
395   destroy_sort_cases_pgm (sort);
396   free_aggregate_functions ();
397   free (prev_case);
398   
399   return CMD_SUCCESS;
400
401 lossage:
402   /* Clean up. */
403   destroy_sort_cases_pgm (sort);
404   free_aggregate_functions ();
405   free (prev_case);
406
407   return CMD_FAILURE;
408 }
409
410 /* Create a system file for use in aggregation to an external file,
411    and allocate temporary buffers for writing out cases. */
412 static int
413 create_sysfile (void)
414 {
415   struct sfm_write_info w;
416   w.h = outfile;
417   w.dict = agr_dict;
418   w.compress = set_scompression;
419   if (!sfm_write_dictionary (&w))
420     {
421       free_aggregate_functions ();
422       destroy_sort_cases_pgm (sort);
423       dict_destroy (agr_dict);
424       return 0;
425     }
426     
427   buf64_1xx = xmalloc (sizeof *buf64_1xx * w.case_size);
428   buf_1xx = xmalloc (dict_get_case_size (agr_dict));
429
430   return 1;
431 }
432
433 /* Parse all the aggregate functions. */
434 static int
435 parse_aggregate_functions (void)
436 {
437   agr_first = agr_next = NULL;
438
439   /* Parse everything. */
440   for (;;)
441     {
442       char **dest;
443       char **dest_label;
444       int n_dest;
445
446       int include_missing;
447       const struct agr_func *function;
448       int func_index;
449
450       union value arg[2];
451
452       struct variable **src;
453       int n_src;
454
455       int i;
456
457       dest = NULL;
458       dest_label = NULL;
459       n_dest = 0;
460       src = NULL;
461       function = NULL;
462       n_src = 0;
463       arg[0].c = NULL;
464       arg[1].c = NULL;
465
466       /* Parse the list of target variables. */
467       while (!lex_match ('='))
468         {
469           int n_dest_prev = n_dest;
470           
471           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
472             goto lossage;
473
474           /* Assign empty labels. */
475           {
476             int j;
477
478             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
479             for (j = n_dest_prev; j < n_dest; j++)
480               dest_label[j] = NULL;
481           }
482           
483           if (token == T_STRING)
484             {
485               ds_truncate (&tokstr, 120);
486               dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
487               lex_get ();
488             }
489         }
490
491       /* Get the name of the aggregation function. */
492       if (token != T_ID)
493         {
494           lex_error (_("expecting aggregation function"));
495           goto lossage;
496         }
497
498       include_missing = 0;
499       if (tokid[strlen (tokid) - 1] == '.')
500         {
501           include_missing = 1;
502           tokid[strlen (tokid) - 1] = 0;
503         }
504       
505       for (function = agr_func_tab; function->name; function++)
506         if (!strcmp (function->name, tokid))
507           break;
508       if (NULL == function->name)
509         {
510           msg (SE, _("Unknown aggregation function %s."), tokid);
511           goto lossage;
512         }
513       func_index = function - agr_func_tab;
514       lex_get ();
515
516       /* Check for leading lparen. */
517       if (!lex_match ('('))
518         {
519           if (func_index == N)
520             func_index = N_NO_VARS;
521           else if (func_index == NU)
522             func_index = NU_NO_VARS;
523           else
524             {
525               lex_error (_("expecting `('"));
526               goto lossage;
527             }
528         } else {
529           /* Parse list of source variables. */
530           {
531             int pv_opts = PV_NO_SCRATCH;
532
533             if (func_index == SUM || func_index == MEAN || func_index == SD)
534               pv_opts |= PV_NUMERIC;
535             else if (function->n_args)
536               pv_opts |= PV_SAME_TYPE;
537
538             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
539               goto lossage;
540           }
541
542           /* Parse function arguments, for those functions that
543              require arguments. */
544           if (function->n_args != 0)
545             for (i = 0; i < function->n_args; i++)
546               {
547                 int type;
548             
549                 lex_match (',');
550                 if (token == T_STRING)
551                   {
552                     arg[i].c = xstrdup (ds_value (&tokstr));
553                     type = ALPHA;
554                   }
555                 else if (token == T_NUM)
556                   {
557                     arg[i].f = tokval;
558                     type = NUMERIC;
559                   } else {
560                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
561                     goto lossage;
562                   }
563             
564                 lex_get ();
565
566                 if (type != src[0]->type)
567                   {
568                     msg (SE, _("Arguments to %s must be of same type as "
569                                "source variables."),
570                          function->name);
571                     goto lossage;
572                   }
573               }
574
575           /* Trailing rparen. */
576           if (!lex_match(')'))
577             {
578               lex_error (_("expecting `)'"));
579               goto lossage;
580             }
581           
582           /* Now check that the number of source variables match the
583              number of target variables.  Do this here because if we
584              do it earlier then the user can get very misleading error
585              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
586              error message when a proper message would be more like
587              `unknown variable t'. */
588           if (n_src != n_dest)
589             {
590               msg (SE, _("Number of source variables (%d) does not match "
591                          "number of target variables (%d)."),
592                    n_src, n_dest);
593               goto lossage;
594             }
595         }
596         
597       /* Finally add these to the linked list of aggregation
598          variables. */
599       for (i = 0; i < n_dest; i++)
600         {
601           struct agr_var *v = xmalloc (sizeof *v);
602
603           /* Add variable to chain. */
604           if (agr_first)
605             agr_next = agr_next->next = v;
606           else
607             agr_first = agr_next = v;
608           agr_next->next = NULL;
609           
610           /* Create the target variable in the aggregate
611              dictionary. */
612           {
613             struct variable *destvar;
614             
615             agr_next->function = func_index;
616
617             if (src)
618               {
619                 int output_width;
620
621                 agr_next->src = src[i];
622                 
623                 if (src[i]->type == ALPHA)
624                   {
625                     agr_next->function |= FSTRING;
626                     agr_next->string = xmalloc (src[i]->width);
627                   }
628                 
629                 if (agr_next->src->type == NUMERIC || function->alpha_type == NUMERIC)
630                   output_width = 0;
631                 else
632                   output_width = agr_next->src->width;
633
634                 if (function->alpha_type == ALPHA)
635                   destvar = dict_clone_var (agr_dict, agr_next->src, dest[i]);
636                 else
637                   {
638                     destvar = dict_create_var (agr_dict, dest[i], output_width);
639                     if (output_width == 0)
640                       destvar->print = destvar->write = function->format;
641                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
642                         && (func_index == N || func_index == N_NO_VARS
643                             || func_index == NU || func_index == NU_NO_VARS))
644                       {
645                         struct fmt_spec f = {FMT_F, 8, 2};
646                       
647                         destvar->print = destvar->write = f;
648                       }
649                   }
650               } else {
651                 agr_next->src = NULL;
652                 destvar = dict_create_var (agr_dict, dest[i], 0);
653               }
654           
655             if (!destvar)
656               {
657                 msg (SE, _("Variable name %s is not unique within the "
658                            "aggregate file dictionary, which contains "
659                            "the aggregate variables and the break "
660                            "variables."),
661                      dest[i]);
662                 free (dest[i]);
663                 goto lossage;
664               }
665
666             free (dest[i]);
667             destvar->init = 0;
668             if (dest_label[i])
669               {
670                 destvar->label = dest_label[i];
671                 dest_label[i] = NULL;
672               }
673             else if (function->alpha_type == ALPHA)
674               destvar->print = destvar->write = function->format;
675
676             agr_next->dest = destvar;
677           }
678           
679           agr_next->include_missing = include_missing;
680
681           if (agr_next->src != NULL)
682             {
683               int j;
684
685               if (agr_next->src->type == NUMERIC)
686                 for (j = 0; j < function->n_args; j++)
687                   agr_next->arg[j].f = arg[j].f;
688               else
689                 for (j = 0; j < function->n_args; j++)
690                   agr_next->arg[j].c = xstrdup (arg[j].c);
691             }
692         }
693       
694       if (src != NULL && src[0]->type == ALPHA)
695         for (i = 0; i < function->n_args; i++)
696           {
697             free (arg[i].c);
698             arg[i].c = NULL;
699           }
700
701       free (src);
702       free (dest);
703       free (dest_label);
704
705       if (!lex_match ('/'))
706         {
707           if (token == '.')
708             return 1;
709
710           lex_error ("expecting end of command");
711           return 0;
712         }
713       continue;
714       
715     lossage:
716       for (i = 0; i < n_dest; i++)
717         {
718           free (dest[i]);
719           free (dest_label[i]);
720         }
721       free (dest);
722       free (dest_label);
723       free (arg[0].c);
724       free (arg[1].c);
725       if (src && n_src && src[0]->type == ALPHA)
726         for (i = 0; i < function->n_args; i++)
727           {
728             free(arg[i].c);
729             arg[i].c = NULL;
730           }
731       free (src);
732         
733       return 0;
734     }
735 }
736
737 /* Frees all the state for the AGGREGATE procedure. */
738 static void
739 free_aggregate_functions (void)
740 {
741   struct agr_var *iter, *next;
742
743   if (agr_dict)
744     dict_destroy (agr_dict);
745   for (iter = agr_first; iter; iter = next)
746     {
747       next = iter->next;
748
749       if (iter->function & FSTRING)
750         {
751           int n_args;
752           int i;
753
754           n_args = agr_func_tab[iter->function & FUNC].n_args;
755           for (i = 0; i < n_args; i++)
756             free (iter->arg[i].c);
757           free (iter->string);
758         }
759       free (iter);
760     }
761 }
762 \f
763 /* Execution. */
764
765 static void accumulate_aggregate_info (struct ccase *input);
766 static void dump_aggregate_info (struct ccase *output);
767
768 /* Processes a single case INPUT for aggregation.  If output is
769    warranted, it is written to case OUTPUT, which may be (but need not
770    be) an alias to INPUT.  Returns -1 when output is performed, -2
771    otherwise. */
772 /* The code in this function has an eerie similarity to
773    vfm.c:SPLIT_FILE_procfunc()... */
774 static int
775 aggregate_single_case (struct ccase *input, struct ccase *output)
776 {
777   /* The first case always begins a new break group.  We also need to
778      preserve the values of the case for later comparison. */
779   if (case_count++ == 0)
780     {
781       int n_elem = 0;
782       
783       {
784         int i;
785
786         for (i = 0; i < sort->var_cnt; i++)
787           n_elem += sort->vars[i]->nv;
788       }
789       
790       prev_case = xmalloc (sizeof *prev_case * n_elem);
791
792       /* Copy INPUT into prev_case. */
793       {
794         union value *iter = prev_case;
795         int i;
796
797         for (i = 0; i < sort->var_cnt; i++)
798           {
799             struct variable *v = sort->vars[i];
800             
801             if (v->type == NUMERIC)
802               (iter++)->f = input->data[v->fv].f;
803             else
804               {
805                 memcpy (iter->s, input->data[v->fv].s, v->width);
806                 iter += v->nv;
807               }
808           }
809       }
810             
811       accumulate_aggregate_info (input);
812         
813       return -2;
814     }
815       
816   /* Compare the value of each break variable to the values on the
817      previous case. */
818   {
819     union value *iter = prev_case;
820     int i;
821     
822     for (i = 0; i < sort->var_cnt; i++)
823       {
824         struct variable *v = sort->vars[i];
825       
826         switch (v->type)
827           {
828           case NUMERIC:
829             if (input->data[v->fv].f != iter->f)
830               goto not_equal;
831             iter++;
832             break;
833           case ALPHA:
834             if (memcmp (input->data[v->fv].s, iter->s, v->width))
835               goto not_equal;
836             iter += v->nv;
837             break;
838           default:
839             assert (0);
840           }
841       }
842   }
843
844   accumulate_aggregate_info (input);
845
846   return -2;
847   
848 not_equal:
849   /* The values of the break variable are different from the values on
850      the previous case.  That means that it's time to dump aggregate
851      info. */
852   dump_aggregate_info (output);
853   initialize_aggregate_info ();
854   accumulate_aggregate_info (input);
855
856   /* Copy INPUT into prev_case. */
857   {
858     union value *iter = prev_case;
859     int i;
860
861     for (i = 0; i < sort->var_cnt; i++)
862       {
863         struct variable *v = sort->vars[i];
864             
865         if (v->type == NUMERIC)
866           (iter++)->f = input->data[v->fv].f;
867         else
868           {
869             memcpy (iter->s, input->data[v->fv].s, v->width);
870             iter += v->nv;
871           }
872       }
873   }
874   
875   return -1;
876 }
877
878 /* Accumulates aggregation data from the case INPUT. */
879 static void 
880 accumulate_aggregate_info (struct ccase *input)
881 {
882   struct agr_var *iter;
883   double weight;
884
885   weight = dict_get_case_weight (default_dict, input);
886
887   for (iter = agr_first; iter; iter = iter->next)
888     if (iter->src)
889       {
890         union value *v = &input->data[iter->src->fv];
891
892         if ((!iter->include_missing && is_missing (v, iter->src))
893             || (iter->include_missing && iter->src->type == NUMERIC
894                 && v->f == SYSMIS))
895           {
896             switch (iter->function)
897               {
898               case NMISS:
899                 iter->dbl[0] += weight;
900                 break;
901               case NUMISS:
902                 iter->int1++;
903                 break;
904               }
905             iter->missing = 1;
906             continue;
907           }
908         
909         /* This is horrible.  There are too many possibilities. */
910         switch (iter->function)
911           {
912           case SUM:
913             iter->dbl[0] += v->f;
914             break;
915           case MEAN:
916             iter->dbl[0] += v->f * weight;
917             iter->dbl[1] += weight;
918             break;
919           case SD: 
920             {
921               double product = v->f * weight;
922               iter->dbl[0] += product;
923               iter->dbl[1] += product * v->f;
924               iter->dbl[2] += weight;
925               break; 
926             }
927           case MAX:
928             iter->dbl[0] = max (iter->dbl[0], v->f);
929             iter->int1 = 1;
930             break;
931           case MAX | FSTRING:
932             if (memcmp (iter->string, v->s, iter->src->width) < 0)
933               memcpy (iter->string, v->s, iter->src->width);
934             iter->int1 = 1;
935             break;
936           case MIN:
937             iter->dbl[0] = min (iter->dbl[0], v->f);
938             iter->int1 = 1;
939             break;
940           case MIN | FSTRING:
941             if (memcmp (iter->string, v->s, iter->src->width) > 0)
942               memcpy (iter->string, v->s, iter->src->width);
943             iter->int1 = 1;
944             break;
945           case FGT:
946           case PGT:
947             if (v->f > iter->arg[0].f)
948               iter->dbl[0] += weight;
949             iter->dbl[1] += weight;
950             break;
951           case FGT | FSTRING:
952           case PGT | FSTRING:
953             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
954               iter->dbl[0] += weight;
955             iter->dbl[1] += weight;
956             break;
957           case FLT:
958           case PLT:
959             if (v->f < iter->arg[0].f)
960               iter->dbl[0] += weight;
961             iter->dbl[1] += weight;
962             break;
963           case FLT | FSTRING:
964           case PLT | FSTRING:
965             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
966               iter->dbl[0] += weight;
967             iter->dbl[1] += weight;
968             break;
969           case FIN:
970           case PIN:
971             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
972               iter->dbl[0] += weight;
973             iter->dbl[1] += weight;
974             break;
975           case FIN | FSTRING:
976           case PIN | FSTRING:
977             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
978                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
979               iter->dbl[0] += weight;
980             iter->dbl[1] += weight;
981             break;
982           case FOUT:
983           case POUT:
984             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
985               iter->dbl[0] += weight;
986             iter->dbl[1] += weight;
987             break;
988           case FOUT | FSTRING:
989           case POUT | FSTRING:
990             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
991                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
992               iter->dbl[0] += weight;
993             iter->dbl[1] += weight;
994             break;
995           case N:
996             iter->dbl[0] += weight;
997             break;
998           case NU:
999             iter->int1++;
1000             break;
1001           case FIRST:
1002             if (iter->int1 == 0)
1003               {
1004                 iter->dbl[0] = v->f;
1005                 iter->int1 = 1;
1006               }
1007             break;
1008           case FIRST | FSTRING:
1009             if (iter->int1 == 0)
1010               {
1011                 memcpy (iter->string, v->s, iter->src->width);
1012                 iter->int1 = 1;
1013               }
1014             break;
1015           case LAST:
1016             iter->dbl[0] = v->f;
1017             iter->int1 = 1;
1018             break;
1019           case LAST | FSTRING:
1020             memcpy (iter->string, v->s, iter->src->width);
1021             iter->int1 = 1;
1022             break;
1023           default:
1024             assert (0);
1025           }
1026     } else {
1027       switch (iter->function)
1028         {
1029         case N_NO_VARS:
1030           iter->dbl[0] += weight;
1031           break;
1032         case NU_NO_VARS:
1033           iter->int1++;
1034           break;
1035         default:
1036           assert (0);
1037         }
1038     }
1039 }
1040
1041 /* We've come to a record that differs from the previous in one or
1042    more of the break variables.  Make an output record from the
1043    accumulated statistics in the OUTPUT case. */
1044 static void 
1045 dump_aggregate_info (struct ccase *output)
1046 {
1047   debug_printf (("(dumping "));
1048   
1049   {
1050     int n_elem = 0;
1051     
1052     {
1053       int i;
1054
1055       for (i = 0; i < sort->var_cnt; i++)
1056         n_elem += sort->vars[i]->nv;
1057     }
1058     debug_printf (("n_elem=%d:", n_elem));
1059     memcpy (output->data, prev_case, sizeof (union value) * n_elem);
1060   }
1061   
1062   {
1063     struct agr_var *i;
1064   
1065     for (i = agr_first; i; i = i->next)
1066       {
1067         union value *v = &output->data[i->dest->fv];
1068
1069         debug_printf ((" %d,%d", i->dest->fv, i->dest->nv));
1070
1071         if (missing == COLUMNWISE && i->missing != 0
1072             && (i->function & FUNC) != N && (i->function & FUNC) != NU
1073             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
1074           {
1075             if (i->function & FSTRING)
1076               memset (v->s, ' ', i->dest->width);
1077             else
1078               v->f = SYSMIS;
1079             continue;
1080           }
1081         
1082         switch (i->function)
1083           {
1084           case SUM:
1085             v->f = i->dbl[0];
1086             break;
1087           case MEAN:
1088             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1089             break;
1090           case SD:
1091             v->f = ((i->dbl[2] > 1.0)
1092                     ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
1093                     : SYSMIS);
1094             break;
1095           case MAX:
1096           case MIN:
1097             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1098             break;
1099           case MAX | FSTRING:
1100           case MIN | FSTRING:
1101             if (i->int1)
1102               memcpy (v->s, i->string, i->dest->width);
1103             else
1104               memset (v->s, ' ', i->dest->width);
1105             break;
1106           case FGT | FSTRING:
1107           case FLT | FSTRING:
1108           case FIN | FSTRING:
1109           case FOUT | FSTRING:
1110             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1111             break;
1112           case FGT:
1113           case FLT:
1114           case FIN:
1115           case FOUT:
1116             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1117             break;
1118           case PGT:
1119           case PGT | FSTRING:
1120           case PLT:
1121           case PLT | FSTRING:
1122           case PIN:
1123           case PIN | FSTRING:
1124           case POUT:
1125           case POUT | FSTRING:
1126             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1127             break;
1128           case N:
1129             v->f = i->dbl[0];
1130             break;
1131           case NU:
1132             v->f = i->int1;
1133             break;
1134           case FIRST:
1135           case LAST:
1136             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1137             break;
1138           case FIRST | FSTRING:
1139           case LAST | FSTRING:
1140             if (i->int1)
1141               memcpy (v->s, i->string, i->dest->width);
1142             else
1143               memset (v->s, ' ', i->dest->width);
1144             break;
1145           case N_NO_VARS:
1146             v->f = i->dbl[0];
1147             break;
1148           case NU_NO_VARS:
1149             v->f = i->int1;
1150             break;
1151           case NMISS:
1152             v->f = i->dbl[0];
1153             break;
1154           case NUMISS:
1155             v->f = i->int1;
1156             break;
1157           default:
1158             assert (0);
1159           }
1160       }
1161   }
1162   debug_printf ((") "));
1163 }
1164
1165 /* Resets the state for all the aggregate functions. */
1166 static void
1167 initialize_aggregate_info (void)
1168 {
1169   struct agr_var *iter;
1170
1171   for (iter = agr_first; iter; iter = iter->next)
1172     {
1173       iter->missing = 0;
1174       switch (iter->function)
1175         {
1176         case MIN:
1177           iter->dbl[0] = DBL_MAX;
1178           break;
1179         case MIN | FSTRING:
1180           memset (iter->string, 255, iter->src->width);
1181           break;
1182         case MAX:
1183           iter->dbl[0] = -DBL_MAX;
1184           break;
1185         case MAX | FSTRING:
1186           memset (iter->string, 0, iter->src->width);
1187           break;
1188         default:
1189           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1190           iter->int1 = iter->int2 = 0;
1191           break;
1192         }
1193     }
1194 }
1195 \f
1196 /* Aggregate each case as it comes through.  Cases which aren't needed
1197    are dropped. */
1198 static int
1199 agr_00x_trns_proc (struct trns_header *h UNUSED, struct ccase *c,
1200                    int case_num UNUSED)
1201 {
1202   int code = aggregate_single_case (c, compaction_case);
1203   debug_printf (("%d ", code));
1204   return code;
1205 }
1206
1207 /* Output the last aggregate case.  It's okay to call the vfm_sink's
1208    write() method here because end_func is called so soon after all
1209    the cases have been output; very little has been cleaned up at this
1210    point. */
1211 static void
1212 agr_00x_end_func (void *aux UNUSED)
1213 {
1214   /* Ensure that info for the last break group gets written to the
1215      active file. */
1216   dump_aggregate_info (compaction_case);
1217   vfm_sink->class->write (vfm_sink, temp_case);
1218 }
1219
1220 /* Transform the aggregate case buf_1xx, in internal format, to system
1221    file format, in buf64_1xx, and write the resultant case to the
1222    system file. */
1223 static void
1224 write_case_to_sfm (void)
1225 {
1226   flt64 *p = buf64_1xx;
1227   int i;
1228
1229   for (i = 0; i < dict_get_var_cnt (agr_dict); i++)
1230     {
1231       struct variable *v = dict_get_var (agr_dict, i);
1232       
1233       if (v->type == NUMERIC)
1234         {
1235           double src = buf_1xx->data[v->fv].f;
1236           if (src == SYSMIS)
1237             *p++ = -FLT64_MAX;
1238           else
1239             *p++ = src;
1240         }
1241       else
1242         {
1243           memcpy (p, buf_1xx->data[v->fv].s, v->width);
1244           memset (&((char *) p)[v->width], ' ',
1245                   REM_RND_UP (v->width, sizeof (flt64)));
1246           p += DIV_RND_UP (v->width, sizeof (flt64));
1247         }
1248     }
1249
1250   sfm_write_case (outfile, buf64_1xx, p - buf64_1xx);
1251 }
1252
1253 /* Aggregate the current case and output it if we passed a
1254    breakpoint. */
1255 static int
1256 agr_10x_trns_proc (struct trns_header *h UNUSED, struct ccase *c,
1257                    int case_num UNUSED)
1258 {
1259   int code = aggregate_single_case (c, buf_1xx);
1260
1261   assert (code == -2 || code == -1);
1262   if (code == -1)
1263     write_case_to_sfm ();
1264   return -1;
1265 }
1266
1267 /* Close the system file now that we're done with it.  */
1268 static void
1269 agr_10x_trns_free (struct trns_header *h UNUSED)
1270 {
1271   fh_close_handle (outfile);
1272 }
1273
1274 /* Ensure that info for the last break group gets written to the
1275    system file. */
1276 static void
1277 agr_10x_end_func (void *aux UNUSED)
1278 {
1279   dump_aggregate_info (buf_1xx);
1280   write_case_to_sfm ();
1281 }
1282
1283 /* When called with temp_case non-NULL (the normal case), runs the
1284    case through the aggregater and outputs it to the system file if
1285    appropriate.  If temp_case is NULL, finishes up writing the last
1286    case if necessary. */
1287 static int
1288 agr_11x_func (write_case_data wc_data UNUSED)
1289 {
1290   if (temp_case != NULL)
1291     {
1292       int code = aggregate_single_case (temp_case, buf_1xx);
1293       
1294       assert (code == -2 || code == -1);
1295       if (code == -1)
1296         write_case_to_sfm ();
1297     }
1298   else
1299     {
1300       if (case_count)
1301         {
1302           dump_aggregate_info (buf_1xx);
1303           write_case_to_sfm ();
1304         }
1305       fh_close_handle (outfile);
1306     }
1307   return 1;
1308 }
1309 \f
1310 /* Debugging. */
1311 #if DEBUGGING
1312 /* Print out useful debugging information. */
1313 static void
1314 debug_print (int flags)
1315 {
1316   printf ("AGGREGATE\n /OUTFILE=%s\n",
1317           outfile ? fh_handle_filename (outfile) : "*");
1318
1319   if (missing == COLUMNWISE)
1320     puts (" /MISSING=COLUMNWISE");
1321
1322   if (flags & 2)
1323     puts (" /DOCUMENT");
1324   if (flags & 4)
1325     puts (" /PRESORTED");
1326   
1327   {
1328     int i;
1329
1330     printf (" /BREAK=");
1331     for (i = 0; i < sort->var_cnt; i++)
1332       printf ("%s(%c) ", sort->vars[i]->name,
1333               sort->vars[i]->p.srt.order == SRT_ASCEND ? 'A' : 'D');
1334     putc ('\n', stdout);
1335   }
1336   
1337   {
1338     struct agr_var *iter;
1339     
1340     for (iter = agr_first; iter; iter = iter->next)
1341       {
1342         struct agr_func *f = &agr_func_tab[iter->function & FUNC];
1343         
1344         printf (" /%s", iter->dest->name);
1345         if (iter->dest->label)
1346           printf ("'%s'", iter->dest->label);
1347         printf ("=%s(%s", f->name, iter->src->name);
1348         if (f->n_args)
1349           {
1350             int i;
1351             
1352             for (i = 0; i < f->n_args; i++)
1353               {
1354                 putc (',', stdout);
1355                 if (iter->src->type == NUMERIC)
1356                   printf ("%g", iter->arg[i].f);
1357                 else
1358                   printf ("%.*s", iter->src->width, iter->arg[i].c);
1359               }
1360           }
1361         printf (")\n");
1362       }
1363   }
1364 }
1365
1366 #endif /* DEBUGGING */