Beginning of VFM cleanup.
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "command.h"
25 #include "error.h"
26 #include "file-handle.h"
27 #include "lexer.h"
28 #include "misc.h"
29 #include "pool.h"
30 #include "settings.h"
31 #include "sfm.h"
32 #include "sort.h"
33 #include "stats.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #include "debug-print.h"
40
41 /* Specifies how to make an aggregate variable. */
42 struct agr_var
43   {
44     struct agr_var *next;               /* Next in list. */
45
46     /* Collected during parsing. */
47     struct variable *src;       /* Source variable. */
48     struct variable *dest;      /* Target variable. */
49     int function;               /* Function. */
50     int include_missing;        /* 1=Include user-missing values. */
51     union value arg[2];         /* Arguments. */
52
53     /* Accumulated during AGGREGATE execution. */
54     double dbl[3];
55     int int1, int2;
56     char *string;
57     int missing;
58   };
59
60 /* Aggregation functions. */
61 enum
62   {
63     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
64     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
65     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
66     FUNC = 0x1f, /* Function mask. */
67     FSTRING = 1<<5, /* String function bit. */
68   };
69
70 /* Attributes of an aggregation function. */
71 struct agr_func
72   {
73     const char *name;           /* Aggregation function name. */
74     int n_args;                 /* Number of arguments. */
75     int alpha_type;             /* When given ALPHA arguments, output type. */
76     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
77   };
78
79 /* Attributes of aggregation functions. */
80 static const struct agr_func agr_func_tab[] = 
81   {
82     {"<NONE>",  0, -1,      {0, 0, 0}},
83     {"SUM",     0, -1,      {FMT_F, 8, 2}},
84     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
85     {"SD",      0, -1,      {FMT_F, 8, 2}},
86     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
87     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
88     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
89     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
90     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
91     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
92     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
93     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
94     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
95     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
96     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
97     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
98     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
100     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
101     {"LAST",    0, ALPHA,   {-1, -1, -1}},
102     {NULL,      0, -1,      {-1, -1, -1}},
103     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
104     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
105   };
106
107 /* Output file, or NULL for the active file. */
108 static struct file_handle *outfile;
109
110 /* Missing value types. */
111 enum
112   {
113     ITEMWISE,           /* Missing values item by item. */
114     COLUMNWISE          /* Missing values column by column. */
115   };
116
117 /* ITEMWISE or COLUMNWISE. */
118 static int missing;
119
120 /* Sort program. */
121 static struct sort_cases_pgm *sort;
122
123 /* Aggregate variables. */
124 static struct agr_var *agr_first, *agr_next;
125
126 /* Aggregate dictionary. */
127 static struct dictionary *agr_dict;
128
129 /* Number of cases passed through aggregation. */
130 static int case_count;
131
132 /* Last values of the break variables. */
133 static union value *prev_case;
134
135 /* Buffers for use by the 10x transformation. */
136 static flt64 *buf64_1xx;
137 static struct ccase *buf_1xx;
138
139 static void initialize_aggregate_info (void);
140
141 /* Prototypes. */
142 static int parse_aggregate_functions (void);
143 static void free_aggregate_functions (void);
144 static int aggregate_single_case (struct ccase *input, struct ccase *output);
145 static int create_sysfile (void);
146
147 static int agr_00x_trns_proc (struct trns_header *, struct ccase *);
148 static void agr_00x_end_func (void *);
149 static int agr_10x_trns_proc (struct trns_header *, struct ccase *);
150 static void agr_10x_trns_free (struct trns_header *);
151 static void agr_10x_end_func (void *);
152 static int agr_11x_func (write_case_data);
153
154 #if DEBUGGING
155 static void debug_print (int flags);
156 #endif
157 \f
158 /* Parsing. */
159
160 /* Parses and executes the AGGREGATE procedure. */
161 int
162 cmd_aggregate (void)
163 {
164   /* Have we seen these subcommands? */
165   unsigned seen = 0;
166
167   outfile = NULL;
168   missing = ITEMWISE;
169   sort = NULL;
170   prev_case = NULL;
171   
172   agr_dict = dict_create ();
173   dict_set_label (agr_dict, dict_get_label (default_dict));
174   dict_set_documents (agr_dict, dict_get_documents (default_dict));
175   
176   lex_match_id ("AGGREGATE");
177
178   /* Read most of the subcommands. */
179   for (;;)
180     {
181       lex_match('/');
182       
183       if (lex_match_id ("OUTFILE"))
184         {
185           if (seen & 1)
186             {
187               destroy_sort_cases_pgm (sort);
188               dict_destroy (agr_dict);
189               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
190               return CMD_FAILURE;
191             }
192           seen |= 1;
193               
194           lex_match ('=');
195           if (lex_match ('*'))
196             outfile = NULL;
197           else 
198             {
199               outfile = fh_parse_file_handle ();
200               if (outfile == NULL)
201                 {
202                   destroy_sort_cases_pgm (sort);
203                   dict_destroy (agr_dict);
204                   return CMD_FAILURE;
205                 }
206             }
207         }
208       else if (lex_match_id ("MISSING"))
209         {
210           lex_match ('=');
211           if (!lex_match_id ("COLUMNWISE"))
212             {
213               destroy_sort_cases_pgm (sort);
214               dict_destroy (agr_dict);
215               lex_error (_("while expecting COLUMNWISE"));
216               return CMD_FAILURE;
217             }
218           missing = COLUMNWISE;
219         }
220       else if (lex_match_id ("DOCUMENT"))
221         seen |= 2;
222       else if (lex_match_id ("PRESORTED"))
223         seen |= 4;
224       else if (lex_match_id ("BREAK"))
225         {
226           if (seen & 8)
227             {
228               destroy_sort_cases_pgm (sort);
229               dict_destroy (agr_dict);
230               msg (SE, _("%s subcommand given multiple times."),"BREAK");
231               return CMD_FAILURE;
232             }
233           seen |= 8;
234
235           lex_match ('=');
236           sort = parse_sort ();
237           if (sort == NULL)
238             {
239               dict_destroy (agr_dict);
240               return CMD_FAILURE;
241             }
242           
243           {
244             int i;
245             
246             for (i = 0; i < sort->var_cnt; i++)
247               {
248                 struct variable *v;
249               
250                 v = dict_clone_var (agr_dict, sort->vars[i], sort->vars[i]->name);
251                 assert (v != NULL);
252               }
253           }
254         }
255       else break;
256     }
257
258   /* Check for proper syntax. */
259   if (!(seen & 8))
260     msg (SW, _("BREAK subcommand not specified."));
261       
262   /* Read in the aggregate functions. */
263   if (!parse_aggregate_functions ())
264     {
265       free_aggregate_functions ();
266       destroy_sort_cases_pgm (sort);
267       return CMD_FAILURE;
268     }
269
270   /* Delete documents. */
271   if (!(seen & 2))
272     dict_set_documents (agr_dict, NULL);
273
274   /* Cancel SPLIT FILE. */
275   dict_set_split_vars (agr_dict, NULL, 0);
276   
277 #if DEBUGGING
278   debug_print (seen);
279 #endif
280
281   /* Initialize. */
282   case_count = 0;
283   initialize_aggregate_info ();
284
285   /* How to implement all this... There are three important variables:
286      whether output is going to the active file (0) or a separate file
287      (1); whether the input data is presorted (0) or needs sorting
288      (1); whether there is a temporary transformation (1) or not (0).
289      The eight cases are as follows:
290
291      000 (0): Pass it through an aggregate transformation that
292      modifies the data.
293
294      001 (1): Cancel the temporary transformation and handle as 000.
295
296      010 (2): Set up a SORT CASES and aggregate the output, writing
297      the results to the active file.
298      
299      011 (3): Cancel the temporary transformation and handle as 010.
300
301      100 (4): Pass it through an aggregate transformation that doesn't
302      modify the data but merely writes it to the output file.
303
304      101 (5): Handled as 100.
305
306      110 (6): Set up a SORT CASES and capture the output, aggregate
307      it, write it to the output file without modifying the active
308      file.
309
310      111 (7): Handled as 110. */
311   
312   {
313     unsigned type = 0;
314
315     if (outfile != NULL)
316       type |= 4;
317     if (sort != NULL && (seen & 4) == 0)
318       type |= 2;
319     if (temporary)
320       type |= 1;
321
322     switch (type)
323       {
324       case 3:
325         cancel_temporary ();
326         /* fall through */
327       case 2:
328         sort_cases (sort, 0);
329         goto case0;
330           
331       case 1:
332         cancel_temporary ();
333         /* fall through */
334       case 0:
335       case0:
336         {
337           struct trns_header *t = xmalloc (sizeof *t);
338           t->proc = agr_00x_trns_proc;
339           t->free = NULL;
340           add_transformation (t);
341           
342           temporary = 2;
343           temp_dict = agr_dict;
344           temp_trns = n_trns;
345           
346           agr_dict = NULL;
347
348           procedure (NULL, NULL, agr_00x_end_func, NULL);
349           break;
350         }
351
352       case 4:
353       case 5:
354         {
355           if (!create_sysfile ())
356             goto lossage;
357           
358           {
359             struct trns_header *t = xmalloc (sizeof *t);
360             t->proc = agr_10x_trns_proc;
361             t->free = agr_10x_trns_free;
362             add_transformation (t);
363
364             procedure (NULL, NULL, agr_10x_end_func, NULL);
365           }
366           
367           break;
368         }
369           
370       case 6:
371       case 7:
372         sort_cases (sort, 1);
373         
374         if (!create_sysfile ())
375           goto lossage;
376         read_sort_output (sort, agr_11x_func, NULL);
377         
378         {
379           struct ccase *save_temp_case = temp_case;
380           temp_case = NULL;
381           agr_11x_func (NULL);
382           temp_case = save_temp_case;
383         }
384         
385         break;
386
387       default:
388         assert (0);
389       }
390   }
391   
392   free (buf64_1xx);
393   free (buf_1xx);
394   
395   /* Clean up. */
396   destroy_sort_cases_pgm (sort);
397   free_aggregate_functions ();
398   free (prev_case);
399   
400   return CMD_SUCCESS;
401
402 lossage:
403   /* Clean up. */
404   destroy_sort_cases_pgm (sort);
405   free_aggregate_functions ();
406   free (prev_case);
407
408   return CMD_FAILURE;
409 }
410
411 /* Create a system file for use in aggregation to an external file,
412    and allocate temporary buffers for writing out cases. */
413 static int
414 create_sysfile (void)
415 {
416   struct sfm_write_info w;
417   w.h = outfile;
418   w.dict = agr_dict;
419   w.compress = set_scompression;
420   if (!sfm_write_dictionary (&w))
421     {
422       free_aggregate_functions ();
423       destroy_sort_cases_pgm (sort);
424       dict_destroy (agr_dict);
425       return 0;
426     }
427     
428   buf64_1xx = xmalloc (sizeof *buf64_1xx * w.case_size);
429   buf_1xx = xmalloc (dict_get_case_size (agr_dict));
430
431   return 1;
432 }
433
434 /* Parse all the aggregate functions. */
435 static int
436 parse_aggregate_functions (void)
437 {
438   agr_first = agr_next = NULL;
439
440   /* Parse everything. */
441   for (;;)
442     {
443       char **dest;
444       char **dest_label;
445       int n_dest;
446
447       int include_missing;
448       const struct agr_func *function;
449       int func_index;
450
451       union value arg[2];
452
453       struct variable **src;
454       int n_src;
455
456       int i;
457
458       dest = NULL;
459       dest_label = NULL;
460       n_dest = 0;
461       src = NULL;
462       n_src = 0;
463       arg[0].c = NULL;
464       arg[1].c = NULL;
465
466       /* Parse the list of target variables. */
467       while (!lex_match ('='))
468         {
469           int n_dest_prev = n_dest;
470           
471           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
472             goto lossage;
473
474           /* Assign empty labels. */
475           {
476             int j;
477
478             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
479             for (j = n_dest_prev; j < n_dest; j++)
480               dest_label[j] = NULL;
481           }
482           
483           if (token == T_STRING)
484             {
485               ds_truncate (&tokstr, 120);
486               dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
487               lex_get ();
488             }
489         }
490
491       /* Get the name of the aggregation function. */
492       if (token != T_ID)
493         {
494           lex_error (_("expecting aggregation function"));
495           goto lossage;
496         }
497
498       include_missing = 0;
499       if (tokid[strlen (tokid) - 1] == '.')
500         {
501           include_missing = 1;
502           tokid[strlen (tokid) - 1] = 0;
503         }
504       
505       for (function = agr_func_tab; function->name; function++)
506         if (!strcmp (function->name, tokid))
507           break;
508       if (NULL == function->name)
509         {
510           msg (SE, _("Unknown aggregation function %s."), tokid);
511           goto lossage;
512         }
513       func_index = function - agr_func_tab;
514       lex_get ();
515
516       /* Check for leading lparen. */
517       if (!lex_match ('('))
518         {
519           if (func_index == N)
520             func_index = N_NO_VARS;
521           else if (func_index == NU)
522             func_index = NU_NO_VARS;
523           else
524             {
525               lex_error (_("expecting `('"));
526               goto lossage;
527             }
528         } else {
529           /* Parse list of source variables. */
530           {
531             int pv_opts = PV_NO_SCRATCH;
532
533             if (func_index == SUM || func_index == MEAN || func_index == SD)
534               pv_opts |= PV_NUMERIC;
535             else if (function->n_args)
536               pv_opts |= PV_SAME_TYPE;
537
538             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
539               goto lossage;
540           }
541
542           /* Parse function arguments, for those functions that
543              require arguments. */
544           if (function->n_args != 0)
545             for (i = 0; i < function->n_args; i++)
546               {
547                 int type;
548             
549                 lex_match (',');
550                 if (token == T_STRING)
551                   {
552                     arg[i].c = xstrdup (ds_value (&tokstr));
553                     type = ALPHA;
554                   }
555                 else if (token == T_NUM)
556                   {
557                     arg[i].f = tokval;
558                     type = NUMERIC;
559                   } else {
560                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
561                     goto lossage;
562                   }
563             
564                 lex_get ();
565
566                 if (type != src[0]->type)
567                   {
568                     msg (SE, _("Arguments to %s must be of same type as "
569                                "source variables."),
570                          function->name);
571                     goto lossage;
572                   }
573               }
574
575           /* Trailing rparen. */
576           if (!lex_match(')'))
577             {
578               lex_error (_("expecting `)'"));
579               goto lossage;
580             }
581           
582           /* Now check that the number of source variables match the
583              number of target variables.  Do this here because if we
584              do it earlier then the user can get very misleading error
585              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
586              error message when a proper message would be more like
587              `unknown variable t'. */
588           if (n_src != n_dest)
589             {
590               msg (SE, _("Number of source variables (%d) does not match "
591                          "number of target variables (%d)."),
592                    n_src, n_dest);
593               goto lossage;
594             }
595         }
596         
597       /* Finally add these to the linked list of aggregation
598          variables. */
599       for (i = 0; i < n_dest; i++)
600         {
601           struct agr_var *v = xmalloc (sizeof *v);
602
603           /* Add variable to chain. */
604           if (agr_first)
605             agr_next = agr_next->next = v;
606           else
607             agr_first = agr_next = v;
608           agr_next->next = NULL;
609           
610           /* Create the target variable in the aggregate
611              dictionary. */
612           {
613             struct variable *destvar;
614             
615             agr_next->function = func_index;
616
617             if (src)
618               {
619                 int output_width;
620
621                 agr_next->src = src[i];
622                 
623                 if (src[i]->type == ALPHA)
624                   {
625                     agr_next->function |= FSTRING;
626                     agr_next->string = xmalloc (src[i]->width);
627                   }
628                 
629                 if (agr_next->src->type == NUMERIC || function->alpha_type == NUMERIC)
630                   output_width = 0;
631                 else
632                   output_width = agr_next->src->width;
633
634                 if (function->alpha_type == ALPHA)
635                   destvar = dict_clone_var (agr_dict, agr_next->src, dest[i]);
636                 else
637                   {
638                     destvar = dict_create_var (agr_dict, dest[i], output_width);
639                     if (output_width == 0)
640                       destvar->print = destvar->write = function->format;
641                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
642                         && (func_index == N || func_index == N_NO_VARS
643                             || func_index == NU || func_index == NU_NO_VARS))
644                       {
645                         struct fmt_spec f = {FMT_F, 8, 2};
646                       
647                         destvar->print = destvar->write = f;
648                       }
649                   }
650               } else {
651                 agr_next->src = NULL;
652                 destvar = dict_create_var (agr_dict, dest[i], 0);
653               }
654           
655             if (!destvar)
656               {
657                 msg (SE, _("Variable name %s is not unique within the "
658                            "aggregate file dictionary, which contains "
659                            "the aggregate variables and the break "
660                            "variables."),
661                      dest[i]);
662                 free (dest[i]);
663                 goto lossage;
664               }
665
666             free (dest[i]);
667             destvar->init = 0;
668             if (dest_label[i])
669               {
670                 destvar->label = dest_label[i];
671                 dest_label[i] = NULL;
672               }
673             else if (function->alpha_type == ALPHA)
674               destvar->print = destvar->write = function->format;
675
676             agr_next->dest = destvar;
677           }
678           
679           agr_next->include_missing = include_missing;
680
681           if (agr_next->src != NULL)
682             {
683               int j;
684
685               if (agr_next->src->type == NUMERIC)
686                 for (j = 0; j < function->n_args; j++)
687                   agr_next->arg[j].f = arg[j].f;
688               else
689                 for (j = 0; j < function->n_args; j++)
690                   agr_next->arg[j].c = xstrdup (arg[j].c);
691             }
692         }
693       
694       if (src != NULL && src[0]->type == ALPHA)
695         for (i = 0; i < function->n_args; i++)
696           {
697             free (arg[i].c);
698             arg[i].c = NULL;
699           }
700
701       free (src);
702       free (dest);
703       free (dest_label);
704
705       if (!lex_match ('/'))
706         {
707           if (token == '.')
708             return 1;
709
710           lex_error ("expecting end of command");
711           return 0;
712         }
713       continue;
714       
715     lossage:
716       for (i = 0; i < n_dest; i++)
717         {
718           free (dest[i]);
719           free (dest_label[i]);
720         }
721       free (dest);
722       free (dest_label);
723       free (arg[0].c);
724       free (arg[1].c);
725       if (src && n_src && src[0]->type == ALPHA)
726         for (i = 0; i < function->n_args; i++)
727           {
728             free(arg[i].c);
729             arg[i].c = NULL;
730           }
731       free (src);
732         
733       return 0;
734     }
735 }
736
737 /* Frees all the state for the AGGREGATE procedure. */
738 static void
739 free_aggregate_functions (void)
740 {
741   struct agr_var *iter, *next;
742
743   if (agr_dict)
744     dict_destroy (agr_dict);
745   for (iter = agr_first; iter; iter = next)
746     {
747       next = iter->next;
748
749       if (iter->function & FSTRING)
750         {
751           int n_args;
752           int i;
753
754           n_args = agr_func_tab[iter->function & FUNC].n_args;
755           for (i = 0; i < n_args; i++)
756             free (iter->arg[i].c);
757           free (iter->string);
758         }
759       free (iter);
760     }
761 }
762 \f
763 /* Execution. */
764
765 static void accumulate_aggregate_info (struct ccase *input);
766 static void dump_aggregate_info (struct ccase *output);
767
768 /* Processes a single case INPUT for aggregation.  If output is
769    warranted, it is written to case OUTPUT, which may be (but need not
770    be) an alias to INPUT.  Returns -1 when output is performed, -2
771    otherwise. */
772 /* The code in this function has an eerie similarity to
773    vfm.c:SPLIT_FILE_procfunc()... */
774 static int
775 aggregate_single_case (struct ccase *input, struct ccase *output)
776 {
777   /* The first case always begins a new break group.  We also need to
778      preserve the values of the case for later comparison. */
779   if (case_count++ == 0)
780     {
781       int n_elem = 0;
782       
783       {
784         int i;
785
786         for (i = 0; i < sort->var_cnt; i++)
787           n_elem += sort->vars[i]->nv;
788       }
789       
790       prev_case = xmalloc (sizeof *prev_case * n_elem);
791
792       /* Copy INPUT into prev_case. */
793       {
794         union value *iter = prev_case;
795         int i;
796
797         for (i = 0; i < sort->var_cnt; i++)
798           {
799             struct variable *v = sort->vars[i];
800             
801             if (v->type == NUMERIC)
802               (iter++)->f = input->data[v->fv].f;
803             else
804               {
805                 memcpy (iter->s, input->data[v->fv].s, v->width);
806                 iter += v->nv;
807               }
808           }
809       }
810             
811       accumulate_aggregate_info (input);
812         
813       return -2;
814     }
815       
816   /* Compare the value of each break variable to the values on the
817      previous case. */
818   {
819     union value *iter = prev_case;
820     int i;
821     
822     for (i = 0; i < sort->var_cnt; i++)
823       {
824         struct variable *v = sort->vars[i];
825       
826         switch (v->type)
827           {
828           case NUMERIC:
829             if (input->data[v->fv].f != iter->f)
830               goto not_equal;
831             iter++;
832             break;
833           case ALPHA:
834             if (memcmp (input->data[v->fv].s, iter->s, v->width))
835               goto not_equal;
836             iter += v->nv;
837             break;
838           default:
839             assert (0);
840           }
841       }
842   }
843
844   accumulate_aggregate_info (input);
845
846   return -2;
847   
848 not_equal:
849   /* The values of the break variable are different from the values on
850      the previous case.  That means that it's time to dump aggregate
851      info. */
852   dump_aggregate_info (output);
853   initialize_aggregate_info ();
854   accumulate_aggregate_info (input);
855
856   /* Copy INPUT into prev_case. */
857   {
858     union value *iter = prev_case;
859     int i;
860
861     for (i = 0; i < sort->var_cnt; i++)
862       {
863         struct variable *v = sort->vars[i];
864             
865         if (v->type == NUMERIC)
866           (iter++)->f = input->data[v->fv].f;
867         else
868           {
869             memcpy (iter->s, input->data[v->fv].s, v->width);
870             iter += v->nv;
871           }
872       }
873   }
874   
875   return -1;
876 }
877
878 /* Accumulates aggregation data from the case INPUT. */
879 static void 
880 accumulate_aggregate_info (struct ccase *input)
881 {
882   struct agr_var *iter;
883   double weight;
884
885   weight = dict_get_case_weight (default_dict, input);
886
887   for (iter = agr_first; iter; iter = iter->next)
888     if (iter->src)
889       {
890         union value *v = &input->data[iter->src->fv];
891
892         if ((!iter->include_missing && is_missing (v, iter->src))
893             || (iter->include_missing && iter->src->type == NUMERIC
894                 && v->f == SYSMIS))
895           {
896             switch (iter->function)
897               {
898               case NMISS:
899                 iter->dbl[0] += weight;
900                 break;
901               case NUMISS:
902                 iter->int1++;
903                 break;
904               }
905             iter->missing = 1;
906             continue;
907           }
908         
909         /* This is horrible.  There are too many possibilities. */
910         switch (iter->function)
911           {
912           case SUM:
913             iter->dbl[0] += v->f;
914             break;
915           case MEAN:
916             iter->dbl[0] += v->f * weight;
917             iter->dbl[1] += weight;
918             break;
919           case SD: 
920             {
921               double product = v->f * weight;
922               iter->dbl[0] += product;
923               iter->dbl[1] += product * v->f;
924               iter->dbl[2] += weight;
925               break; 
926             }
927           case MAX:
928             iter->dbl[0] = max (iter->dbl[0], v->f);
929             iter->int1 = 1;
930             break;
931           case MAX | FSTRING:
932             if (memcmp (iter->string, v->s, iter->src->width) < 0)
933               memcpy (iter->string, v->s, iter->src->width);
934             iter->int1 = 1;
935             break;
936           case MIN:
937             iter->dbl[0] = min (iter->dbl[0], v->f);
938             iter->int1 = 1;
939             break;
940           case MIN | FSTRING:
941             if (memcmp (iter->string, v->s, iter->src->width) > 0)
942               memcpy (iter->string, v->s, iter->src->width);
943             iter->int1 = 1;
944             break;
945           case FGT:
946           case PGT:
947             if (v->f > iter->arg[0].f)
948               iter->dbl[0] += weight;
949             iter->dbl[1] += weight;
950             break;
951           case FGT | FSTRING:
952           case PGT | FSTRING:
953             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
954               iter->dbl[0] += weight;
955             iter->dbl[1] += weight;
956             break;
957           case FLT:
958           case PLT:
959             if (v->f < iter->arg[0].f)
960               iter->dbl[0] += weight;
961             iter->dbl[1] += weight;
962             break;
963           case FLT | FSTRING:
964           case PLT | FSTRING:
965             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
966               iter->dbl[0] += weight;
967             iter->dbl[1] += weight;
968             break;
969           case FIN:
970           case PIN:
971             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
972               iter->dbl[0] += weight;
973             iter->dbl[1] += weight;
974             break;
975           case FIN | FSTRING:
976           case PIN | FSTRING:
977             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
978                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
979               iter->dbl[0] += weight;
980             iter->dbl[1] += weight;
981             break;
982           case FOUT:
983           case POUT:
984             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
985               iter->dbl[0] += weight;
986             iter->dbl[1] += weight;
987             break;
988           case FOUT | FSTRING:
989           case POUT | FSTRING:
990             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
991                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
992               iter->dbl[0] += weight;
993             iter->dbl[1] += weight;
994             break;
995           case N:
996             iter->dbl[0] += weight;
997             break;
998           case NU:
999             iter->int1++;
1000             break;
1001           case FIRST:
1002             if (iter->int1 == 0)
1003               {
1004                 iter->dbl[0] = v->f;
1005                 iter->int1 = 1;
1006               }
1007             break;
1008           case FIRST | FSTRING:
1009             if (iter->int1 == 0)
1010               {
1011                 memcpy (iter->string, v->s, iter->src->width);
1012                 iter->int1 = 1;
1013               }
1014             break;
1015           case LAST:
1016             iter->dbl[0] = v->f;
1017             iter->int1 = 1;
1018             break;
1019           case LAST | FSTRING:
1020             memcpy (iter->string, v->s, iter->src->width);
1021             iter->int1 = 1;
1022             break;
1023           default:
1024             assert (0);
1025           }
1026     } else {
1027       switch (iter->function)
1028         {
1029         case N_NO_VARS:
1030           iter->dbl[0] += weight;
1031           break;
1032         case NU_NO_VARS:
1033           iter->int1++;
1034           break;
1035         default:
1036           assert (0);
1037         }
1038     }
1039 }
1040
1041 /* We've come to a record that differs from the previous in one or
1042    more of the break variables.  Make an output record from the
1043    accumulated statistics in the OUTPUT case. */
1044 static void 
1045 dump_aggregate_info (struct ccase *output)
1046 {
1047   debug_printf (("(dumping "));
1048   
1049   {
1050     int n_elem = 0;
1051     
1052     {
1053       int i;
1054
1055       for (i = 0; i < sort->var_cnt; i++)
1056         n_elem += sort->vars[i]->nv;
1057     }
1058     debug_printf (("n_elem=%d:", n_elem));
1059     memcpy (output->data, prev_case, sizeof (union value) * n_elem);
1060   }
1061   
1062   {
1063     struct agr_var *i;
1064   
1065     for (i = agr_first; i; i = i->next)
1066       {
1067         union value *v = &output->data[i->dest->fv];
1068
1069         debug_printf ((" %d,%d", i->dest->fv, i->dest->nv));
1070
1071         if (missing == COLUMNWISE && i->missing != 0
1072             && (i->function & FUNC) != N && (i->function & FUNC) != NU
1073             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
1074           {
1075             if (i->function & FSTRING)
1076               memset (v->s, ' ', i->dest->width);
1077             else
1078               v->f = SYSMIS;
1079             continue;
1080           }
1081         
1082         switch (i->function)
1083           {
1084           case SUM:
1085             v->f = i->dbl[0];
1086             break;
1087           case MEAN:
1088             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1089             break;
1090           case SD:
1091             v->f = ((i->dbl[2] > 1.0)
1092                     ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
1093                     : SYSMIS);
1094             break;
1095           case MAX:
1096           case MIN:
1097             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1098             break;
1099           case MAX | FSTRING:
1100           case MIN | FSTRING:
1101             if (i->int1)
1102               memcpy (v->s, i->string, i->dest->width);
1103             else
1104               memset (v->s, ' ', i->dest->width);
1105             break;
1106           case FGT | FSTRING:
1107           case FLT | FSTRING:
1108           case FIN | FSTRING:
1109           case FOUT | FSTRING:
1110             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1111             break;
1112           case FGT:
1113           case FLT:
1114           case FIN:
1115           case FOUT:
1116             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1117             break;
1118           case PGT:
1119           case PGT | FSTRING:
1120           case PLT:
1121           case PLT | FSTRING:
1122           case PIN:
1123           case PIN | FSTRING:
1124           case POUT:
1125           case POUT | FSTRING:
1126             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1127             break;
1128           case N:
1129             v->f = i->dbl[0];
1130             break;
1131           case NU:
1132             v->f = i->int1;
1133             break;
1134           case FIRST:
1135           case LAST:
1136             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1137             break;
1138           case FIRST | FSTRING:
1139           case LAST | FSTRING:
1140             if (i->int1)
1141               memcpy (v->s, i->string, i->dest->width);
1142             else
1143               memset (v->s, ' ', i->dest->width);
1144             break;
1145           case N_NO_VARS:
1146             v->f = i->dbl[0];
1147             break;
1148           case NU_NO_VARS:
1149             v->f = i->int1;
1150             break;
1151           case NMISS:
1152             v->f = i->dbl[0];
1153             break;
1154           case NUMISS:
1155             v->f = i->int1;
1156             break;
1157           default:
1158             assert (0);
1159           }
1160       }
1161   }
1162   debug_printf ((") "));
1163 }
1164
1165 /* Resets the state for all the aggregate functions. */
1166 static void
1167 initialize_aggregate_info (void)
1168 {
1169   struct agr_var *iter;
1170
1171   for (iter = agr_first; iter; iter = iter->next)
1172     {
1173       iter->missing = 0;
1174       switch (iter->function)
1175         {
1176         case MIN:
1177           iter->dbl[0] = DBL_MAX;
1178           break;
1179         case MIN | FSTRING:
1180           memset (iter->string, 255, iter->src->width);
1181           break;
1182         case MAX:
1183           iter->dbl[0] = -DBL_MAX;
1184           break;
1185         case MAX | FSTRING:
1186           memset (iter->string, 0, iter->src->width);
1187           break;
1188         default:
1189           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1190           iter->int1 = iter->int2 = 0;
1191           break;
1192         }
1193     }
1194 }
1195 \f
1196 /* Aggregate each case as it comes through.  Cases which aren't needed
1197    are dropped. */
1198 static int
1199 agr_00x_trns_proc (struct trns_header *h UNUSED, struct ccase *c)
1200 {
1201   int code = aggregate_single_case (c, compaction_case);
1202   debug_printf (("%d ", code));
1203   return code;
1204 }
1205
1206 /* Output the last aggregate case.  It's okay to call the vfm_sink's
1207    write() method here because end_func is called so soon after all
1208    the cases have been output; very little has been cleaned up at this
1209    point. */
1210 static void
1211 agr_00x_end_func (void *aux UNUSED)
1212 {
1213   /* Ensure that info for the last break group gets written to the
1214      active file. */
1215   dump_aggregate_info (compaction_case);
1216   vfm_sink_info.ncases++;
1217   vfm_sink->class->write (vfm_sink, temp_case);
1218 }
1219
1220 /* Transform the aggregate case buf_1xx, in internal format, to system
1221    file format, in buf64_1xx, and write the resultant case to the
1222    system file. */
1223 static void
1224 write_case_to_sfm (void)
1225 {
1226   flt64 *p = buf64_1xx;
1227   int i;
1228
1229   for (i = 0; i < dict_get_var_cnt (agr_dict); i++)
1230     {
1231       struct variable *v = dict_get_var (agr_dict, i);
1232       
1233       if (v->type == NUMERIC)
1234         {
1235           double src = buf_1xx->data[v->fv].f;
1236           if (src == SYSMIS)
1237             *p++ = -FLT64_MAX;
1238           else
1239             *p++ = src;
1240         }
1241       else
1242         {
1243           memcpy (p, buf_1xx->data[v->fv].s, v->width);
1244           memset (&((char *) p)[v->width], ' ',
1245                   REM_RND_UP (v->width, sizeof (flt64)));
1246           p += DIV_RND_UP (v->width, sizeof (flt64));
1247         }
1248     }
1249
1250   sfm_write_case (outfile, buf64_1xx, p - buf64_1xx);
1251 }
1252
1253 /* Aggregate the current case and output it if we passed a
1254    breakpoint. */
1255 static int
1256 agr_10x_trns_proc (struct trns_header *h UNUSED, struct ccase *c)
1257 {
1258   int code = aggregate_single_case (c, buf_1xx);
1259
1260   assert (code == -2 || code == -1);
1261   if (code == -1)
1262     write_case_to_sfm ();
1263   return -1;
1264 }
1265
1266 /* Close the system file now that we're done with it.  */
1267 static void
1268 agr_10x_trns_free (struct trns_header *h UNUSED)
1269 {
1270   fh_close_handle (outfile);
1271 }
1272
1273 /* Ensure that info for the last break group gets written to the
1274    system file. */
1275 static void
1276 agr_10x_end_func (void *aux UNUSED)
1277 {
1278   dump_aggregate_info (buf_1xx);
1279   write_case_to_sfm ();
1280 }
1281
1282 /* When called with temp_case non-NULL (the normal case), runs the
1283    case through the aggregater and outputs it to the system file if
1284    appropriate.  If temp_case is NULL, finishes up writing the last
1285    case if necessary. */
1286 static int
1287 agr_11x_func (write_case_data wc_data UNUSED)
1288 {
1289   if (temp_case != NULL)
1290     {
1291       int code = aggregate_single_case (temp_case, buf_1xx);
1292       
1293       assert (code == -2 || code == -1);
1294       if (code == -1)
1295         write_case_to_sfm ();
1296     }
1297   else
1298     {
1299       if (case_count)
1300         {
1301           dump_aggregate_info (buf_1xx);
1302           write_case_to_sfm ();
1303         }
1304       fh_close_handle (outfile);
1305     }
1306   return 1;
1307 }
1308 \f
1309 /* Debugging. */
1310 #if DEBUGGING
1311 /* Print out useful debugging information. */
1312 static void
1313 debug_print (int flags)
1314 {
1315   printf ("AGGREGATE\n /OUTFILE=%s\n",
1316           outfile ? fh_handle_filename (outfile) : "*");
1317
1318   if (missing == COLUMNWISE)
1319     puts (" /MISSING=COLUMNWISE");
1320
1321   if (flags & 2)
1322     puts (" /DOCUMENT");
1323   if (flags & 4)
1324     puts (" /PRESORTED");
1325   
1326   {
1327     int i;
1328
1329     printf (" /BREAK=");
1330     for (i = 0; i < sort->var_cnt; i++)
1331       printf ("%s(%c) ", sort->vars[i]->name,
1332               sort->vars[i]->p.srt.order == SRT_ASCEND ? 'A' : 'D');
1333     putc ('\n', stdout);
1334   }
1335   
1336   {
1337     struct agr_var *iter;
1338     
1339     for (iter = agr_first; iter; iter = iter->next)
1340       {
1341         struct agr_func *f = &agr_func_tab[iter->function & FUNC];
1342         
1343         printf (" /%s", iter->dest->name);
1344         if (iter->dest->label)
1345           printf ("'%s'", iter->dest->label);
1346         printf ("=%s(%s", f->name, iter->src->name);
1347         if (f->n_args)
1348           {
1349             int i;
1350             
1351             for (i = 0; i < f->n_args; i++)
1352               {
1353                 putc (',', stdout);
1354                 if (iter->src->type == NUMERIC)
1355                   printf ("%g", iter->arg[i].f);
1356                 else
1357                   printf ("%.*s", iter->src->width, iter->arg[i].c);
1358               }
1359           }
1360         printf (")\n");
1361       }
1362   }
1363 }
1364
1365 #endif /* DEBUGGING */