Eliminate temp_case, and a few other cleanups.
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "command.h"
25 #include "error.h"
26 #include "file-handle.h"
27 #include "lexer.h"
28 #include "misc.h"
29 #include "pool.h"
30 #include "settings.h"
31 #include "sfm.h"
32 #include "sort.h"
33 #include "stats.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #include "debug-print.h"
40
41 /* Specifies how to make an aggregate variable. */
42 struct agr_var
43   {
44     struct agr_var *next;               /* Next in list. */
45
46     /* Collected during parsing. */
47     struct variable *src;       /* Source variable. */
48     struct variable *dest;      /* Target variable. */
49     int function;               /* Function. */
50     int include_missing;        /* 1=Include user-missing values. */
51     union value arg[2];         /* Arguments. */
52
53     /* Accumulated during AGGREGATE execution. */
54     double dbl[3];
55     int int1, int2;
56     char *string;
57     int missing;
58   };
59
60 /* Aggregation functions. */
61 enum
62   {
63     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
64     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
65     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
66     FUNC = 0x1f, /* Function mask. */
67     FSTRING = 1<<5, /* String function bit. */
68   };
69
70 /* Attributes of an aggregation function. */
71 struct agr_func
72   {
73     const char *name;           /* Aggregation function name. */
74     int n_args;                 /* Number of arguments. */
75     int alpha_type;             /* When given ALPHA arguments, output type. */
76     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
77   };
78
79 /* Attributes of aggregation functions. */
80 static const struct agr_func agr_func_tab[] = 
81   {
82     {"<NONE>",  0, -1,      {0, 0, 0}},
83     {"SUM",     0, -1,      {FMT_F, 8, 2}},
84     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
85     {"SD",      0, -1,      {FMT_F, 8, 2}},
86     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
87     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
88     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
89     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
90     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
91     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
92     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
93     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
94     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
95     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
96     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
97     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
98     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
100     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
101     {"LAST",    0, ALPHA,   {-1, -1, -1}},
102     {NULL,      0, -1,      {-1, -1, -1}},
103     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
104     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
105   };
106
107 /* Output file, or NULL for the active file. */
108 static struct file_handle *outfile;
109
110 /* Missing value types. */
111 enum
112   {
113     ITEMWISE,           /* Missing values item by item. */
114     COLUMNWISE          /* Missing values column by column. */
115   };
116
117 /* ITEMWISE or COLUMNWISE. */
118 static int missing;
119
120 /* Sort program. */
121 static struct sort_cases_pgm *sort;
122
123 /* Aggregate variables. */
124 static struct agr_var *agr_first, *agr_next;
125
126 /* Aggregate dictionary. */
127 static struct dictionary *agr_dict;
128
129 /* Number of cases passed through aggregation. */
130 static int case_count;
131
132 /* Last values of the break variables. */
133 static union value *prev_case;
134
135 /* Buffers for use by the 10x transformation. */
136 static flt64 *buf64_1xx;
137 static struct ccase *buf_1xx;
138
139 static void initialize_aggregate_info (void);
140
141 /* Prototypes. */
142 static int parse_aggregate_functions (void);
143 static void free_aggregate_functions (void);
144 static int aggregate_single_case (const struct ccase *input,
145                                   struct ccase *output);
146 static int create_sysfile (void);
147
148 static trns_proc_func agr_00x_trns_proc, agr_10x_trns_proc;
149 static trns_free_func agr_10x_trns_free;
150 static void agr_00x_end_func (void *aux);
151 static void agr_10x_end_func (void *);
152 static read_sort_output_func agr_11x_read;
153 static void agr_11x_finish (void);
154
155 #if DEBUGGING
156 static void debug_print (int flags);
157 #endif
158 \f
159 /* Parsing. */
160
161 /* Parses and executes the AGGREGATE procedure. */
162 int
163 cmd_aggregate (void)
164 {
165   /* Have we seen these subcommands? */
166   unsigned seen = 0;
167
168   outfile = NULL;
169   missing = ITEMWISE;
170   sort = NULL;
171   prev_case = NULL;
172   
173   agr_dict = dict_create ();
174   dict_set_label (agr_dict, dict_get_label (default_dict));
175   dict_set_documents (agr_dict, dict_get_documents (default_dict));
176   
177   lex_match_id ("AGGREGATE");
178
179   /* Read most of the subcommands. */
180   for (;;)
181     {
182       lex_match('/');
183       
184       if (lex_match_id ("OUTFILE"))
185         {
186           if (seen & 1)
187             {
188               destroy_sort_cases_pgm (sort);
189               dict_destroy (agr_dict);
190               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
191               return CMD_FAILURE;
192             }
193           seen |= 1;
194               
195           lex_match ('=');
196           if (lex_match ('*'))
197             outfile = NULL;
198           else 
199             {
200               outfile = fh_parse_file_handle ();
201               if (outfile == NULL)
202                 {
203                   destroy_sort_cases_pgm (sort);
204                   dict_destroy (agr_dict);
205                   return CMD_FAILURE;
206                 }
207             }
208         }
209       else if (lex_match_id ("MISSING"))
210         {
211           lex_match ('=');
212           if (!lex_match_id ("COLUMNWISE"))
213             {
214               destroy_sort_cases_pgm (sort);
215               dict_destroy (agr_dict);
216               lex_error (_("while expecting COLUMNWISE"));
217               return CMD_FAILURE;
218             }
219           missing = COLUMNWISE;
220         }
221       else if (lex_match_id ("DOCUMENT"))
222         seen |= 2;
223       else if (lex_match_id ("PRESORTED"))
224         seen |= 4;
225       else if (lex_match_id ("BREAK"))
226         {
227           if (seen & 8)
228             {
229               destroy_sort_cases_pgm (sort);
230               dict_destroy (agr_dict);
231               msg (SE, _("%s subcommand given multiple times."),"BREAK");
232               return CMD_FAILURE;
233             }
234           seen |= 8;
235
236           lex_match ('=');
237           sort = parse_sort ();
238           if (sort == NULL)
239             {
240               dict_destroy (agr_dict);
241               return CMD_FAILURE;
242             }
243           
244           {
245             int i;
246             
247             for (i = 0; i < sort->var_cnt; i++)
248               {
249                 struct variable *v;
250               
251                 v = dict_clone_var (agr_dict, sort->vars[i], sort->vars[i]->name);
252                 assert (v != NULL);
253               }
254           }
255         }
256       else break;
257     }
258
259   /* Check for proper syntax. */
260   if (!(seen & 8))
261     msg (SW, _("BREAK subcommand not specified."));
262       
263   /* Read in the aggregate functions. */
264   if (!parse_aggregate_functions ())
265     {
266       free_aggregate_functions ();
267       destroy_sort_cases_pgm (sort);
268       return CMD_FAILURE;
269     }
270
271   /* Delete documents. */
272   if (!(seen & 2))
273     dict_set_documents (agr_dict, NULL);
274
275   /* Cancel SPLIT FILE. */
276   dict_set_split_vars (agr_dict, NULL, 0);
277   
278 #if DEBUGGING
279   debug_print (seen);
280 #endif
281
282   /* Initialize. */
283   case_count = 0;
284   initialize_aggregate_info ();
285
286   /* How to implement all this... There are three important variables:
287      whether output is going to the active file (0) or a separate file
288      (1); whether the input data is presorted (0) or needs sorting
289      (1); whether there is a temporary transformation (1) or not (0).
290      The eight cases are as follows:
291
292      000 (0): Pass it through an aggregate transformation that
293      modifies the data.
294
295      001 (1): Cancel the temporary transformation and handle as 000.
296
297      010 (2): Set up a SORT CASES and aggregate the output, writing
298      the results to the active file.
299      
300      011 (3): Cancel the temporary transformation and handle as 010.
301
302      100 (4): Pass it through an aggregate transformation that doesn't
303      modify the data but merely writes it to the output file.
304
305      101 (5): Handled as 100.
306
307      110 (6): Set up a SORT CASES and capture the output, aggregate
308      it, write it to the output file without modifying the active
309      file.
310
311      111 (7): Handled as 110. */
312   
313   {
314     unsigned type = 0;
315
316     if (outfile != NULL)
317       type |= 4;
318     if (sort != NULL && (seen & 4) == 0)
319       type |= 2;
320     if (temporary)
321       type |= 1;
322
323     switch (type)
324       {
325       case 3:
326         cancel_temporary ();
327         /* fall through */
328       case 2:
329         sort_cases (sort, 0);
330         goto case0;
331           
332       case 1:
333         cancel_temporary ();
334         /* fall through */
335       case 0:
336       case0:
337         {
338           struct trns_header *t = xmalloc (sizeof *t);
339           t->proc = agr_00x_trns_proc;
340           t->free = NULL;
341           add_transformation (t);
342           
343           temporary = 2;
344           temp_dict = agr_dict;
345           temp_trns = n_trns;
346           
347           agr_dict = NULL;
348
349           procedure (NULL, NULL, agr_00x_end_func, NULL);
350           break;
351         }
352
353       case 4:
354       case 5:
355         {
356           if (!create_sysfile ())
357             goto lossage;
358           
359           {
360             struct trns_header *t = xmalloc (sizeof *t);
361             t->proc = agr_10x_trns_proc;
362             t->free = agr_10x_trns_free;
363             add_transformation (t);
364
365             procedure (NULL, NULL, agr_10x_end_func, NULL);
366           }
367           
368           break;
369         }
370           
371       case 6:
372       case 7: 
373         sort_cases (sort, 1);
374         
375         if (!create_sysfile ())
376           goto lossage;
377         read_sort_output (sort, agr_11x_read, NULL);
378         agr_11x_finish ();
379
380         break;
381
382       default:
383         assert (0);
384       }
385   }
386   
387   free (buf64_1xx);
388   free (buf_1xx);
389   
390   /* Clean up. */
391   destroy_sort_cases_pgm (sort);
392   free_aggregate_functions ();
393   free (prev_case);
394   
395   return CMD_SUCCESS;
396
397 lossage:
398   /* Clean up. */
399   destroy_sort_cases_pgm (sort);
400   free_aggregate_functions ();
401   free (prev_case);
402
403   return CMD_FAILURE;
404 }
405
406 /* Create a system file for use in aggregation to an external file,
407    and allocate temporary buffers for writing out cases. */
408 static int
409 create_sysfile (void)
410 {
411   struct sfm_write_info w;
412   w.h = outfile;
413   w.dict = agr_dict;
414   w.compress = set_scompression;
415   if (!sfm_write_dictionary (&w))
416     {
417       free_aggregate_functions ();
418       destroy_sort_cases_pgm (sort);
419       dict_destroy (agr_dict);
420       return 0;
421     }
422     
423   buf64_1xx = xmalloc (sizeof *buf64_1xx * w.case_size);
424   buf_1xx = xmalloc (dict_get_case_size (agr_dict));
425
426   return 1;
427 }
428
429 /* Parse all the aggregate functions. */
430 static int
431 parse_aggregate_functions (void)
432 {
433   agr_first = agr_next = NULL;
434
435   /* Parse everything. */
436   for (;;)
437     {
438       char **dest;
439       char **dest_label;
440       int n_dest;
441
442       int include_missing;
443       const struct agr_func *function;
444       int func_index;
445
446       union value arg[2];
447
448       struct variable **src;
449       int n_src;
450
451       int i;
452
453       dest = NULL;
454       dest_label = NULL;
455       n_dest = 0;
456       src = NULL;
457       function = NULL;
458       n_src = 0;
459       arg[0].c = NULL;
460       arg[1].c = NULL;
461
462       /* Parse the list of target variables. */
463       while (!lex_match ('='))
464         {
465           int n_dest_prev = n_dest;
466           
467           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
468             goto lossage;
469
470           /* Assign empty labels. */
471           {
472             int j;
473
474             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
475             for (j = n_dest_prev; j < n_dest; j++)
476               dest_label[j] = NULL;
477           }
478           
479           if (token == T_STRING)
480             {
481               ds_truncate (&tokstr, 120);
482               dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
483               lex_get ();
484             }
485         }
486
487       /* Get the name of the aggregation function. */
488       if (token != T_ID)
489         {
490           lex_error (_("expecting aggregation function"));
491           goto lossage;
492         }
493
494       include_missing = 0;
495       if (tokid[strlen (tokid) - 1] == '.')
496         {
497           include_missing = 1;
498           tokid[strlen (tokid) - 1] = 0;
499         }
500       
501       for (function = agr_func_tab; function->name; function++)
502         if (!strcmp (function->name, tokid))
503           break;
504       if (NULL == function->name)
505         {
506           msg (SE, _("Unknown aggregation function %s."), tokid);
507           goto lossage;
508         }
509       func_index = function - agr_func_tab;
510       lex_get ();
511
512       /* Check for leading lparen. */
513       if (!lex_match ('('))
514         {
515           if (func_index == N)
516             func_index = N_NO_VARS;
517           else if (func_index == NU)
518             func_index = NU_NO_VARS;
519           else
520             {
521               lex_error (_("expecting `('"));
522               goto lossage;
523             }
524         } else {
525           /* Parse list of source variables. */
526           {
527             int pv_opts = PV_NO_SCRATCH;
528
529             if (func_index == SUM || func_index == MEAN || func_index == SD)
530               pv_opts |= PV_NUMERIC;
531             else if (function->n_args)
532               pv_opts |= PV_SAME_TYPE;
533
534             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
535               goto lossage;
536           }
537
538           /* Parse function arguments, for those functions that
539              require arguments. */
540           if (function->n_args != 0)
541             for (i = 0; i < function->n_args; i++)
542               {
543                 int type;
544             
545                 lex_match (',');
546                 if (token == T_STRING)
547                   {
548                     arg[i].c = xstrdup (ds_value (&tokstr));
549                     type = ALPHA;
550                   }
551                 else if (token == T_NUM)
552                   {
553                     arg[i].f = tokval;
554                     type = NUMERIC;
555                   } else {
556                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
557                     goto lossage;
558                   }
559             
560                 lex_get ();
561
562                 if (type != src[0]->type)
563                   {
564                     msg (SE, _("Arguments to %s must be of same type as "
565                                "source variables."),
566                          function->name);
567                     goto lossage;
568                   }
569               }
570
571           /* Trailing rparen. */
572           if (!lex_match(')'))
573             {
574               lex_error (_("expecting `)'"));
575               goto lossage;
576             }
577           
578           /* Now check that the number of source variables match the
579              number of target variables.  Do this here because if we
580              do it earlier then the user can get very misleading error
581              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
582              error message when a proper message would be more like
583              `unknown variable t'. */
584           if (n_src != n_dest)
585             {
586               msg (SE, _("Number of source variables (%d) does not match "
587                          "number of target variables (%d)."),
588                    n_src, n_dest);
589               goto lossage;
590             }
591         }
592         
593       /* Finally add these to the linked list of aggregation
594          variables. */
595       for (i = 0; i < n_dest; i++)
596         {
597           struct agr_var *v = xmalloc (sizeof *v);
598
599           /* Add variable to chain. */
600           if (agr_first)
601             agr_next = agr_next->next = v;
602           else
603             agr_first = agr_next = v;
604           agr_next->next = NULL;
605           
606           /* Create the target variable in the aggregate
607              dictionary. */
608           {
609             struct variable *destvar;
610             
611             agr_next->function = func_index;
612
613             if (src)
614               {
615                 int output_width;
616
617                 agr_next->src = src[i];
618                 
619                 if (src[i]->type == ALPHA)
620                   {
621                     agr_next->function |= FSTRING;
622                     agr_next->string = xmalloc (src[i]->width);
623                   }
624                 
625                 if (agr_next->src->type == NUMERIC || function->alpha_type == NUMERIC)
626                   output_width = 0;
627                 else
628                   output_width = agr_next->src->width;
629
630                 if (function->alpha_type == ALPHA)
631                   destvar = dict_clone_var (agr_dict, agr_next->src, dest[i]);
632                 else
633                   {
634                     destvar = dict_create_var (agr_dict, dest[i], output_width);
635                     if (output_width == 0)
636                       destvar->print = destvar->write = function->format;
637                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
638                         && (func_index == N || func_index == N_NO_VARS
639                             || func_index == NU || func_index == NU_NO_VARS))
640                       {
641                         struct fmt_spec f = {FMT_F, 8, 2};
642                       
643                         destvar->print = destvar->write = f;
644                       }
645                   }
646               } else {
647                 agr_next->src = NULL;
648                 destvar = dict_create_var (agr_dict, dest[i], 0);
649               }
650           
651             if (!destvar)
652               {
653                 msg (SE, _("Variable name %s is not unique within the "
654                            "aggregate file dictionary, which contains "
655                            "the aggregate variables and the break "
656                            "variables."),
657                      dest[i]);
658                 free (dest[i]);
659                 goto lossage;
660               }
661
662             free (dest[i]);
663             destvar->init = 0;
664             if (dest_label[i])
665               {
666                 destvar->label = dest_label[i];
667                 dest_label[i] = NULL;
668               }
669             else if (function->alpha_type == ALPHA)
670               destvar->print = destvar->write = function->format;
671
672             agr_next->dest = destvar;
673           }
674           
675           agr_next->include_missing = include_missing;
676
677           if (agr_next->src != NULL)
678             {
679               int j;
680
681               if (agr_next->src->type == NUMERIC)
682                 for (j = 0; j < function->n_args; j++)
683                   agr_next->arg[j].f = arg[j].f;
684               else
685                 for (j = 0; j < function->n_args; j++)
686                   agr_next->arg[j].c = xstrdup (arg[j].c);
687             }
688         }
689       
690       if (src != NULL && src[0]->type == ALPHA)
691         for (i = 0; i < function->n_args; i++)
692           {
693             free (arg[i].c);
694             arg[i].c = NULL;
695           }
696
697       free (src);
698       free (dest);
699       free (dest_label);
700
701       if (!lex_match ('/'))
702         {
703           if (token == '.')
704             return 1;
705
706           lex_error ("expecting end of command");
707           return 0;
708         }
709       continue;
710       
711     lossage:
712       for (i = 0; i < n_dest; i++)
713         {
714           free (dest[i]);
715           free (dest_label[i]);
716         }
717       free (dest);
718       free (dest_label);
719       free (arg[0].c);
720       free (arg[1].c);
721       if (src && n_src && src[0]->type == ALPHA)
722         for (i = 0; i < function->n_args; i++)
723           {
724             free(arg[i].c);
725             arg[i].c = NULL;
726           }
727       free (src);
728         
729       return 0;
730     }
731 }
732
733 /* Frees all the state for the AGGREGATE procedure. */
734 static void
735 free_aggregate_functions (void)
736 {
737   struct agr_var *iter, *next;
738
739   if (agr_dict)
740     dict_destroy (agr_dict);
741   for (iter = agr_first; iter; iter = next)
742     {
743       next = iter->next;
744
745       if (iter->function & FSTRING)
746         {
747           int n_args;
748           int i;
749
750           n_args = agr_func_tab[iter->function & FUNC].n_args;
751           for (i = 0; i < n_args; i++)
752             free (iter->arg[i].c);
753           free (iter->string);
754         }
755       free (iter);
756     }
757 }
758 \f
759 /* Execution. */
760
761 static void accumulate_aggregate_info (const struct ccase *input);
762 static void dump_aggregate_info (struct ccase *output);
763
764 /* Processes a single case INPUT for aggregation.  If output is
765    warranted, it is written to case OUTPUT, which may be (but need not
766    be) an alias to INPUT.  Returns -1 when output is performed, -2
767    otherwise. */
768 /* The code in this function has an eerie similarity to
769    vfm.c:SPLIT_FILE_procfunc()... */
770 static int
771 aggregate_single_case (const struct ccase *input, struct ccase *output)
772 {
773   /* The first case always begins a new break group.  We also need to
774      preserve the values of the case for later comparison. */
775   if (case_count++ == 0)
776     {
777       int n_elem = 0;
778       
779       {
780         int i;
781
782         for (i = 0; i < sort->var_cnt; i++)
783           n_elem += sort->vars[i]->nv;
784       }
785       
786       prev_case = xmalloc (sizeof *prev_case * n_elem);
787
788       /* Copy INPUT into prev_case. */
789       {
790         union value *iter = prev_case;
791         int i;
792
793         for (i = 0; i < sort->var_cnt; i++)
794           {
795             struct variable *v = sort->vars[i];
796             
797             if (v->type == NUMERIC)
798               (iter++)->f = input->data[v->fv].f;
799             else
800               {
801                 memcpy (iter->s, input->data[v->fv].s, v->width);
802                 iter += v->nv;
803               }
804           }
805       }
806             
807       accumulate_aggregate_info (input);
808         
809       return -2;
810     }
811       
812   /* Compare the value of each break variable to the values on the
813      previous case. */
814   {
815     union value *iter = prev_case;
816     int i;
817     
818     for (i = 0; i < sort->var_cnt; i++)
819       {
820         struct variable *v = sort->vars[i];
821       
822         switch (v->type)
823           {
824           case NUMERIC:
825             if (input->data[v->fv].f != iter->f)
826               goto not_equal;
827             iter++;
828             break;
829           case ALPHA:
830             if (memcmp (input->data[v->fv].s, iter->s, v->width))
831               goto not_equal;
832             iter += v->nv;
833             break;
834           default:
835             assert (0);
836           }
837       }
838   }
839
840   accumulate_aggregate_info (input);
841
842   return -2;
843   
844 not_equal:
845   /* The values of the break variable are different from the values on
846      the previous case.  That means that it's time to dump aggregate
847      info. */
848   dump_aggregate_info (output);
849   initialize_aggregate_info ();
850   accumulate_aggregate_info (input);
851
852   /* Copy INPUT into prev_case. */
853   {
854     union value *iter = prev_case;
855     int i;
856
857     for (i = 0; i < sort->var_cnt; i++)
858       {
859         struct variable *v = sort->vars[i];
860             
861         if (v->type == NUMERIC)
862           (iter++)->f = input->data[v->fv].f;
863         else
864           {
865             memcpy (iter->s, input->data[v->fv].s, v->width);
866             iter += v->nv;
867           }
868       }
869   }
870   
871   return -1;
872 }
873
874 /* Accumulates aggregation data from the case INPUT. */
875 static void 
876 accumulate_aggregate_info (const struct ccase *input)
877 {
878   struct agr_var *iter;
879   double weight;
880
881   weight = dict_get_case_weight (default_dict, input);
882
883   for (iter = agr_first; iter; iter = iter->next)
884     if (iter->src)
885       {
886         const union value *v = &input->data[iter->src->fv];
887
888         if ((!iter->include_missing && is_missing (v, iter->src))
889             || (iter->include_missing && iter->src->type == NUMERIC
890                 && v->f == SYSMIS))
891           {
892             switch (iter->function)
893               {
894               case NMISS:
895                 iter->dbl[0] += weight;
896                 break;
897               case NUMISS:
898                 iter->int1++;
899                 break;
900               }
901             iter->missing = 1;
902             continue;
903           }
904         
905         /* This is horrible.  There are too many possibilities. */
906         switch (iter->function)
907           {
908           case SUM:
909             iter->dbl[0] += v->f;
910             break;
911           case MEAN:
912             iter->dbl[0] += v->f * weight;
913             iter->dbl[1] += weight;
914             break;
915           case SD: 
916             {
917               double product = v->f * weight;
918               iter->dbl[0] += product;
919               iter->dbl[1] += product * v->f;
920               iter->dbl[2] += weight;
921               break; 
922             }
923           case MAX:
924             iter->dbl[0] = max (iter->dbl[0], v->f);
925             iter->int1 = 1;
926             break;
927           case MAX | FSTRING:
928             if (memcmp (iter->string, v->s, iter->src->width) < 0)
929               memcpy (iter->string, v->s, iter->src->width);
930             iter->int1 = 1;
931             break;
932           case MIN:
933             iter->dbl[0] = min (iter->dbl[0], v->f);
934             iter->int1 = 1;
935             break;
936           case MIN | FSTRING:
937             if (memcmp (iter->string, v->s, iter->src->width) > 0)
938               memcpy (iter->string, v->s, iter->src->width);
939             iter->int1 = 1;
940             break;
941           case FGT:
942           case PGT:
943             if (v->f > iter->arg[0].f)
944               iter->dbl[0] += weight;
945             iter->dbl[1] += weight;
946             break;
947           case FGT | FSTRING:
948           case PGT | FSTRING:
949             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
950               iter->dbl[0] += weight;
951             iter->dbl[1] += weight;
952             break;
953           case FLT:
954           case PLT:
955             if (v->f < iter->arg[0].f)
956               iter->dbl[0] += weight;
957             iter->dbl[1] += weight;
958             break;
959           case FLT | FSTRING:
960           case PLT | FSTRING:
961             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
962               iter->dbl[0] += weight;
963             iter->dbl[1] += weight;
964             break;
965           case FIN:
966           case PIN:
967             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
968               iter->dbl[0] += weight;
969             iter->dbl[1] += weight;
970             break;
971           case FIN | FSTRING:
972           case PIN | FSTRING:
973             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
974                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
975               iter->dbl[0] += weight;
976             iter->dbl[1] += weight;
977             break;
978           case FOUT:
979           case POUT:
980             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
981               iter->dbl[0] += weight;
982             iter->dbl[1] += weight;
983             break;
984           case FOUT | FSTRING:
985           case POUT | FSTRING:
986             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
987                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
988               iter->dbl[0] += weight;
989             iter->dbl[1] += weight;
990             break;
991           case N:
992             iter->dbl[0] += weight;
993             break;
994           case NU:
995             iter->int1++;
996             break;
997           case FIRST:
998             if (iter->int1 == 0)
999               {
1000                 iter->dbl[0] = v->f;
1001                 iter->int1 = 1;
1002               }
1003             break;
1004           case FIRST | FSTRING:
1005             if (iter->int1 == 0)
1006               {
1007                 memcpy (iter->string, v->s, iter->src->width);
1008                 iter->int1 = 1;
1009               }
1010             break;
1011           case LAST:
1012             iter->dbl[0] = v->f;
1013             iter->int1 = 1;
1014             break;
1015           case LAST | FSTRING:
1016             memcpy (iter->string, v->s, iter->src->width);
1017             iter->int1 = 1;
1018             break;
1019           default:
1020             assert (0);
1021           }
1022     } else {
1023       switch (iter->function)
1024         {
1025         case N_NO_VARS:
1026           iter->dbl[0] += weight;
1027           break;
1028         case NU_NO_VARS:
1029           iter->int1++;
1030           break;
1031         default:
1032           assert (0);
1033         }
1034     }
1035 }
1036
1037 /* We've come to a record that differs from the previous in one or
1038    more of the break variables.  Make an output record from the
1039    accumulated statistics in the OUTPUT case. */
1040 static void 
1041 dump_aggregate_info (struct ccase *output)
1042 {
1043   debug_printf (("(dumping "));
1044   
1045   {
1046     int n_elem = 0;
1047     
1048     {
1049       int i;
1050
1051       for (i = 0; i < sort->var_cnt; i++)
1052         n_elem += sort->vars[i]->nv;
1053     }
1054     debug_printf (("n_elem=%d:", n_elem));
1055     memcpy (output->data, prev_case, sizeof (union value) * n_elem);
1056   }
1057   
1058   {
1059     struct agr_var *i;
1060   
1061     for (i = agr_first; i; i = i->next)
1062       {
1063         union value *v = &output->data[i->dest->fv];
1064
1065         debug_printf ((" %d,%d", i->dest->fv, i->dest->nv));
1066
1067         if (missing == COLUMNWISE && i->missing != 0
1068             && (i->function & FUNC) != N && (i->function & FUNC) != NU
1069             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
1070           {
1071             if (i->function & FSTRING)
1072               memset (v->s, ' ', i->dest->width);
1073             else
1074               v->f = SYSMIS;
1075             continue;
1076           }
1077         
1078         switch (i->function)
1079           {
1080           case SUM:
1081             v->f = i->dbl[0];
1082             break;
1083           case MEAN:
1084             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1085             break;
1086           case SD:
1087             v->f = ((i->dbl[2] > 1.0)
1088                     ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
1089                     : SYSMIS);
1090             break;
1091           case MAX:
1092           case MIN:
1093             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1094             break;
1095           case MAX | FSTRING:
1096           case MIN | FSTRING:
1097             if (i->int1)
1098               memcpy (v->s, i->string, i->dest->width);
1099             else
1100               memset (v->s, ' ', i->dest->width);
1101             break;
1102           case FGT | FSTRING:
1103           case FLT | FSTRING:
1104           case FIN | FSTRING:
1105           case FOUT | FSTRING:
1106             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1107             break;
1108           case FGT:
1109           case FLT:
1110           case FIN:
1111           case FOUT:
1112             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1113             break;
1114           case PGT:
1115           case PGT | FSTRING:
1116           case PLT:
1117           case PLT | FSTRING:
1118           case PIN:
1119           case PIN | FSTRING:
1120           case POUT:
1121           case POUT | FSTRING:
1122             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1123             break;
1124           case N:
1125             v->f = i->dbl[0];
1126             break;
1127           case NU:
1128             v->f = i->int1;
1129             break;
1130           case FIRST:
1131           case LAST:
1132             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1133             break;
1134           case FIRST | FSTRING:
1135           case LAST | FSTRING:
1136             if (i->int1)
1137               memcpy (v->s, i->string, i->dest->width);
1138             else
1139               memset (v->s, ' ', i->dest->width);
1140             break;
1141           case N_NO_VARS:
1142             v->f = i->dbl[0];
1143             break;
1144           case NU_NO_VARS:
1145             v->f = i->int1;
1146             break;
1147           case NMISS:
1148             v->f = i->dbl[0];
1149             break;
1150           case NUMISS:
1151             v->f = i->int1;
1152             break;
1153           default:
1154             assert (0);
1155           }
1156       }
1157   }
1158   debug_printf ((") "));
1159 }
1160
1161 /* Resets the state for all the aggregate functions. */
1162 static void
1163 initialize_aggregate_info (void)
1164 {
1165   struct agr_var *iter;
1166
1167   for (iter = agr_first; iter; iter = iter->next)
1168     {
1169       iter->missing = 0;
1170       switch (iter->function)
1171         {
1172         case MIN:
1173           iter->dbl[0] = DBL_MAX;
1174           break;
1175         case MIN | FSTRING:
1176           memset (iter->string, 255, iter->src->width);
1177           break;
1178         case MAX:
1179           iter->dbl[0] = -DBL_MAX;
1180           break;
1181         case MAX | FSTRING:
1182           memset (iter->string, 0, iter->src->width);
1183           break;
1184         default:
1185           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1186           iter->int1 = iter->int2 = 0;
1187           break;
1188         }
1189     }
1190 }
1191 \f
1192 /* Aggregate each case as it comes through.  Cases which aren't needed
1193    are dropped. */
1194 static int
1195 agr_00x_trns_proc (struct trns_header *h UNUSED, struct ccase *c,
1196                    int case_num UNUSED)
1197 {
1198   int code = aggregate_single_case (c, compaction_case);
1199   debug_printf (("%d ", code));
1200   return code;
1201 }
1202
1203 /* Output the last aggregate case.  It's okay to call the vfm_sink's
1204    write() method here because end_func is called so soon after all
1205    the cases have been output; very little has been cleaned up at this
1206    point. */
1207 static void
1208 agr_00x_end_func (void *aux UNUSED)
1209 {
1210   /* Ensure that info for the last break group gets written to the
1211      active file. */
1212   dump_aggregate_info (compaction_case);
1213   vfm_sink->class->write (vfm_sink, compaction_case);
1214 }
1215
1216 /* Transform the aggregate case buf_1xx, in internal format, to system
1217    file format, in buf64_1xx, and write the resultant case to the
1218    system file. */
1219 static void
1220 write_case_to_sfm (void)
1221 {
1222   flt64 *p = buf64_1xx;
1223   int i;
1224
1225   for (i = 0; i < dict_get_var_cnt (agr_dict); i++)
1226     {
1227       struct variable *v = dict_get_var (agr_dict, i);
1228       
1229       if (v->type == NUMERIC)
1230         {
1231           double src = buf_1xx->data[v->fv].f;
1232           if (src == SYSMIS)
1233             *p++ = -FLT64_MAX;
1234           else
1235             *p++ = src;
1236         }
1237       else
1238         {
1239           memcpy (p, buf_1xx->data[v->fv].s, v->width);
1240           memset (&((char *) p)[v->width], ' ',
1241                   REM_RND_UP (v->width, sizeof (flt64)));
1242           p += DIV_RND_UP (v->width, sizeof (flt64));
1243         }
1244     }
1245
1246   sfm_write_case (outfile, buf64_1xx, p - buf64_1xx);
1247 }
1248
1249 /* Aggregate the current case and output it if we passed a
1250    breakpoint. */
1251 static int
1252 agr_10x_trns_proc (struct trns_header *h UNUSED, struct ccase *c,
1253                    int case_num UNUSED)
1254 {
1255   int code = aggregate_single_case (c, buf_1xx);
1256
1257   assert (code == -2 || code == -1);
1258   if (code == -1)
1259     write_case_to_sfm ();
1260   return -1;
1261 }
1262
1263 /* Close the system file now that we're done with it.  */
1264 static void
1265 agr_10x_trns_free (struct trns_header *h UNUSED)
1266 {
1267   fh_close_handle (outfile);
1268 }
1269
1270 /* Ensure that info for the last break group gets written to the
1271    system file. */
1272 static void
1273 agr_10x_end_func (void *aux UNUSED)
1274 {
1275   dump_aggregate_info (buf_1xx);
1276   write_case_to_sfm ();
1277 }
1278
1279 /* Runs case C through the aggregater and outputs it to the
1280    system file if appropriate.  */
1281 static int
1282 agr_11x_read (const struct ccase *c, void *aux UNUSED)
1283 {
1284   int code = aggregate_single_case (c, buf_1xx);
1285       
1286   assert (code == -2 || code == -1);
1287   if (code == -1)
1288     write_case_to_sfm ();
1289
1290   return 1;
1291 }
1292
1293 /* Finishes up writing the last case if necessary. */
1294 static void
1295 agr_11x_finish (void) 
1296 {
1297   if (case_count)
1298     {
1299       dump_aggregate_info (buf_1xx);
1300       write_case_to_sfm ();
1301     }
1302   fh_close_handle (outfile);
1303 }
1304 \f
1305 /* Debugging. */
1306 #if DEBUGGING
1307 /* Print out useful debugging information. */
1308 static void
1309 debug_print (int flags)
1310 {
1311   printf ("AGGREGATE\n /OUTFILE=%s\n",
1312           outfile ? fh_handle_filename (outfile) : "*");
1313
1314   if (missing == COLUMNWISE)
1315     puts (" /MISSING=COLUMNWISE");
1316
1317   if (flags & 2)
1318     puts (" /DOCUMENT");
1319   if (flags & 4)
1320     puts (" /PRESORTED");
1321   
1322   {
1323     int i;
1324
1325     printf (" /BREAK=");
1326     for (i = 0; i < sort->var_cnt; i++)
1327       printf ("%s(%c) ", sort->vars[i]->name,
1328               sort->vars[i]->p.srt.order == SRT_ASCEND ? 'A' : 'D');
1329     putc ('\n', stdout);
1330   }
1331   
1332   {
1333     struct agr_var *iter;
1334     
1335     for (iter = agr_first; iter; iter = iter->next)
1336       {
1337         struct agr_func *f = &agr_func_tab[iter->function & FUNC];
1338         
1339         printf (" /%s", iter->dest->name);
1340         if (iter->dest->label)
1341           printf ("'%s'", iter->dest->label);
1342         printf ("=%s(%s", f->name, iter->src->name);
1343         if (f->n_args)
1344           {
1345             int i;
1346             
1347             for (i = 0; i < f->n_args; i++)
1348               {
1349                 putc (',', stdout);
1350                 if (iter->src->type == NUMERIC)
1351                   printf ("%g", iter->arg[i].f);
1352                 else
1353                   printf ("%.*s", iter->src->width, iter->arg[i].c);
1354               }
1355           }
1356         printf (")\n");
1357       }
1358   }
1359 }
1360
1361 #endif /* DEBUGGING */