checkin of 0.3.0
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "approx.h"
25 #include "command.h"
26 #include "error.h"
27 #include "file-handle.h"
28 #include "lexer.h"
29 #include "misc.h"
30 #include "settings.h"
31 #include "sfm.h"
32 #include "sort.h"
33 #include "stats.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #undef DEBUGGING
40 /*#define DEBUGGING 1*/
41 #include "debug-print.h"
42
43 /* Specifies how to make an aggregate variable. */
44 struct agr_var
45   {
46     struct agr_var *next;               /* Next in list. */
47
48     /* Collected during parsing. */
49     struct variable *src;       /* Source variable. */
50     struct variable *dest;      /* Target variable. */
51     int function;               /* Function. */
52     int include_missing;        /* 1=Include user-missing values. */
53     union value arg[2];         /* Arguments. */
54
55     /* Accumulated during AGGREGATE execution. */
56     double dbl[3];
57     int int1, int2;
58     char *string;
59     int missing;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70     FWEIGHT = 1<<6, /* Weighted function bit. */
71     FOPTIONS = FSTRING | FWEIGHT /* Function options mask. */
72   };
73
74 /* Attributes of an aggregation function. */
75 struct agr_func
76   {
77     const char *name;           /* Aggregation function name. */
78     int n_args;                 /* Number of arguments. */
79     int alpha_type;             /* When given ALPHA arguments, output type. */
80     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
81   };
82
83 /* Attributes of aggregation functions. */
84 static struct agr_func agr_func_tab[] = 
85   {
86     {"<NONE>",  0, -1,      {0, 0, 0}},
87     {"SUM",     0, -1,      {FMT_F, 8, 2}},
88     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
89     {"SD",      0, -1,      {FMT_F, 8, 2}},
90     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
91     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
92     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
93     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
94     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
95     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
96     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
97     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
98     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
99     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
100     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
102     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
103     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
104     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
105     {"LAST",    0, ALPHA,   {-1, -1, -1}},
106     {NULL,      0, -1,      {-1, -1, -1}},
107     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
108     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
109   };
110
111 /* Output file, or NULL for the active file. */
112 static struct file_handle *outfile;
113
114 /* Missing value types. */
115 enum
116   {
117     ITEMWISE,           /* Missing values item by item. */
118     COLUMNWISE          /* Missing values column by column. */
119   };
120
121 /* ITEMWISE or COLUMNWISE. */
122 static int missing;
123
124 /* Aggregate variables. */
125 static struct agr_var *agr_first, *agr_next;
126
127 /* Aggregate dictionary. */
128 static struct dictionary *agr_dict;
129
130 /* Number of cases passed through aggregation. */
131 static int case_count;
132
133 /* Last values of the break variables. */
134 static union value *prev_case;
135
136 /* Buffers for use by the 10x transformation. */
137 static flt64 *buf64_1xx;
138 static struct ccase *buf_1xx;
139
140 static void initialize_aggregate_info (void);
141
142 /* Prototypes. */
143 static int parse_aggregate_functions (void);
144 static void free_aggregate_functions (void);
145 static int aggregate_single_case (struct ccase *input, struct ccase *output);
146 static int create_sysfile (void);
147
148 static int agr_00x_trns_proc (struct trns_header *, struct ccase *);
149 static void agr_00x_end_func (void);
150 static int agr_10x_trns_proc (struct trns_header *, struct ccase *);
151 static void agr_10x_trns_free (struct trns_header *);
152 static void agr_10x_end_func (void);
153 static int agr_11x_func (void);
154
155 #if DEBUGGING
156 static void debug_print (int flags);
157 #endif
158 \f
159 /* Parsing. */
160
161 /* Parses and executes the AGGREGATE procedure. */
162 int
163 cmd_aggregate (void)
164 {
165   /* From sort.c. */
166   int parse_sort_variables (void);
167   
168   /* Have we seen these subcommands? */
169   unsigned seen = 0;
170
171   outfile = NULL;
172   missing = ITEMWISE;
173   v_sort = NULL;
174   prev_case = NULL;
175   
176   agr_dict = new_dictionary (1);
177   
178   lex_match_id ("AGGREGATE");
179
180   /* Read most of the subcommands. */
181   for (;;)
182     {
183       lex_match('/');
184       
185       if (lex_match_id ("OUTFILE"))
186         {
187           if (seen & 1)
188             {
189               free (v_sort);
190               free_dictionary (agr_dict);
191               msg (SE, _("OUTFILE specified multiple times."));
192               return CMD_FAILURE;
193             }
194           seen |= 1;
195               
196           lex_match ('=');
197           if (lex_match ('*'))
198             outfile = NULL;
199           else 
200             {
201               outfile = fh_parse_file_handle ();
202               if (outfile == NULL)
203                 {
204                   free (v_sort);
205                   free_dictionary (agr_dict);
206                   return CMD_FAILURE;
207                 }
208             }
209         }
210       else if (lex_match_id ("MISSING"))
211         {
212           lex_match ('=');
213           if (!lex_match_id ("COLUMNWISE"))
214             {
215               free (v_sort);
216               free_dictionary (agr_dict);
217               lex_error (_("while expecting COLUMNWISE"));
218               return CMD_FAILURE;
219             }
220           missing = COLUMNWISE;
221         }
222       else if (lex_match_id ("DOCUMENT"))
223         seen |= 2;
224       else if (lex_match_id ("PRESORTED"))
225         seen |= 4;
226       else if (lex_match_id ("BREAK"))
227         {
228           if (seen & 8)
229             {
230               free (v_sort);
231               free_dictionary (agr_dict);
232               msg (SE, _("BREAK specified multiple times."));
233               return CMD_FAILURE;
234             }
235           seen |= 8;
236
237           lex_match ('=');
238           if (!parse_sort_variables ())
239             {
240               free_dictionary (agr_dict);
241               return CMD_FAILURE;
242             }
243           
244           {
245             int i;
246             
247             for (i = 0; i < nv_sort; i++)
248               {
249                 struct variable *v;
250               
251                 v = dup_variable (agr_dict, v_sort[i], v_sort[i]->name);
252                 assert (v != NULL);
253               }
254           }
255         }
256       else break;
257     }
258
259   /* Check for proper syntax. */
260   if (!(seen & 8))
261     msg (SW, _("BREAK subcommand not specified."));
262       
263   /* Read in the aggregate functions. */
264   if (!parse_aggregate_functions ())
265     {
266       free_aggregate_functions ();
267       free (v_sort);
268       return CMD_FAILURE;
269     }
270
271   /* Delete documents. */
272   if (!(seen & 2))
273     {
274       free (agr_dict->documents);
275       agr_dict->documents = NULL;
276       agr_dict->n_documents = 0;
277     }
278
279   /* Cancel SPLIT FILE. */
280   default_dict.n_splits = 0;
281   free (default_dict.splits);
282   default_dict.splits = NULL;
283   
284 #if DEBUGGING
285   debug_print (seen);
286 #endif
287
288   /* Initialize. */
289   case_count = 0;
290   initialize_aggregate_info ();
291
292   /* How to implement all this... There are three important variables:
293      whether output is going to the active file (0) or a separate file
294      (1); whether the input data is presorted (0) or needs sorting
295      (1); whether there is a temporary transformation (1) or not (0).
296      The eight cases are as follows:
297
298      000 (0): Pass it through an aggregate transformation that
299      modifies the data.
300
301      001 (1): Cancel the temporary transformation and handle as 000.
302
303      010 (2): Set up a SORT CASES and aggregate the output, writing
304      the results to the active file.
305      
306      011 (3): Cancel the temporary transformation and handle as 010.
307
308      100 (4): Pass it through an aggregate transformation that doesn't
309      modify the data but merely writes it to the output file.
310
311      101 (5): Handled as 100.
312
313      110 (6): Set up a SORT CASES and capture the output, aggregate
314      it, write it to the output file without modifying the active
315      file.
316
317      111 (7): Handled as 110. */
318   
319   {
320     unsigned type = 0;
321
322     if (outfile != NULL)
323       type |= 4;
324     if (nv_sort != 0 && (seen & 4) == 0)
325       type |= 2;
326     if (temporary)
327       type |= 1;
328
329     switch (type)
330       {
331       case 3:
332         cancel_temporary ();
333         /* fall through */
334       case 2:
335         sort_cases (0);
336         goto case0;
337           
338       case 1:
339         cancel_temporary ();
340         /* fall through */
341       case 0:
342       case0:
343         {
344           struct trns_header *t = xmalloc (sizeof *t);
345           t->proc = agr_00x_trns_proc;
346           t->free = NULL;
347           add_transformation (t);
348           
349           temporary = 2;
350           temp_dict = agr_dict;
351           temp_trns = n_trns;
352           
353           agr_dict = NULL;
354
355           procedure (NULL, NULL, agr_00x_end_func);
356           break;
357         }
358
359       case 4:
360       case 5:
361         {
362           if (!create_sysfile ())
363             goto lossage;
364           
365           {
366             struct trns_header *t = xmalloc (sizeof *t);
367             t->proc = agr_10x_trns_proc;
368             t->free = agr_10x_trns_free;
369             add_transformation (t);
370
371             procedure (NULL, NULL, agr_10x_end_func);
372           }
373           
374           break;
375         }
376           
377       case 6:
378       case 7:
379         sort_cases (1);
380         
381         if (!create_sysfile ())
382           goto lossage;
383         read_sort_output (agr_11x_func);
384         
385         {
386           struct ccase *save_temp_case = temp_case;
387           temp_case = NULL;
388           agr_11x_func ();
389           temp_case = save_temp_case;
390         }
391         
392         break;
393
394       default:
395         assert (0);
396       }
397   }
398   
399   free (buf64_1xx);
400   free (buf_1xx);
401   
402   /* Clean up. */
403   free (v_sort);
404   free_aggregate_functions ();
405   free (prev_case);
406   
407   return CMD_SUCCESS;
408
409 lossage:
410   /* Clean up. */
411   free (v_sort);
412   free_aggregate_functions ();
413   free (prev_case);
414
415   return CMD_FAILURE;
416 }
417
418 /* Create a system file for use in aggregation to an external file,
419    and allocate temporary buffers for writing out cases. */
420 static int
421 create_sysfile (void)
422 {
423   struct sfm_write_info w;
424   w.h = outfile;
425   w.dict = agr_dict;
426   w.compress = set_scompression;
427   if (!sfm_write_dictionary (&w))
428     {
429       free_aggregate_functions ();
430       free (v_sort);
431       free_dictionary (agr_dict);
432       return 0;
433     }
434     
435   buf64_1xx = xmalloc (sizeof *buf64_1xx * w.case_size);
436   buf_1xx = xmalloc (sizeof (struct ccase) + sizeof (union value) * (agr_dict->nval - 1));
437
438   return 1;
439 }
440
441 /* Parse all the aggregate functions. */
442 static int
443 parse_aggregate_functions (void)
444 {
445   agr_first = agr_next = NULL;
446
447   /* Anticipate weighting for optimization later. */
448   update_weighting (&default_dict);
449   
450   /* Parse everything. */
451   for (;;)
452     {
453       char **dest;
454       char **dest_label;
455       int n_dest;
456
457       int include_missing;
458       struct agr_func *function;
459       int func_index;
460
461       union value arg[2];
462
463       struct variable **src;
464       int n_src;
465
466       int i;
467
468       dest = NULL;
469       dest_label = NULL;
470       n_dest = 0;
471       src = NULL;
472       n_src = 0;
473       arg[0].c = NULL;
474       arg[1].c = NULL;
475
476       /* Parse the list of target variables. */
477       while (!lex_match ('='))
478         {
479           int n_dest_prev = n_dest;
480           
481           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
482             goto lossage;
483
484           /* Assign empty labels. */
485           {
486             int j;
487
488             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
489             for (j = n_dest_prev; j < n_dest; j++)
490               dest_label[j] = NULL;
491           }
492           
493           if (token == T_STRING)
494             {
495               ds_truncate (&tokstr, 120);
496               dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
497               lex_get ();
498             }
499         }
500
501       /* Get the name of the aggregation function. */
502       if (token != T_ID)
503         {
504           lex_error (_("expecting aggregation function"));
505           goto lossage;
506         }
507
508       include_missing = 0;
509       if (tokid[strlen (tokid) - 1] == '.')
510         {
511           include_missing = 1;
512           tokid[strlen (tokid) - 1] = 0;
513         }
514       
515       for (function = agr_func_tab; function->name; function++)
516         if (!strcmp (function->name, tokid))
517           break;
518       if (NULL == function->name)
519         {
520           msg (SE, _("Unknown aggregation function %s."), tokid);
521           goto lossage;
522         }
523       func_index = function - agr_func_tab;
524       lex_get ();
525
526       /* Check for leading lparen. */
527       if (!lex_match ('('))
528         {
529           if (func_index == N)
530             func_index = N_NO_VARS;
531           else if (func_index == NU)
532             func_index = NU_NO_VARS;
533           else
534             {
535               lex_error (_("expecting `('"));
536               goto lossage;
537             }
538         } else {
539           /* Parse list of source variables. */
540           {
541             int pv_opts = PV_NO_SCRATCH;
542
543             if (func_index == SUM || func_index == MEAN || func_index == SD)
544               pv_opts |= PV_NUMERIC;
545             else if (function->n_args)
546               pv_opts |= PV_SAME_TYPE;
547
548             if (!parse_variables (&default_dict, &src, &n_src, pv_opts))
549               goto lossage;
550           }
551
552           /* Parse function arguments, for those functions that
553              require arguments. */
554           if (function->n_args != 0)
555             for (i = 0; i < function->n_args; i++)
556               {
557                 int type;
558             
559                 lex_match (',');
560                 if (token == T_STRING)
561                   {
562                     arg[i].c = xstrdup (ds_value (&tokstr));
563                     type = ALPHA;
564                   }
565                 else if (token == T_NUM)
566                   {
567                     arg[i].f = tokval;
568                     type = NUMERIC;
569                   } else {
570                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
571                     goto lossage;
572                   }
573             
574                 lex_get ();
575
576                 if (type != src[0]->type)
577                   {
578                     msg (SE, _("Arguments to %s must be of same type as "
579                                "source variables."),
580                          function->name);
581                     goto lossage;
582                   }
583               }
584
585           /* Trailing rparen. */
586           if (!lex_match(')'))
587             {
588               lex_error (_("expecting `)'"));
589               goto lossage;
590             }
591           
592           /* Now check that the number of source variables match the
593              number of target variables.  Do this here because if we
594              do it earlier then the user can get very misleading error
595              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
596              error message when a proper message would be more like
597              `unknown variable t'. */
598           if (n_src != n_dest)
599             {
600               msg (SE, _("Number of source variables (%d) does not match "
601                          "number of target variables (%d)."),
602                    n_src, n_dest);
603               goto lossage;
604             }
605         }
606         
607       /* Finally add these to the linked list of aggregation
608          variables. */
609       for (i = 0; i < n_dest; i++)
610         {
611           struct agr_var *v = xmalloc (sizeof *v);
612
613           /* Add variable to chain. */
614           if (agr_first)
615             agr_next = agr_next->next = v;
616           else
617             agr_first = agr_next = v;
618           agr_next->next = NULL;
619           
620           /* Create the target variable in the aggregate
621              dictionary. */
622           {
623             struct variable *destvar;
624             
625             agr_next->function = func_index;
626
627             if (src)
628               {
629                 int output_type;
630
631                 agr_next->src = src[i];
632                 
633                 if (src[i]->type == ALPHA)
634                   {
635                     agr_next->function |= FSTRING;
636                     agr_next->string = xmalloc (src[i]->width);
637                   }
638                 
639                 if (default_dict.weight_index != -1)
640                   agr_next->function |= FWEIGHT;
641
642                 if (agr_next->src->type == NUMERIC)
643                   output_type = NUMERIC;
644                 else
645                   output_type = function->alpha_type;
646
647                 if (function->alpha_type == ALPHA)
648                   destvar = dup_variable (agr_dict, agr_next->src, dest[i]);
649                 else
650                   {
651                     destvar = create_variable (agr_dict, dest[i], output_type,
652                                                agr_next->src->width);
653                     if (output_type == NUMERIC)
654                       destvar->print = destvar->write = function->format;
655                     if (output_type == NUMERIC && default_dict.weight_index != -1
656                         && (func_index == N || func_index == N_NO_VARS
657                             || func_index == NU || func_index == NU_NO_VARS))
658                       {
659                         struct fmt_spec f = {FMT_F, 8, 2};
660                       
661                         destvar->print = destvar->write = f;
662                       }
663                   }
664               } else {
665                 agr_next->src = NULL;
666                 destvar = create_variable (agr_dict, dest[i], NUMERIC, 0);
667               }
668           
669             if (!destvar)
670               {
671                 msg (SE, _("Variable name %s is not unique within the "
672                            "aggregate file dictionary, which contains "
673                            "the aggregate variables and the break "
674                            "variables."),
675                      dest[i]);
676                 free (dest[i]);
677                 goto lossage;
678               }
679
680             free (dest[i]);
681             if (dest_label[i])
682               {
683                 destvar->label = dest_label[i];
684                 dest_label[i] = NULL;
685               }
686             else if (function->alpha_type == ALPHA)
687               destvar->print = destvar->write = function->format;
688
689             agr_next->dest = destvar;
690           }
691           
692           agr_next->include_missing = include_missing;
693
694           if (agr_next->src != NULL)
695             {
696               int j;
697
698               if (agr_next->src->type == NUMERIC)
699                 for (j = 0; j < function->n_args; j++)
700                   agr_next->arg[j].f = arg[j].f;
701               else
702                 for (j = 0; j < function->n_args; j++)
703                   agr_next->arg[j].c = xstrdup (arg[j].c);
704             }
705         }
706       
707       if (src != NULL && src[0]->type == ALPHA)
708         for (i = 0; i < function->n_args; i++)
709           {
710             free (arg[i].c);
711             arg[i].c = NULL;
712           }
713
714       free (src);
715       free (dest);
716       free (dest_label);
717
718       if (!lex_match ('/'))
719         {
720           if (token == '.')
721             return 1;
722
723           lex_error ("expecting end of command");
724           return 0;
725         }
726       continue;
727       
728     lossage:
729       for (i = 0; i < n_dest; i++)
730         {
731           free (dest[i]);
732           free (dest_label[i]);
733         }
734       free (dest);
735       free (dest_label);
736       free (arg[0].c);
737       free (arg[1].c);
738       if (src && n_src && src[0]->type == ALPHA)
739         for (i = 0; i < function->n_args; i++)
740           {
741             free(arg[i].c);
742             arg[i].c = NULL;
743           }
744       free (src);
745         
746       return 0;
747     }
748 }
749
750 /* Frees all the state for the AGGREGATE procedure. */
751 static void
752 free_aggregate_functions (void)
753 {
754   struct agr_var *iter, *next;
755
756   if (agr_dict)
757     free_dictionary (agr_dict);
758   for (iter = agr_first; iter; iter = next)
759     {
760       next = iter->next;
761
762       if (iter->function & FSTRING)
763         {
764           int n_args;
765           int i;
766
767           n_args = agr_func_tab[iter->function & FUNC].n_args;
768           for (i = 0; i < n_args; i++)
769             free (iter->arg[i].c);
770           free (iter->string);
771         }
772       free (iter);
773     }
774 }
775 \f
776 /* Execution. */
777
778 static void accumulate_aggregate_info (struct ccase *input);
779 static void dump_aggregate_info (struct ccase *output);
780
781 /* Processes a single case INPUT for aggregation.  If output is
782    warranted, it is written to case OUTPUT, which may be (but need not
783    be) an alias to INPUT.  Returns -1 when output is performed, -2
784    otherwise. */
785 /* The code in this function has an eerie similarity to
786    vfm.c:SPLIT_FILE_procfunc()... */
787 static int
788 aggregate_single_case (struct ccase *input, struct ccase *output)
789 {
790   /* The first case always begins a new break group.  We also need to
791      preserve the values of the case for later comparison. */
792   if (case_count++ == 0)
793     {
794       int n_elem = 0;
795       
796       {
797         int i;
798
799         for (i = 0; i < nv_sort; i++)
800           n_elem += v_sort[i]->nv;
801       }
802       
803       prev_case = xmalloc (sizeof *prev_case * n_elem);
804
805       /* Copy INPUT into prev_case. */
806       {
807         union value *iter = prev_case;
808         int i;
809
810         for (i = 0; i < nv_sort; i++)
811           {
812             struct variable *v = v_sort[i];
813             
814             if (v->type == NUMERIC)
815               (iter++)->f = input->data[v->fv].f;
816             else
817               {
818                 memcpy (iter->s, input->data[v->fv].s, v->width);
819                 iter += v->nv;
820               }
821           }
822       }
823             
824       accumulate_aggregate_info (input);
825         
826       return -2;
827     }
828       
829   /* Compare the value of each break variable to the values on the
830      previous case. */
831   {
832     union value *iter = prev_case;
833     int i;
834     
835     for (i = 0; i < nv_sort; i++)
836       {
837         struct variable *v = v_sort[i];
838       
839         switch (v->type)
840           {
841           case NUMERIC:
842             if (approx_ne (input->data[v->fv].f, iter->f))
843               goto not_equal;
844             iter++;
845             break;
846           case ALPHA:
847             if (memcmp (input->data[v->fv].s, iter->s, v->width))
848               goto not_equal;
849             iter += v->nv;
850             break;
851           default:
852             assert (0);
853           }
854       }
855   }
856
857   accumulate_aggregate_info (input);
858
859   return -2;
860   
861 not_equal:
862   /* The values of the break variable are different from the values on
863      the previous case.  That means that it's time to dump aggregate
864      info. */
865   dump_aggregate_info (output);
866   initialize_aggregate_info ();
867   accumulate_aggregate_info (input);
868
869   /* Copy INPUT into prev_case. */
870   {
871     union value *iter = prev_case;
872     int i;
873
874     for (i = 0; i < nv_sort; i++)
875       {
876         struct variable *v = v_sort[i];
877             
878         if (v->type == NUMERIC)
879           (iter++)->f = input->data[v->fv].f;
880         else
881           {
882             memcpy (iter->s, input->data[v->fv].s, v->width);
883             iter += v->nv;
884           }
885       }
886   }
887   
888   return -1;
889 }
890
891 /* Accumulates aggregation data from the case INPUT. */
892 static void 
893 accumulate_aggregate_info (struct ccase *input)
894 {
895   struct agr_var *iter;
896
897 #define WEIGHT (input->data[default_dict.weight_index].f)
898
899   for (iter = agr_first; iter; iter = iter->next)
900     if (iter->src)
901       {
902         union value *v = &input->data[iter->src->fv];
903
904         if ((!iter->include_missing && is_missing (v, iter->src))
905             || (iter->include_missing && iter->src->type == NUMERIC
906                 && v->f == SYSMIS))
907           {
908             switch (iter->function)
909               {
910               case NMISS | FWEIGHT:
911                 iter->dbl[0] += WEIGHT;
912                 break;
913               case NMISS:
914               case NUMISS:
915               case NUMISS | FWEIGHT:
916                 iter->int1++;
917                 break;
918               }
919             iter->missing = 1;
920             continue;
921           }
922         
923         /* This is horrible.  There are too many possibilities. */
924         switch (iter->function)
925           {
926           case SUM:
927           case SUM | FWEIGHT:
928             iter->dbl[0] += v->f;
929             break;
930           case MEAN:
931             iter->dbl[0] += v->f;
932             iter->int1++;
933             break;
934           case MEAN | FWEIGHT:
935             {
936               double w = WEIGHT;
937               iter->dbl[0] += v->f * w;
938               iter->dbl[1] += w;
939               break;
940             }
941           case SD:
942             iter->dbl[0] += v->f;
943             iter->dbl[1] += v->f * v->f;
944             iter->int1++;
945             break;
946           case SD | FWEIGHT:
947             {
948               double w = WEIGHT;
949               double product = v->f * w;
950               iter->dbl[0] += product;
951               iter->dbl[1] += product * v->f;
952               iter->dbl[2] += w;
953               break;
954             }
955           case MAX:
956           case MAX | FWEIGHT:
957             iter->dbl[0] = max (iter->dbl[0], v->f);
958             iter->int1 = 1;
959             break;
960           case MAX | FSTRING:
961           case MAX | FSTRING | FWEIGHT:
962             if (memcmp (iter->string, v->s, iter->src->width) < 0)
963               memcpy (iter->string, v->s, iter->src->width);
964             iter->int1 = 1;
965             break;
966           case MIN:
967           case MIN | FWEIGHT:
968             iter->dbl[0] = min (iter->dbl[0], v->f);
969             iter->int1 = 1;
970             break;
971           case MIN | FSTRING:
972           case MIN | FSTRING | FWEIGHT:
973             if (memcmp (iter->string, v->s, iter->src->width) > 0)
974               memcpy (iter->string, v->s, iter->src->width);
975             iter->int1 = 1;
976             break;
977           case FGT:
978           case PGT:
979             if (approx_gt (v->f, iter->arg[0].f))
980               iter->int1++;
981             iter->int2++;
982             break;
983           case FGT | FWEIGHT:
984           case PGT | FWEIGHT:
985             {
986               double w = WEIGHT;
987               if (approx_gt (v->f, iter->arg[0].f))
988                 iter->dbl[0] += w;
989               iter->dbl[1] += w;
990               break;
991             }
992           case FGT | FSTRING:
993           case PGT | FSTRING:
994             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
995               iter->int1++;
996             iter->int2++;
997             break;
998           case FGT | FSTRING | FWEIGHT:
999           case PGT | FSTRING | FWEIGHT:
1000             {
1001               double w = WEIGHT;
1002               if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
1003                 iter->dbl[0] += w;
1004               iter->dbl[1] += w;
1005               break;
1006             }
1007           case FLT:
1008           case PLT:
1009             if (approx_lt (v->f, iter->arg[0].f))
1010               iter->int1++;
1011             iter->int2++;
1012             break;
1013           case FLT | FWEIGHT:
1014           case PLT | FWEIGHT:
1015             {
1016               double w = WEIGHT;
1017               if (approx_lt (v->f, iter->arg[0].f))
1018                 iter->dbl[0] += w;
1019               iter->dbl[1] += w;
1020               break;
1021             }
1022           case FLT | FSTRING:
1023           case PLT | FSTRING:
1024             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
1025               iter->int1++;
1026             iter->int2++;
1027             break;
1028           case FLT | FSTRING | FWEIGHT:
1029           case PLT | FSTRING | FWEIGHT:
1030             {
1031               double w = WEIGHT;
1032               if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
1033                 iter->dbl[0] += w;
1034               iter->dbl[1] += w;
1035               break;
1036             }
1037           case FIN:
1038           case PIN:
1039             if (approx_in_range (v->f, iter->arg[0].f, iter->arg[1].f))
1040               iter->int1++;
1041             iter->int2++;
1042             break;
1043           case FIN | FWEIGHT:
1044           case PIN | FWEIGHT:
1045             {
1046               double w = WEIGHT;
1047               if (approx_in_range (v->f, iter->arg[0].f, iter->arg[1].f))
1048                 iter->dbl[0] += w;
1049               iter->dbl[1] += w;
1050               break;
1051             }
1052           case FIN | FSTRING:
1053           case PIN | FSTRING:
1054             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
1055                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
1056               iter->int1++;
1057             iter->int2++;
1058             break;
1059           case FIN | FSTRING | FWEIGHT:
1060           case PIN | FSTRING | FWEIGHT:
1061             {
1062               double w = WEIGHT;
1063               if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
1064                   && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
1065                 iter->dbl[0] += w;
1066               iter->dbl[1] += w;
1067               break;
1068             }
1069           case FOUT:
1070           case POUT:
1071             if (!approx_in_range (v->f, iter->arg[0].f, iter->arg[1].f))
1072               iter->int1++;
1073             iter->int2++;
1074             break;
1075           case FOUT | FWEIGHT:
1076           case POUT | FWEIGHT:
1077             {
1078               double w = WEIGHT;
1079               if (!approx_in_range (v->f, iter->arg[0].f, iter->arg[1].f))
1080                 iter->dbl[0] += w;
1081               iter->dbl[1] += w;
1082               break;
1083             }
1084           case FOUT | FSTRING:
1085           case POUT | FSTRING:
1086             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
1087                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
1088               iter->int1++;
1089             iter->int2++;
1090             break;
1091           case FOUT | FSTRING | FWEIGHT:
1092           case POUT | FSTRING | FWEIGHT:
1093             {
1094               double w = WEIGHT;
1095               if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
1096                   && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
1097                 iter->dbl[0] += w;
1098               iter->dbl[1] += w;
1099               break;
1100             }
1101           case N | FWEIGHT:
1102             iter->dbl[0] += WEIGHT;
1103             break;
1104           case N:
1105           case NU:
1106           case NU | FWEIGHT:
1107             iter->int1++;
1108             break;
1109           case FIRST:
1110           case FIRST | FWEIGHT:
1111             if (iter->int1 == 0)
1112               {
1113                 iter->dbl[0] = v->f;
1114                 iter->int1 = 1;
1115               }
1116             break;
1117           case FIRST | FSTRING:
1118           case FIRST | FSTRING | FWEIGHT:
1119             if (iter->int1 == 0)
1120               {
1121                 memcpy (iter->string, v->s, iter->src->width);
1122                 iter->int1 = 1;
1123               }
1124             break;
1125           case LAST:
1126           case LAST | FWEIGHT:
1127             iter->dbl[0] = v->f;
1128             iter->int1 = 1;
1129             break;
1130           case LAST | FSTRING:
1131           case LAST | FSTRING | FWEIGHT:
1132             memcpy (iter->string, v->s, iter->src->width);
1133             iter->int1 = 1;
1134             break;
1135           default:
1136             assert (0);
1137           }
1138     } else {
1139       switch (iter->function)
1140         {
1141         case N_NO_VARS | FWEIGHT:
1142           iter->dbl[0] += WEIGHT;
1143           break;
1144         case N_NO_VARS:
1145         case NU_NO_VARS:
1146         case NU_NO_VARS | FWEIGHT:
1147           iter->int1++;
1148           break;
1149         default:
1150           assert (0);
1151         }
1152     }
1153 }
1154
1155 /* We've come to a record that differs from the previous in one or
1156    more of the break variables.  Make an output record from the
1157    accumulated statistics in the OUTPUT case. */
1158 static void 
1159 dump_aggregate_info (struct ccase *output)
1160 {
1161   debug_printf (("(dumping "));
1162   
1163   {
1164     int n_elem = 0;
1165     
1166     {
1167       int i;
1168
1169       for (i = 0; i < nv_sort; i++)
1170         n_elem += v_sort[i]->nv;
1171     }
1172     debug_printf (("n_elem=%d:", n_elem));
1173     memcpy (output->data, prev_case, sizeof (union value) * n_elem);
1174   }
1175   
1176   {
1177     struct agr_var *i;
1178   
1179     for (i = agr_first; i; i = i->next)
1180       {
1181         union value *v = &output->data[i->dest->fv];
1182
1183         debug_printf ((" %d,%d", i->dest->fv, i->dest->nv));
1184
1185         if (missing == COLUMNWISE && i->missing != 0
1186             && (i->function & FUNC) != N && (i->function & FUNC) != NU
1187             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
1188           {
1189             if (i->function & FSTRING)
1190               memset (v->s, ' ', i->dest->width);
1191             else
1192               v->f = SYSMIS;
1193             continue;
1194           }
1195         
1196         switch (i->function)
1197           {
1198           case SUM:
1199           case SUM | FWEIGHT:
1200             v->f = i->dbl[0];
1201             break;
1202           case MEAN:
1203             v->f = i->int1 ? i->dbl[0] / i->int1 : SYSMIS;
1204             break;
1205           case MEAN | FWEIGHT:
1206             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1207             break;
1208           case SD:
1209             v->f = ((i->int1 > 1)
1210                     ? calc_stddev (calc_variance (i->dbl, i->int1))
1211                     : SYSMIS);
1212             break;
1213           case SD | FWEIGHT:
1214             v->f = ((i->dbl[2] > 1.0)
1215                     ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
1216                     : SYSMIS);
1217             break;
1218           case MAX:
1219           case MAX | FWEIGHT:
1220           case MIN:
1221           case MIN | FWEIGHT:
1222             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1223             break;
1224           case MAX | FSTRING:
1225           case MAX | FSTRING | FWEIGHT:
1226           case MIN | FSTRING:
1227           case MIN | FSTRING | FWEIGHT:
1228             if (i->int1)
1229               memcpy (v->s, i->string, i->dest->width);
1230             else
1231               memset (v->s, ' ', i->dest->width);
1232             break;
1233           case FGT:
1234           case FGT | FSTRING:
1235           case FLT:
1236           case FLT | FSTRING:
1237           case FIN:
1238           case FIN | FSTRING:
1239           case FOUT:
1240           case FOUT | FSTRING:
1241             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1242             break;
1243           case FGT | FWEIGHT:
1244           case FGT | FSTRING | FWEIGHT:
1245           case FLT | FWEIGHT:
1246           case FLT | FSTRING | FWEIGHT:
1247           case FIN | FWEIGHT:
1248           case FIN | FSTRING | FWEIGHT:
1249           case FOUT | FWEIGHT:
1250           case FOUT | FSTRING | FWEIGHT:
1251             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1252             break;
1253           case PGT:
1254           case PGT | FSTRING:
1255           case PLT:
1256           case PLT | FSTRING:
1257           case PIN:
1258           case PIN | FSTRING:
1259           case POUT:
1260           case POUT | FSTRING:
1261             v->f = (i->int2
1262                     ? (double) i->int1 / (double) i->int2 * 100.0
1263                     : SYSMIS);
1264             break;
1265           case PGT | FWEIGHT:
1266           case PGT | FSTRING | FWEIGHT:
1267           case PLT | FWEIGHT:
1268           case PLT | FSTRING | FWEIGHT:
1269           case PIN | FWEIGHT:
1270           case PIN | FSTRING | FWEIGHT:
1271           case POUT | FWEIGHT:
1272           case POUT | FSTRING | FWEIGHT:
1273             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1274             break;
1275           case N | FWEIGHT:
1276             v->f = i->dbl[0];
1277           case N:
1278           case NU:
1279           case NU | FWEIGHT:
1280             v->f = i->int1;
1281             break;
1282           case FIRST:
1283           case FIRST | FWEIGHT:
1284           case LAST:
1285           case LAST | FWEIGHT:
1286             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1287             break;
1288           case FIRST | FSTRING:
1289           case FIRST | FSTRING | FWEIGHT:
1290           case LAST | FSTRING:
1291           case LAST | FSTRING | FWEIGHT:
1292             if (i->int1)
1293               memcpy (v->s, i->string, i->dest->width);
1294             else
1295               memset (v->s, ' ', i->dest->width);
1296             break;
1297           case N_NO_VARS | FWEIGHT:
1298             v->f = i->dbl[0];
1299             break;
1300           case N_NO_VARS:
1301           case NU_NO_VARS:
1302           case NU_NO_VARS | FWEIGHT:
1303             v->f = i->int1;
1304             break;
1305           case NMISS | FWEIGHT:
1306             v->f = i->dbl[0];
1307             break;
1308           case NMISS:
1309           case NUMISS:
1310           case NUMISS | FWEIGHT:
1311             v->f = i->int1;
1312             break;
1313           default:
1314             assert (0);
1315           }
1316       }
1317   }
1318   debug_printf ((") "));
1319 }
1320
1321 /* Resets the state for all the aggregate functions. */
1322 static void
1323 initialize_aggregate_info (void)
1324 {
1325   struct agr_var *iter;
1326
1327   for (iter = agr_first; iter; iter = iter->next)
1328     {
1329       int plain_function = iter->function & ~FWEIGHT;
1330
1331       iter->missing = 0;
1332       switch (plain_function)
1333         {
1334         case MIN:
1335           iter->dbl[0] = DBL_MAX;
1336           break;
1337         case MIN | FSTRING:
1338           memset (iter->string, 255, iter->src->width);
1339           break;
1340         case MAX:
1341           iter->dbl[0] = -DBL_MAX;
1342           break;
1343         case MAX | FSTRING:
1344           memset (iter->string, 0, iter->src->width);
1345           break;
1346         default:
1347           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1348           iter->int1 = iter->int2 = 0;
1349           break;
1350         }
1351     }
1352 }
1353 \f
1354 /* Aggregate each case as it comes through.  Cases which aren't needed
1355    are dropped. */
1356 static int
1357 agr_00x_trns_proc (struct trns_header *h unused, struct ccase *c)
1358 {
1359   int code = aggregate_single_case (c, compaction_case);
1360   debug_printf (("%d ", code));
1361   return code;
1362 }
1363
1364 /* Output the last aggregate case.  It's okay to call the vfm_sink's
1365    write() method here because end_func is called so soon after all
1366    the cases have been output; very little has been cleaned up at this
1367    point. */
1368 static void
1369 agr_00x_end_func (void)
1370 {
1371   /* Ensure that info for the last break group gets written to the
1372      active file. */
1373   dump_aggregate_info (compaction_case);
1374   vfm_sink_info.ncases++;
1375   vfm_sink->write ();
1376 }
1377
1378 /* Transform the aggregate case buf_1xx, in internal format, to system
1379    file format, in buf64_1xx, and write the resultant case to the
1380    system file. */
1381 static void
1382 write_case_to_sfm (void)
1383 {
1384   flt64 *p = buf64_1xx;
1385   int i;
1386
1387   for (i = 0; i < agr_dict->nvar; i++)
1388     {
1389       struct variable *v = agr_dict->var[i];
1390       
1391       if (v->type == NUMERIC)
1392         {
1393           double src = buf_1xx->data[v->fv].f;
1394           if (src == SYSMIS)
1395             *p++ = -FLT64_MAX;
1396           else
1397             *p++ = src;
1398         }
1399       else
1400         {
1401           memcpy (p, buf_1xx->data[v->fv].s, v->width);
1402           memset (&((char *) p)[v->width], ' ',
1403                   REM_RND_UP (v->width, sizeof (flt64)));
1404           p += DIV_RND_UP (v->width, sizeof (flt64));
1405         }
1406     }
1407
1408   sfm_write_case (outfile, buf64_1xx, p - buf64_1xx);
1409 }
1410
1411 /* Aggregate the current case and output it if we passed a
1412    breakpoint. */
1413 static int
1414 agr_10x_trns_proc (struct trns_header *h unused, struct ccase *c)
1415 {
1416   int code = aggregate_single_case (c, buf_1xx);
1417
1418   assert (code == -2 || code == -1);
1419   if (code == -1)
1420     write_case_to_sfm ();
1421   return -1;
1422 }
1423
1424 /* Close the system file now that we're done with it.  */
1425 static void
1426 agr_10x_trns_free (struct trns_header *h unused)
1427 {
1428   fh_close_handle (outfile);
1429 }
1430
1431 /* Ensure that info for the last break group gets written to the
1432    system file. */
1433 static void
1434 agr_10x_end_func (void)
1435 {
1436   dump_aggregate_info (buf_1xx);
1437   write_case_to_sfm ();
1438 }
1439
1440 /* When called with temp_case non-NULL (the normal case), runs the
1441    case through the aggregater and outputs it to the system file if
1442    appropriate.  If temp_case is NULL, finishes up writing the last
1443    case if necessary. */
1444 static int
1445 agr_11x_func (void)
1446 {
1447   if (temp_case != NULL)
1448     {
1449       int code = aggregate_single_case (temp_case, buf_1xx);
1450       
1451       assert (code == -2 || code == -1);
1452       if (code == -1)
1453         write_case_to_sfm ();
1454     }
1455   else
1456     {
1457       if (case_count)
1458         {
1459           dump_aggregate_info (buf_1xx);
1460           write_case_to_sfm ();
1461         }
1462       fh_close_handle (outfile);
1463     }
1464   return 1;
1465 }
1466 \f
1467 /* Debugging. */
1468 #if DEBUGGING
1469 /* Print out useful debugging information. */
1470 static void
1471 debug_print (int flags)
1472 {
1473   printf ("AGGREGATE\n /OUTFILE=%s\n",
1474           outfile ? fh_handle_filename (outfile) : "*");
1475
1476   if (missing == COLUMNWISE)
1477     puts (" /MISSING=COLUMNWISE");
1478
1479   if (flags & 2)
1480     puts (" /DOCUMENT");
1481   if (flags & 4)
1482     puts (" /PRESORTED");
1483   
1484   {
1485     int i;
1486
1487     printf (" /BREAK=");
1488     for (i = 0; i < nv_sort; i++)
1489       printf ("%s(%c) ", v_sort[i]->name,
1490               v_sort[i]->p.srt.order == SRT_ASCEND ? 'A' : 'D');
1491     putc ('\n', stdout);
1492   }
1493   
1494   {
1495     struct agr_var *iter;
1496     
1497     for (iter = agr_first; iter; iter = iter->next)
1498       {
1499         struct agr_func *f = &agr_func_tab[iter->function & FUNC];
1500         
1501         printf (" /%s", iter->dest->name);
1502         if (iter->dest->label)
1503           printf ("'%s'", iter->dest->label);
1504         printf ("=%s(%s", f->name, iter->src->name);
1505         if (f->n_args)
1506           {
1507             int i;
1508             
1509             for (i = 0; i < f->n_args; i++)
1510               {
1511                 putc (',', stdout);
1512                 if (iter->src->type == NUMERIC)
1513                   printf ("%g", iter->arg[i].f);
1514                 else
1515                   printf ("%.*s", iter->src->width, iter->arg[i].c);
1516               }
1517           }
1518         printf (")\n");
1519       }
1520   }
1521 }
1522
1523 #endif /* DEBUGGING */