Fully implement arbitrary delimiters on DATA LIST, extending the half
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "command.h"
25 #include "error.h"
26 #include "file-handle.h"
27 #include "lexer.h"
28 #include "misc.h"
29 #include "moments.h"
30 #include "pool.h"
31 #include "settings.h"
32 #include "sfm.h"
33 #include "sort.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 /* Specifies how to make an aggregate variable. */
40 struct agr_var
41   {
42     struct agr_var *next;               /* Next in list. */
43
44     /* Collected during parsing. */
45     struct variable *src;       /* Source variable. */
46     struct variable *dest;      /* Target variable. */
47     int function;               /* Function. */
48     int include_missing;        /* 1=Include user-missing values. */
49     union value arg[2];         /* Arguments. */
50
51     /* Accumulated during AGGREGATE execution. */
52     double dbl[3];
53     int int1, int2;
54     char *string;
55     int missing;
56     struct moments1 *moments;
57   };
58
59 /* Aggregation functions. */
60 enum
61   {
62     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
63     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
64     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
65     FUNC = 0x1f, /* Function mask. */
66     FSTRING = 1<<5, /* String function bit. */
67   };
68
69 /* Attributes of an aggregation function. */
70 struct agr_func
71   {
72     const char *name;           /* Aggregation function name. */
73     int n_args;                 /* Number of arguments. */
74     int alpha_type;             /* When given ALPHA arguments, output type. */
75     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
76   };
77
78 /* Attributes of aggregation functions. */
79 static const struct agr_func agr_func_tab[] = 
80   {
81     {"<NONE>",  0, -1,      {0, 0, 0}},
82     {"SUM",     0, -1,      {FMT_F, 8, 2}},
83     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
84     {"SD",      0, -1,      {FMT_F, 8, 2}},
85     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
86     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
87     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
88     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
89     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
90     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
91     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
92     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
93     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
94     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
95     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
96     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
97     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
98     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
99     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
100     {"LAST",    0, ALPHA,   {-1, -1, -1}},
101     {NULL,      0, -1,      {-1, -1, -1}},
102     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
103     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
104   };
105
106 /* Missing value types. */
107 enum missing_treatment
108   {
109     ITEMWISE,           /* Missing values item by item. */
110     COLUMNWISE          /* Missing values column by column. */
111   };
112
113 /* An entire AGGREGATE procedure. */
114 struct agr_proc 
115   {
116     /* We have either an output file or a sink. */
117     struct file_handle *out_file;       /* Output file, or null if none. */
118     struct case_sink *sink;             /* Sink, or null if none. */
119
120     enum missing_treatment missing;     /* How to treat missing values. */
121     struct sort_cases_pgm *sort;        /* Sort program. */
122     struct agr_var *vars;               /* First aggregate variable. */
123     struct dictionary *dict;            /* Aggregate dictionary. */
124     int case_cnt;                       /* Counts aggregated cases. */
125     union value *prev_break;            /* Last values of break variables. */
126     struct ccase *agr_case;             /* Aggregate case for output. */
127     flt64 *sfm_agr_case;                /* Aggregate case in SFM format. */
128   };
129
130 static void initialize_aggregate_info (struct agr_proc *);
131
132 /* Prototypes. */
133 static int parse_aggregate_functions (struct agr_proc *);
134 static void agr_destroy (struct agr_proc *);
135 static int aggregate_single_case (struct agr_proc *agr,
136                                   const struct ccase *input,
137                                   struct ccase *output);
138 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
139 static int create_sysfile (struct agr_proc *);
140
141 /* Aggregating to the active file. */
142 static int agr_to_active_file (struct ccase *, void *aux);
143
144 /* Aggregating to a system file. */
145 static void write_case_to_sfm (struct agr_proc *agr);
146 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
147 static int sort_agr_to_sysfile (const struct ccase *, void *aux);
148 \f
149 /* Parsing. */
150
151 /* Parses and executes the AGGREGATE procedure. */
152 int
153 cmd_aggregate (void)
154 {
155   struct agr_proc agr;
156
157   /* Have we seen these subcommands? */
158   unsigned seen = 0;
159
160   agr.out_file = NULL;
161   agr.sink = NULL;
162   agr.missing = ITEMWISE;
163   agr.sort = NULL;
164   agr.vars = NULL;
165   agr.dict = NULL;
166   agr.case_cnt = 0;
167   agr.prev_break = NULL;
168   
169   agr.dict = dict_create ();
170   dict_set_label (agr.dict, dict_get_label (default_dict));
171   dict_set_documents (agr.dict, dict_get_documents (default_dict));
172   
173   /* Read most of the subcommands. */
174   for (;;)
175     {
176       lex_match ('/');
177       
178       if (lex_match_id ("OUTFILE"))
179         {
180           if (seen & 1)
181             {
182               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
183               goto lossage;
184             }
185           seen |= 1;
186               
187           lex_match ('=');
188           if (lex_match ('*'))
189             agr.out_file = NULL;
190           else 
191             {
192               agr.out_file = fh_parse_file_handle ();
193               if (agr.out_file == NULL)
194                 goto lossage;
195             }
196         }
197       else if (lex_match_id ("MISSING"))
198         {
199           lex_match ('=');
200           if (!lex_match_id ("COLUMNWISE"))
201             {
202               lex_error (_("while expecting COLUMNWISE"));
203               goto lossage;
204             }
205           agr.missing = COLUMNWISE;
206         }
207       else if (lex_match_id ("DOCUMENT"))
208         seen |= 2;
209       else if (lex_match_id ("PRESORTED"))
210         seen |= 4;
211       else if (lex_match_id ("BREAK"))
212         {
213           if (seen & 8)
214             {
215               msg (SE, _("%s subcommand given multiple times."),"BREAK");
216               goto lossage;
217             }
218           seen |= 8;
219
220           lex_match ('=');
221           agr.sort = parse_sort ();
222           if (agr.sort == NULL)
223             goto lossage;
224           
225           {
226             int i;
227             
228             for (i = 0; i < agr.sort->var_cnt; i++)
229               {
230                 struct variable *v;
231               
232                 v = dict_clone_var (agr.dict, agr.sort->vars[i],
233                                     agr.sort->vars[i]->name);
234                 assert (v != NULL);
235               }
236           }
237         }
238       else break;
239     }
240
241   /* Check for proper syntax. */
242   if (!(seen & 8))
243     msg (SW, _("BREAK subcommand not specified."));
244       
245   /* Read in the aggregate functions. */
246   if (!parse_aggregate_functions (&agr))
247     goto lossage;
248
249   /* Delete documents. */
250   if (!(seen & 2))
251     dict_set_documents (agr.dict, NULL);
252
253   /* Cancel SPLIT FILE. */
254   dict_set_split_vars (agr.dict, NULL, 0);
255   
256   /* Initialize. */
257   agr.case_cnt = 0;
258   agr.agr_case = xmalloc (dict_get_case_size (agr.dict));
259   initialize_aggregate_info (&agr);
260
261   /* Output to active file or external file? */
262   if (agr.out_file == NULL) 
263     {
264       /* The active file will be replaced by the aggregated data,
265          so TEMPORARY is moot. */
266       cancel_temporary ();
267
268       if (agr.sort != NULL && (seen & 4) == 0)
269         sort_cases (agr.sort, 0);
270
271       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
272       if (agr.sink->class->open != NULL)
273         agr.sink->class->open (agr.sink);
274       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
275       procedure (agr_to_active_file, &agr);
276       if (agr.case_cnt > 0) 
277         {
278           dump_aggregate_info (&agr, agr.agr_case);
279           agr.sink->class->write (agr.sink, agr.agr_case);
280         }
281       dict_destroy (default_dict);
282       default_dict = agr.dict;
283       agr.dict = NULL;
284       vfm_source = agr.sink->class->make_source (agr.sink);
285       free_case_sink (agr.sink);
286     }
287   else
288     {
289       if (!create_sysfile (&agr))
290         goto lossage;
291
292       if (agr.sort != NULL && (seen & 4) == 0) 
293         {
294           /* Sorting is needed. */
295           sort_cases (agr.sort, 1);
296           read_sort_output (agr.sort, sort_agr_to_sysfile, NULL);
297         }
298       else 
299         {
300           /* Active file is already sorted. */
301           procedure (presorted_agr_to_sysfile, &agr);
302         }
303       
304       if (agr.case_cnt > 0) 
305         {
306           dump_aggregate_info (&agr, agr.agr_case);
307           write_case_to_sfm (&agr);
308         }
309       fh_close_handle (agr.out_file);
310     }
311   
312   agr_destroy (&agr);
313   return CMD_SUCCESS;
314
315 lossage:
316   agr_destroy (&agr);
317   return CMD_FAILURE;
318 }
319
320 /* Create a system file for use in aggregation to an external
321    file. */
322 static int
323 create_sysfile (struct agr_proc *agr)
324 {
325   struct sfm_write_info w;
326   w.h = agr->out_file;
327   w.dict = agr->dict;
328   w.compress = get_scompression();
329   if (!sfm_write_dictionary (&w))
330     return 0;
331
332   agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
333     
334   return 1;
335 }
336
337 /* Parse all the aggregate functions. */
338 static int
339 parse_aggregate_functions (struct agr_proc *agr)
340 {
341   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
342
343   /* Parse everything. */
344   tail = NULL;
345   for (;;)
346     {
347       char **dest;
348       char **dest_label;
349       int n_dest;
350
351       int include_missing;
352       const struct agr_func *function;
353       int func_index;
354
355       union value arg[2];
356
357       struct variable **src;
358       int n_src;
359
360       int i;
361
362       dest = NULL;
363       dest_label = NULL;
364       n_dest = 0;
365       src = NULL;
366       function = NULL;
367       n_src = 0;
368       arg[0].c = NULL;
369       arg[1].c = NULL;
370
371       /* Parse the list of target variables. */
372       while (!lex_match ('='))
373         {
374           int n_dest_prev = n_dest;
375           
376           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
377             goto lossage;
378
379           /* Assign empty labels. */
380           {
381             int j;
382
383             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
384             for (j = n_dest_prev; j < n_dest; j++)
385               dest_label[j] = NULL;
386           }
387           
388           if (token == T_STRING)
389             {
390               ds_truncate (&tokstr, 255);
391               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
392               lex_get ();
393             }
394         }
395
396       /* Get the name of the aggregation function. */
397       if (token != T_ID)
398         {
399           lex_error (_("expecting aggregation function"));
400           goto lossage;
401         }
402
403       include_missing = 0;
404       if (tokid[strlen (tokid) - 1] == '.')
405         {
406           include_missing = 1;
407           tokid[strlen (tokid) - 1] = 0;
408         }
409       
410       for (function = agr_func_tab; function->name; function++)
411         if (!strcmp (function->name, tokid))
412           break;
413       if (NULL == function->name)
414         {
415           msg (SE, _("Unknown aggregation function %s."), tokid);
416           goto lossage;
417         }
418       func_index = function - agr_func_tab;
419       lex_get ();
420
421       /* Check for leading lparen. */
422       if (!lex_match ('('))
423         {
424           if (func_index == N)
425             func_index = N_NO_VARS;
426           else if (func_index == NU)
427             func_index = NU_NO_VARS;
428           else
429             {
430               lex_error (_("expecting `('"));
431               goto lossage;
432             }
433         } else {
434           /* Parse list of source variables. */
435           {
436             int pv_opts = PV_NO_SCRATCH;
437
438             if (func_index == SUM || func_index == MEAN || func_index == SD)
439               pv_opts |= PV_NUMERIC;
440             else if (function->n_args)
441               pv_opts |= PV_SAME_TYPE;
442
443             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
444               goto lossage;
445           }
446
447           /* Parse function arguments, for those functions that
448              require arguments. */
449           if (function->n_args != 0)
450             for (i = 0; i < function->n_args; i++)
451               {
452                 int type;
453             
454                 lex_match (',');
455                 if (token == T_STRING)
456                   {
457                     arg[i].c = xstrdup (ds_c_str (&tokstr));
458                     type = ALPHA;
459                   }
460                 else if (token == T_NUM)
461                   {
462                     arg[i].f = tokval;
463                     type = NUMERIC;
464                   } else {
465                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
466                     goto lossage;
467                   }
468             
469                 lex_get ();
470
471                 if (type != src[0]->type)
472                   {
473                     msg (SE, _("Arguments to %s must be of same type as "
474                                "source variables."),
475                          function->name);
476                     goto lossage;
477                   }
478               }
479
480           /* Trailing rparen. */
481           if (!lex_match(')'))
482             {
483               lex_error (_("expecting `)'"));
484               goto lossage;
485             }
486           
487           /* Now check that the number of source variables match the
488              number of target variables.  Do this here because if we
489              do it earlier then the user can get very misleading error
490              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
491              error message when a proper message would be more like
492              `unknown variable t'. */
493           if (n_src != n_dest)
494             {
495               msg (SE, _("Number of source variables (%d) does not match "
496                          "number of target variables (%d)."),
497                    n_src, n_dest);
498               goto lossage;
499             }
500         }
501         
502       /* Finally add these to the linked list of aggregation
503          variables. */
504       for (i = 0; i < n_dest; i++)
505         {
506           struct agr_var *v = xmalloc (sizeof *v);
507
508           /* Add variable to chain. */
509           if (agr->vars != NULL)
510             tail->next = v;
511           else
512             agr->vars = v;
513           tail = v;
514           tail->next = NULL;
515           v->moments = NULL;
516           
517           /* Create the target variable in the aggregate
518              dictionary. */
519           {
520             struct variable *destvar;
521             
522             v->function = func_index;
523
524             if (src)
525               {
526                 int output_width;
527
528                 v->src = src[i];
529                 
530                 if (src[i]->type == ALPHA)
531                   {
532                     v->function |= FSTRING;
533                     v->string = xmalloc (src[i]->width);
534                   }
535                 
536                 if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
537                   output_width = 0;
538                 else
539                   output_width = v->src->width;
540
541                 if (function->alpha_type == ALPHA)
542                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
543                 else
544                   {
545                     destvar = dict_create_var (agr->dict, dest[i], output_width);
546                     if (output_width == 0)
547                       destvar->print = destvar->write = function->format;
548                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
549                         && (func_index == N || func_index == N_NO_VARS
550                             || func_index == NU || func_index == NU_NO_VARS))
551                       {
552                         struct fmt_spec f = {FMT_F, 8, 2};
553                       
554                         destvar->print = destvar->write = f;
555                       }
556                   }
557               } else {
558                 v->src = NULL;
559                 destvar = dict_create_var (agr->dict, dest[i], 0);
560               }
561           
562             if (!destvar)
563               {
564                 msg (SE, _("Variable name %s is not unique within the "
565                            "aggregate file dictionary, which contains "
566                            "the aggregate variables and the break "
567                            "variables."),
568                      dest[i]);
569                 free (dest[i]);
570                 goto lossage;
571               }
572
573             free (dest[i]);
574             destvar->init = 0;
575             if (dest_label[i])
576               {
577                 destvar->label = dest_label[i];
578                 dest_label[i] = NULL;
579               }
580             else if (function->alpha_type == ALPHA)
581               destvar->print = destvar->write = function->format;
582
583             v->dest = destvar;
584           }
585           
586           v->include_missing = include_missing;
587
588           if (v->src != NULL)
589             {
590               int j;
591
592               if (v->src->type == NUMERIC)
593                 for (j = 0; j < function->n_args; j++)
594                   v->arg[j].f = arg[j].f;
595               else
596                 for (j = 0; j < function->n_args; j++)
597                   v->arg[j].c = xstrdup (arg[j].c);
598             }
599         }
600       
601       if (src != NULL && src[0]->type == ALPHA)
602         for (i = 0; i < function->n_args; i++)
603           {
604             free (arg[i].c);
605             arg[i].c = NULL;
606           }
607
608       free (src);
609       free (dest);
610       free (dest_label);
611
612       if (!lex_match ('/'))
613         {
614           if (token == '.')
615             return 1;
616
617           lex_error ("expecting end of command");
618           return 0;
619         }
620       continue;
621       
622     lossage:
623       for (i = 0; i < n_dest; i++)
624         {
625           free (dest[i]);
626           free (dest_label[i]);
627         }
628       free (dest);
629       free (dest_label);
630       free (arg[0].c);
631       free (arg[1].c);
632       if (src && n_src && src[0]->type == ALPHA)
633         for (i = 0; i < function->n_args; i++)
634           {
635             free (arg[i].c);
636             arg[i].c = NULL;
637           }
638       free (src);
639         
640       return 0;
641     }
642 }
643
644 /* Destroys AGR. */
645 static void
646 agr_destroy (struct agr_proc *agr)
647 {
648   struct agr_var *iter, *next;
649
650   if (agr->dict != NULL)
651     dict_destroy (agr->dict);
652   if (agr->sort != NULL)
653     destroy_sort_cases_pgm (agr->sort);
654   for (iter = agr->vars; iter; iter = next)
655     {
656       next = iter->next;
657
658       if (iter->function & FSTRING)
659         {
660           int n_args;
661           int i;
662
663           n_args = agr_func_tab[iter->function & FUNC].n_args;
664           for (i = 0; i < n_args; i++)
665             free (iter->arg[i].c);
666           free (iter->string);
667         }
668       else if (iter->function == SD)
669         moments1_destroy (iter->moments);
670       free (iter);
671     }
672   free (agr->prev_break);
673   free (agr->agr_case);
674 }
675 \f
676 /* Execution. */
677
678 static void accumulate_aggregate_info (struct agr_proc *,
679                                        const struct ccase *);
680 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
681
682 /* Processes a single case INPUT for aggregation.  If output is
683    warranted, writes it to OUTPUT and returns nonzero.
684    Otherwise, returns zero and OUTPUT is unmodified. */
685 static int
686 aggregate_single_case (struct agr_proc *agr,
687                        const struct ccase *input, struct ccase *output)
688 {
689   /* The first case always begins a new break group.  We also need to
690      preserve the values of the case for later comparison. */
691   if (agr->case_cnt++ == 0)
692     {
693       int n_elem = 0;
694       
695       {
696         int i;
697
698         for (i = 0; i < agr->sort->var_cnt; i++)
699           n_elem += agr->sort->vars[i]->nv;
700       }
701       
702       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
703
704       /* Copy INPUT into prev_break. */
705       {
706         union value *iter = agr->prev_break;
707         int i;
708
709         for (i = 0; i < agr->sort->var_cnt; i++)
710           {
711             struct variable *v = agr->sort->vars[i];
712             
713             if (v->type == NUMERIC)
714               (iter++)->f = input->data[v->fv].f;
715             else
716               {
717                 memcpy (iter->s, input->data[v->fv].s, v->width);
718                 iter += v->nv;
719               }
720           }
721       }
722             
723       accumulate_aggregate_info (agr, input);
724         
725       return 0;
726     }
727       
728   /* Compare the value of each break variable to the values on the
729      previous case. */
730   {
731     union value *iter = agr->prev_break;
732     int i;
733     
734     for (i = 0; i < agr->sort->var_cnt; i++)
735       {
736         struct variable *v = agr->sort->vars[i];
737       
738         switch (v->type)
739           {
740           case NUMERIC:
741             if (input->data[v->fv].f != iter->f)
742               goto not_equal;
743             iter++;
744             break;
745           case ALPHA:
746             if (memcmp (input->data[v->fv].s, iter->s, v->width))
747               goto not_equal;
748             iter += v->nv;
749             break;
750           default:
751             assert (0);
752           }
753       }
754   }
755
756   accumulate_aggregate_info (agr, input);
757
758   return 0;
759   
760 not_equal:
761   /* The values of the break variable are different from the values on
762      the previous case.  That means that it's time to dump aggregate
763      info. */
764   dump_aggregate_info (agr, output);
765   initialize_aggregate_info (agr);
766   accumulate_aggregate_info (agr, input);
767
768   /* Copy INPUT into prev_break. */
769   {
770     union value *iter = agr->prev_break;
771     int i;
772
773     for (i = 0; i < agr->sort->var_cnt; i++)
774       {
775         struct variable *v = agr->sort->vars[i];
776             
777         if (v->type == NUMERIC)
778           (iter++)->f = input->data[v->fv].f;
779         else
780           {
781             memcpy (iter->s, input->data[v->fv].s, v->width);
782             iter += v->nv;
783           }
784       }
785   }
786   
787   return 1;
788 }
789
790 /* Accumulates aggregation data from the case INPUT. */
791 static void 
792 accumulate_aggregate_info (struct agr_proc *agr,
793                            const struct ccase *input)
794 {
795   struct agr_var *iter;
796   double weight;
797   int bad_warn = 1;
798
799   weight = dict_get_case_weight (default_dict, input, &bad_warn);
800
801   for (iter = agr->vars; iter; iter = iter->next)
802     if (iter->src)
803       {
804         const union value *v = &input->data[iter->src->fv];
805
806         if ((!iter->include_missing && is_missing (v, iter->src))
807             || (iter->include_missing && iter->src->type == NUMERIC
808                 && v->f == SYSMIS))
809           {
810             switch (iter->function)
811               {
812               case NMISS:
813                 iter->dbl[0] += weight;
814                 break;
815               case NUMISS:
816                 iter->int1++;
817                 break;
818               }
819             iter->missing = 1;
820             continue;
821           }
822         
823         /* This is horrible.  There are too many possibilities. */
824         switch (iter->function)
825           {
826           case SUM:
827             iter->dbl[0] += v->f;
828             break;
829           case MEAN:
830             iter->dbl[0] += v->f * weight;
831             iter->dbl[1] += weight;
832             break;
833           case SD:
834             moments1_add (iter->moments, v->f, weight);
835             break;
836           case MAX:
837             iter->dbl[0] = max (iter->dbl[0], v->f);
838             iter->int1 = 1;
839             break;
840           case MAX | FSTRING:
841             if (memcmp (iter->string, v->s, iter->src->width) < 0)
842               memcpy (iter->string, v->s, iter->src->width);
843             iter->int1 = 1;
844             break;
845           case MIN:
846             iter->dbl[0] = min (iter->dbl[0], v->f);
847             iter->int1 = 1;
848             break;
849           case MIN | FSTRING:
850             if (memcmp (iter->string, v->s, iter->src->width) > 0)
851               memcpy (iter->string, v->s, iter->src->width);
852             iter->int1 = 1;
853             break;
854           case FGT:
855           case PGT:
856             if (v->f > iter->arg[0].f)
857               iter->dbl[0] += weight;
858             iter->dbl[1] += weight;
859             break;
860           case FGT | FSTRING:
861           case PGT | FSTRING:
862             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
863               iter->dbl[0] += weight;
864             iter->dbl[1] += weight;
865             break;
866           case FLT:
867           case PLT:
868             if (v->f < iter->arg[0].f)
869               iter->dbl[0] += weight;
870             iter->dbl[1] += weight;
871             break;
872           case FLT | FSTRING:
873           case PLT | FSTRING:
874             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
875               iter->dbl[0] += weight;
876             iter->dbl[1] += weight;
877             break;
878           case FIN:
879           case PIN:
880             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
881               iter->dbl[0] += weight;
882             iter->dbl[1] += weight;
883             break;
884           case FIN | FSTRING:
885           case PIN | FSTRING:
886             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
887                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
888               iter->dbl[0] += weight;
889             iter->dbl[1] += weight;
890             break;
891           case FOUT:
892           case POUT:
893             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
894               iter->dbl[0] += weight;
895             iter->dbl[1] += weight;
896             break;
897           case FOUT | FSTRING:
898           case POUT | FSTRING:
899             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
900                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
901               iter->dbl[0] += weight;
902             iter->dbl[1] += weight;
903             break;
904           case N:
905             iter->dbl[0] += weight;
906             break;
907           case NU:
908             iter->int1++;
909             break;
910           case FIRST:
911             if (iter->int1 == 0)
912               {
913                 iter->dbl[0] = v->f;
914                 iter->int1 = 1;
915               }
916             break;
917           case FIRST | FSTRING:
918             if (iter->int1 == 0)
919               {
920                 memcpy (iter->string, v->s, iter->src->width);
921                 iter->int1 = 1;
922               }
923             break;
924           case LAST:
925             iter->dbl[0] = v->f;
926             iter->int1 = 1;
927             break;
928           case LAST | FSTRING:
929             memcpy (iter->string, v->s, iter->src->width);
930             iter->int1 = 1;
931             break;
932           default:
933             assert (0);
934           }
935     } else {
936       switch (iter->function)
937         {
938         case N_NO_VARS:
939           iter->dbl[0] += weight;
940           break;
941         case NU_NO_VARS:
942           iter->int1++;
943           break;
944         default:
945           assert (0);
946         }
947     }
948 }
949
950 /* We've come to a record that differs from the previous in one or
951    more of the break variables.  Make an output record from the
952    accumulated statistics in the OUTPUT case. */
953 static void 
954 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
955 {
956   {
957     int n_elem = 0;
958     
959     {
960       int i;
961
962       for (i = 0; i < agr->sort->var_cnt; i++)
963         n_elem += agr->sort->vars[i]->nv;
964     }
965     memcpy (output->data, agr->prev_break, sizeof (union value) * n_elem);
966   }
967   
968   {
969     struct agr_var *i;
970   
971     for (i = agr->vars; i; i = i->next)
972       {
973         union value *v = &output->data[i->dest->fv];
974
975         if (agr->missing == COLUMNWISE && i->missing != 0
976             && (i->function & FUNC) != N && (i->function & FUNC) != NU
977             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
978           {
979             if (i->function & FSTRING)
980               memset (v->s, ' ', i->dest->width);
981             else
982               v->f = SYSMIS;
983             continue;
984           }
985         
986         switch (i->function)
987           {
988           case SUM:
989             v->f = i->dbl[0];
990             break;
991           case MEAN:
992             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
993             break;
994           case SD:
995             {
996               double variance;
997
998               /* FIXME: we should use two passes. */
999               moments1_calculate (i->moments, NULL, NULL, &variance,
1000                                  NULL, NULL);
1001               if (variance != SYSMIS)
1002                 v->f = sqrt (variance);
1003               else
1004                 v->f = SYSMIS; 
1005             }
1006             break;
1007           case MAX:
1008           case MIN:
1009             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1010             break;
1011           case MAX | FSTRING:
1012           case MIN | FSTRING:
1013             if (i->int1)
1014               memcpy (v->s, i->string, i->dest->width);
1015             else
1016               memset (v->s, ' ', i->dest->width);
1017             break;
1018           case FGT | FSTRING:
1019           case FLT | FSTRING:
1020           case FIN | FSTRING:
1021           case FOUT | FSTRING:
1022             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1023             break;
1024           case FGT:
1025           case FLT:
1026           case FIN:
1027           case FOUT:
1028             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1029             break;
1030           case PGT:
1031           case PGT | FSTRING:
1032           case PLT:
1033           case PLT | FSTRING:
1034           case PIN:
1035           case PIN | FSTRING:
1036           case POUT:
1037           case POUT | FSTRING:
1038             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1039             break;
1040           case N:
1041             v->f = i->dbl[0];
1042             break;
1043           case NU:
1044             v->f = i->int1;
1045             break;
1046           case FIRST:
1047           case LAST:
1048             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1049             break;
1050           case FIRST | FSTRING:
1051           case LAST | FSTRING:
1052             if (i->int1)
1053               memcpy (v->s, i->string, i->dest->width);
1054             else
1055               memset (v->s, ' ', i->dest->width);
1056             break;
1057           case N_NO_VARS:
1058             v->f = i->dbl[0];
1059             break;
1060           case NU_NO_VARS:
1061             v->f = i->int1;
1062             break;
1063           case NMISS:
1064             v->f = i->dbl[0];
1065             break;
1066           case NUMISS:
1067             v->f = i->int1;
1068             break;
1069           default:
1070             assert (0);
1071           }
1072       }
1073   }
1074 }
1075
1076 /* Resets the state for all the aggregate functions. */
1077 static void
1078 initialize_aggregate_info (struct agr_proc *agr)
1079 {
1080   struct agr_var *iter;
1081
1082   for (iter = agr->vars; iter; iter = iter->next)
1083     {
1084       iter->missing = 0;
1085       switch (iter->function)
1086         {
1087         case MIN:
1088           iter->dbl[0] = DBL_MAX;
1089           break;
1090         case MIN | FSTRING:
1091           memset (iter->string, 255, iter->src->width);
1092           break;
1093         case MAX:
1094           iter->dbl[0] = -DBL_MAX;
1095           break;
1096         case MAX | FSTRING:
1097           memset (iter->string, 0, iter->src->width);
1098           break;
1099         case SD:
1100           if (iter->moments == NULL)
1101             iter->moments = moments1_create (MOMENT_VARIANCE);
1102           else
1103             moments1_clear (iter->moments);
1104           break;
1105         default:
1106           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1107           iter->int1 = iter->int2 = 0;
1108           break;
1109         }
1110     }
1111 }
1112 \f
1113 /* Aggregate each case as it comes through.  Cases which aren't needed
1114    are dropped. */
1115 static int
1116 agr_to_active_file (struct ccase *c, void *agr_)
1117 {
1118   struct agr_proc *agr = agr_;
1119
1120   if (aggregate_single_case (agr, c, agr->agr_case)) 
1121     agr->sink->class->write (agr->sink, agr->agr_case);
1122
1123   return 1;
1124 }
1125
1126 /* Writes AGR->agr_case to AGR->out_file. */
1127 static void
1128 write_case_to_sfm (struct agr_proc *agr)
1129 {
1130   flt64 *p;
1131   int i;
1132
1133   p = agr->sfm_agr_case;
1134   for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
1135     {
1136       struct variable *v = dict_get_var (agr->dict, i);
1137       
1138       if (v->type == NUMERIC)
1139         {
1140           double src = agr->agr_case->data[v->fv].f;
1141           if (src == SYSMIS)
1142             *p++ = -FLT64_MAX;
1143           else
1144             *p++ = src;
1145         }
1146       else
1147         {
1148           memcpy (p, agr->agr_case->data[v->fv].s, v->width);
1149           memset (&((char *) p)[v->width], ' ',
1150                   REM_RND_UP (v->width, sizeof (flt64)));
1151           p += DIV_RND_UP (v->width, sizeof (flt64));
1152         }
1153     }
1154
1155   sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
1156 }
1157
1158 /* Aggregate the current case and output it if we passed a
1159    breakpoint. */
1160 static int
1161 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1162 {
1163   sort_agr_to_sysfile (c, agr_);
1164   return 1;
1165 }
1166
1167 /* Aggregate the current case and output it if we passed a
1168    breakpoint. */
1169 static int
1170 sort_agr_to_sysfile (const struct ccase *c, void *agr_) 
1171 {
1172   struct agr_proc *agr = agr_;
1173
1174   if (aggregate_single_case (agr, c, agr->agr_case)) 
1175     write_case_to_sfm (agr);
1176
1177   return 1;
1178 }