Remove `init' member from struct variable, which was essentially
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include <libpspp/message.h>
22 #include <stdlib.h>
23 #include <libpspp/alloc.h>
24 #include <data/any-writer.h>
25 #include <data/case.h>
26 #include <data/casefile.h>
27 #include <language/command.h>
28 #include <data/dictionary.h>
29 #include <libpspp/message.h>
30 #include <data/file-handle-def.h>
31 #include <language/data-io/file-handle.h>
32 #include <language/lexer/lexer.h>
33 #include <language/stats/sort-criteria.h>
34 #include <libpspp/misc.h>
35 #include <math/moments.h>
36 #include <libpspp/pool.h>
37 #include <data/settings.h>
38 #include <data/sys-file-writer.h>
39 #include <math/sort.h>
40 #include <libpspp/str.h>
41 #include <data/variable.h>
42 #include <procedure.h>
43
44 #include "gettext.h"
45 #define _(msgid) gettext (msgid)
46
47 /* Specifies how to make an aggregate variable. */
48 struct agr_var
49   {
50     struct agr_var *next;               /* Next in list. */
51
52     /* Collected during parsing. */
53     struct variable *src;       /* Source variable. */
54     struct variable *dest;      /* Target variable. */
55     int function;               /* Function. */
56     int include_missing;        /* 1=Include user-missing values. */
57     union value arg[2];         /* Arguments. */
58
59     /* Accumulated during AGGREGATE execution. */
60     double dbl[3];
61     int int1, int2;
62     char *string;
63     int missing;
64     struct moments1 *moments;
65   };
66
67 /* Aggregation functions. */
68 enum
69   {
70     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
71     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
72     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
73     FUNC = 0x1f, /* Function mask. */
74     FSTRING = 1<<5, /* String function bit. */
75   };
76
77 /* Attributes of an aggregation function. */
78 struct agr_func
79   {
80     const char *name;           /* Aggregation function name. */
81     size_t n_args;              /* Number of arguments. */
82     int alpha_type;             /* When given ALPHA arguments, output type. */
83     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
84   };
85
86 /* Attributes of aggregation functions. */
87 static const struct agr_func agr_func_tab[] = 
88   {
89     {"<NONE>",  0, -1,      {0, 0, 0}},
90     {"SUM",     0, -1,      {FMT_F, 8, 2}},
91     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
92     {"SD",      0, -1,      {FMT_F, 8, 2}},
93     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
94     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
95     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
96     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
97     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
98     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
99     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
100     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
101     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
102     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
103     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
104     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
105     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
106     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
107     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
108     {"LAST",    0, ALPHA,   {-1, -1, -1}},
109     {NULL,      0, -1,      {-1, -1, -1}},
110     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
111     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
112   };
113
114 /* Missing value types. */
115 enum missing_treatment
116   {
117     ITEMWISE,           /* Missing values item by item. */
118     COLUMNWISE          /* Missing values column by column. */
119   };
120
121 /* An entire AGGREGATE procedure. */
122 struct agr_proc 
123   {
124     /* We have either an output file or a sink. */
125     struct any_writer *writer;          /* Output file, or null if none. */
126     struct case_sink *sink;             /* Sink, or null if none. */
127
128     /* Break variables. */
129     struct sort_criteria *sort;         /* Sort criteria. */
130     struct variable **break_vars;       /* Break variables. */
131     size_t break_var_cnt;               /* Number of break variables. */
132     struct ccase break_case;            /* Last values of break variables. */
133
134     enum missing_treatment missing;     /* How to treat missing values. */
135     struct agr_var *agr_vars;           /* First aggregate variable. */
136     struct dictionary *dict;            /* Aggregate dictionary. */
137     int case_cnt;                       /* Counts aggregated cases. */
138     struct ccase agr_case;              /* Aggregate case for output. */
139   };
140
141 static void initialize_aggregate_info (struct agr_proc *,
142                                        const struct ccase *);
143
144 /* Prototypes. */
145 static int parse_aggregate_functions (struct agr_proc *);
146 static void agr_destroy (struct agr_proc *);
147 static int aggregate_single_case (struct agr_proc *agr,
148                                   const struct ccase *input,
149                                   struct ccase *output);
150 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
151
152 /* Aggregating to the active file. */
153 static bool agr_to_active_file (struct ccase *, void *aux);
154
155 /* Aggregating to a system file. */
156 static bool presorted_agr_to_sysfile (struct ccase *, void *aux);
157 \f
158 /* Parsing. */
159
160 /* Parses and executes the AGGREGATE procedure. */
161 int
162 cmd_aggregate (void)
163 {
164   struct agr_proc agr;
165   struct file_handle *out_file = NULL;
166
167   bool copy_documents = false;
168   bool presorted = false;
169   bool saw_direction;
170
171   memset(&agr, 0 , sizeof (agr));
172   agr.missing = ITEMWISE;
173   case_nullify (&agr.break_case);
174   
175   agr.dict = dict_create ();
176   dict_set_label (agr.dict, dict_get_label (default_dict));
177   dict_set_documents (agr.dict, dict_get_documents (default_dict));
178
179   /* OUTFILE subcommand must be first. */
180   if (!lex_force_match_id ("OUTFILE"))
181     goto error;
182   lex_match ('=');
183   if (!lex_match ('*'))
184     {
185       out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
186       if (out_file == NULL)
187         goto error;
188     }
189   
190   /* Read most of the subcommands. */
191   for (;;)
192     {
193       lex_match ('/');
194       
195       if (lex_match_id ("MISSING"))
196         {
197           lex_match ('=');
198           if (!lex_match_id ("COLUMNWISE"))
199             {
200               lex_error (_("while expecting COLUMNWISE"));
201               goto error;
202             }
203           agr.missing = COLUMNWISE;
204         }
205       else if (lex_match_id ("DOCUMENT"))
206         copy_documents = true;
207       else if (lex_match_id ("PRESORTED"))
208         presorted = true;
209       else if (lex_match_id ("BREAK"))
210         {
211           int i;
212
213           lex_match ('=');
214           agr.sort = sort_parse_criteria (default_dict,
215                                           &agr.break_vars, &agr.break_var_cnt,
216                                           &saw_direction, NULL);
217           if (agr.sort == NULL)
218             goto error;
219           
220           for (i = 0; i < agr.break_var_cnt; i++)
221             dict_clone_var_assert (agr.dict, agr.break_vars[i],
222                                    agr.break_vars[i]->name);
223
224           /* BREAK must follow the options. */
225           break;
226         }
227       else
228         {
229           lex_error (_("expecting BREAK"));
230           goto error;
231         }
232     }
233   if (presorted && saw_direction)
234     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
235                "with (A) or (D) has no effect.  Output data will be sorted "
236                "the same way as the input data."));
237       
238   /* Read in the aggregate functions. */
239   lex_match ('/');
240   if (!parse_aggregate_functions (&agr))
241     goto error;
242
243   /* Delete documents. */
244   if (!copy_documents)
245     dict_set_documents (agr.dict, NULL);
246
247   /* Cancel SPLIT FILE. */
248   dict_set_split_vars (agr.dict, NULL, 0);
249   
250   /* Initialize. */
251   agr.case_cnt = 0;
252   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
253
254   /* Output to active file or external file? */
255   if (out_file == NULL) 
256     {
257       /* The active file will be replaced by the aggregated data,
258          so TEMPORARY is moot. */
259       cancel_temporary ();
260
261       if (agr.sort != NULL && !presorted) 
262         {
263           if (!sort_active_file_in_place (agr.sort))
264             goto error;
265         }
266
267       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
268       if (agr.sink->class->open != NULL)
269         agr.sink->class->open (agr.sink);
270       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
271       if (!procedure (agr_to_active_file, &agr))
272         goto error;
273       if (agr.case_cnt > 0) 
274         {
275           dump_aggregate_info (&agr, &agr.agr_case);
276           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
277             goto error;
278         }
279       dict_destroy (default_dict);
280       default_dict = agr.dict;
281       agr.dict = NULL;
282       vfm_source = agr.sink->class->make_source (agr.sink);
283       free_case_sink (agr.sink);
284     }
285   else
286     {
287       agr.writer = any_writer_open (out_file, agr.dict);
288       if (agr.writer == NULL)
289         goto error;
290       
291       if (agr.sort != NULL && !presorted) 
292         {
293           /* Sorting is needed. */
294           struct casefile *dst;
295           struct casereader *reader;
296           struct ccase c;
297           bool ok = true;
298           
299           dst = sort_active_file_to_casefile (agr.sort);
300           if (dst == NULL)
301             goto error;
302           reader = casefile_get_destructive_reader (dst);
303           while (ok && casereader_read_xfer (reader, &c)) 
304             {
305               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
306                 ok = any_writer_write (agr.writer, &agr.agr_case);
307               case_destroy (&c);
308             }
309           casereader_destroy (reader);
310           if (ok)
311             ok = !casefile_error (dst);
312           casefile_destroy (dst);
313           if (!ok)
314             goto error;
315         }
316       else 
317         {
318           /* Active file is already sorted. */
319           if (!procedure (presorted_agr_to_sysfile, &agr))
320             goto error;
321         }
322       
323       if (agr.case_cnt > 0) 
324         {
325           dump_aggregate_info (&agr, &agr.agr_case);
326           any_writer_write (agr.writer, &agr.agr_case);
327         }
328       if (any_writer_error (agr.writer))
329         goto error;
330     }
331   
332   agr_destroy (&agr);
333   return CMD_SUCCESS;
334
335 error:
336   agr_destroy (&agr);
337   return CMD_CASCADING_FAILURE;
338 }
339
340 /* Parse all the aggregate functions. */
341 static int
342 parse_aggregate_functions (struct agr_proc *agr)
343 {
344   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
345
346   /* Parse everything. */
347   tail = NULL;
348   for (;;)
349     {
350       char **dest;
351       char **dest_label;
352       size_t n_dest;
353
354       int include_missing;
355       const struct agr_func *function;
356       int func_index;
357
358       union value arg[2];
359
360       struct variable **src;
361       size_t n_src;
362
363       size_t i;
364
365       dest = NULL;
366       dest_label = NULL;
367       n_dest = 0;
368       src = NULL;
369       function = NULL;
370       n_src = 0;
371       arg[0].c = NULL;
372       arg[1].c = NULL;
373
374       /* Parse the list of target variables. */
375       while (!lex_match ('='))
376         {
377           size_t n_dest_prev = n_dest;
378           
379           if (!parse_DATA_LIST_vars (&dest, &n_dest,
380                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
381             goto error;
382
383           /* Assign empty labels. */
384           {
385             int j;
386
387             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
388             for (j = n_dest_prev; j < n_dest; j++)
389               dest_label[j] = NULL;
390           }
391           
392           if (token == T_STRING)
393             {
394               ds_truncate (&tokstr, 255);
395               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
396               lex_get ();
397             }
398         }
399
400       /* Get the name of the aggregation function. */
401       if (token != T_ID)
402         {
403           lex_error (_("expecting aggregation function"));
404           goto error;
405         }
406
407       include_missing = 0;
408       if (tokid[strlen (tokid) - 1] == '.')
409         {
410           include_missing = 1;
411           tokid[strlen (tokid) - 1] = 0;
412         }
413       
414       for (function = agr_func_tab; function->name; function++)
415         if (!strcasecmp (function->name, tokid))
416           break;
417       if (NULL == function->name)
418         {
419           msg (SE, _("Unknown aggregation function %s."), tokid);
420           goto error;
421         }
422       func_index = function - agr_func_tab;
423       lex_get ();
424
425       /* Check for leading lparen. */
426       if (!lex_match ('('))
427         {
428           if (func_index == N)
429             func_index = N_NO_VARS;
430           else if (func_index == NU)
431             func_index = NU_NO_VARS;
432           else
433             {
434               lex_error (_("expecting `('"));
435               goto error;
436             }
437         }
438       else
439         {
440           /* Parse list of source variables. */
441           {
442             int pv_opts = PV_NO_SCRATCH;
443
444             if (func_index == SUM || func_index == MEAN || func_index == SD)
445               pv_opts |= PV_NUMERIC;
446             else if (function->n_args)
447               pv_opts |= PV_SAME_TYPE;
448
449             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
450               goto error;
451           }
452
453           /* Parse function arguments, for those functions that
454              require arguments. */
455           if (function->n_args != 0)
456             for (i = 0; i < function->n_args; i++)
457               {
458                 int type;
459             
460                 lex_match (',');
461                 if (token == T_STRING)
462                   {
463                     arg[i].c = xstrdup (ds_c_str (&tokstr));
464                     type = ALPHA;
465                   }
466                 else if (lex_is_number ())
467                   {
468                     arg[i].f = tokval;
469                     type = NUMERIC;
470                   } else {
471                     msg (SE, _("Missing argument %d to %s."), i + 1,
472                          function->name);
473                     goto error;
474                   }
475             
476                 lex_get ();
477
478                 if (type != src[0]->type)
479                   {
480                     msg (SE, _("Arguments to %s must be of same type as "
481                                "source variables."),
482                          function->name);
483                     goto error;
484                   }
485               }
486
487           /* Trailing rparen. */
488           if (!lex_match(')'))
489             {
490               lex_error (_("expecting `)'"));
491               goto error;
492             }
493           
494           /* Now check that the number of source variables match
495              the number of target variables.  If we check earlier
496              than this, the user can get very misleading error
497              message, i.e. `AGGREGATE x=SUM(y t).' will get this
498              error message when a proper message would be more
499              like `unknown variable t'. */
500           if (n_src != n_dest)
501             {
502               msg (SE, _("Number of source variables (%u) does not match "
503                          "number of target variables (%u)."),
504                    (unsigned) n_src, (unsigned) n_dest);
505               goto error;
506             }
507
508           if ((func_index == PIN || func_index == POUT
509               || func_index == FIN || func_index == FOUT) 
510               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
511                   || (src[0]->type == ALPHA
512                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
513             {
514               union value t = arg[0];
515               arg[0] = arg[1];
516               arg[1] = t;
517                   
518               msg (SW, _("The value arguments passed to the %s function "
519                          "are out-of-order.  They will be treated as if "
520                          "they had been specified in the correct order."),
521                    function->name);
522             }
523         }
524         
525       /* Finally add these to the linked list of aggregation
526          variables. */
527       for (i = 0; i < n_dest; i++)
528         {
529           struct agr_var *v = xmalloc (sizeof *v);
530
531           /* Add variable to chain. */
532           if (agr->agr_vars != NULL)
533             tail->next = v;
534           else
535             agr->agr_vars = v;
536           tail = v;
537           tail->next = NULL;
538           v->moments = NULL;
539           
540           /* Create the target variable in the aggregate
541              dictionary. */
542           {
543             struct variable *destvar;
544             
545             v->function = func_index;
546
547             if (src)
548               {
549                 v->src = src[i];
550                 
551                 if (src[i]->type == ALPHA)
552                   {
553                     v->function |= FSTRING;
554                     v->string = xmalloc (src[i]->width);
555                   }
556
557                 if (function->alpha_type == ALPHA)
558                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
559                 else
560                   {
561                     assert (v->src->type == NUMERIC
562                             || function->alpha_type == NUMERIC);
563                     destvar = dict_create_var (agr->dict, dest[i], 0);
564                     if (destvar != NULL) 
565                       {
566                         if ((func_index == N || func_index == NMISS)
567                             && dict_get_weight (default_dict) != NULL)
568                           destvar->print = destvar->write = f8_2; 
569                         else
570                           destvar->print = destvar->write = function->format;
571                       }
572                   }
573               } else {
574                 v->src = NULL;
575                 destvar = dict_create_var (agr->dict, dest[i], 0);
576                 if (func_index == N_NO_VARS
577                     && dict_get_weight (default_dict) != NULL)
578                   destvar->print = destvar->write = f8_2; 
579                 else
580                   destvar->print = destvar->write = function->format;
581               }
582           
583             if (!destvar)
584               {
585                 msg (SE, _("Variable name %s is not unique within the "
586                            "aggregate file dictionary, which contains "
587                            "the aggregate variables and the break "
588                            "variables."),
589                      dest[i]);
590                 goto error;
591               }
592
593             free (dest[i]);
594             if (dest_label[i])
595               {
596                 destvar->label = dest_label[i];
597                 dest_label[i] = NULL;
598               }
599
600             v->dest = destvar;
601           }
602           
603           v->include_missing = include_missing;
604
605           if (v->src != NULL)
606             {
607               int j;
608
609               if (v->src->type == NUMERIC)
610                 for (j = 0; j < function->n_args; j++)
611                   v->arg[j].f = arg[j].f;
612               else
613                 for (j = 0; j < function->n_args; j++)
614                   v->arg[j].c = xstrdup (arg[j].c);
615             }
616         }
617       
618       if (src != NULL && src[0]->type == ALPHA)
619         for (i = 0; i < function->n_args; i++)
620           {
621             free (arg[i].c);
622             arg[i].c = NULL;
623           }
624
625       free (src);
626       free (dest);
627       free (dest_label);
628
629       if (!lex_match ('/'))
630         {
631           if (token == '.')
632             return 1;
633
634           lex_error ("expecting end of command");
635           return 0;
636         }
637       continue;
638       
639     error:
640       for (i = 0; i < n_dest; i++)
641         {
642           free (dest[i]);
643           free (dest_label[i]);
644         }
645       free (dest);
646       free (dest_label);
647       free (arg[0].c);
648       free (arg[1].c);
649       if (src && n_src && src[0]->type == ALPHA)
650         for (i = 0; i < function->n_args; i++)
651           {
652             free (arg[i].c);
653             arg[i].c = NULL;
654           }
655       free (src);
656         
657       return 0;
658     }
659 }
660
661 /* Destroys AGR. */
662 static void
663 agr_destroy (struct agr_proc *agr)
664 {
665   struct agr_var *iter, *next;
666
667   any_writer_close (agr->writer);
668   if (agr->sort != NULL)
669     sort_destroy_criteria (agr->sort);
670   free (agr->break_vars);
671   case_destroy (&agr->break_case);
672   for (iter = agr->agr_vars; iter; iter = next)
673     {
674       next = iter->next;
675
676       if (iter->function & FSTRING)
677         {
678           size_t n_args;
679           size_t i;
680
681           n_args = agr_func_tab[iter->function & FUNC].n_args;
682           for (i = 0; i < n_args; i++)
683             free (iter->arg[i].c);
684           free (iter->string);
685         }
686       else if (iter->function == SD)
687         moments1_destroy (iter->moments);
688       free (iter);
689     }
690   if (agr->dict != NULL)
691     dict_destroy (agr->dict);
692
693   case_destroy (&agr->agr_case);
694 }
695 \f
696 /* Execution. */
697
698 static void accumulate_aggregate_info (struct agr_proc *,
699                                        const struct ccase *);
700 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
701
702 /* Processes a single case INPUT for aggregation.  If output is
703    warranted, writes it to OUTPUT and returns nonzero.
704    Otherwise, returns zero and OUTPUT is unmodified. */
705 static int
706 aggregate_single_case (struct agr_proc *agr,
707                        const struct ccase *input, struct ccase *output)
708 {
709   bool finished_group = false;
710   
711   if (agr->case_cnt++ == 0)
712     initialize_aggregate_info (agr, input);
713   else if (case_compare (&agr->break_case, input,
714                          agr->break_vars, agr->break_var_cnt))
715     {
716       dump_aggregate_info (agr, output);
717       finished_group = true;
718
719       initialize_aggregate_info (agr, input);
720     }
721
722   accumulate_aggregate_info (agr, input);
723   return finished_group;
724 }
725
726 /* Accumulates aggregation data from the case INPUT. */
727 static void 
728 accumulate_aggregate_info (struct agr_proc *agr,
729                            const struct ccase *input)
730 {
731   struct agr_var *iter;
732   double weight;
733   int bad_warn = 1;
734
735   weight = dict_get_case_weight (default_dict, input, &bad_warn);
736
737   for (iter = agr->agr_vars; iter; iter = iter->next)
738     if (iter->src)
739       {
740         const union value *v = case_data (input, iter->src->fv);
741
742         if ((!iter->include_missing
743              && mv_is_value_missing (&iter->src->miss, v))
744             || (iter->include_missing && iter->src->type == NUMERIC
745                 && v->f == SYSMIS))
746           {
747             switch (iter->function)
748               {
749               case NMISS:
750               case NMISS | FSTRING:
751                 iter->dbl[0] += weight;
752                 break;
753               case NUMISS:
754               case NUMISS | FSTRING:
755                 iter->int1++;
756                 break;
757               }
758             iter->missing = 1;
759             continue;
760           }
761         
762         /* This is horrible.  There are too many possibilities. */
763         switch (iter->function)
764           {
765           case SUM:
766             iter->dbl[0] += v->f * weight;
767             iter->int1 = 1;
768             break;
769           case MEAN:
770             iter->dbl[0] += v->f * weight;
771             iter->dbl[1] += weight;
772             break;
773           case SD:
774             moments1_add (iter->moments, v->f, weight);
775             break;
776           case MAX:
777             iter->dbl[0] = max (iter->dbl[0], v->f);
778             iter->int1 = 1;
779             break;
780           case MAX | FSTRING:
781             if (memcmp (iter->string, v->s, iter->src->width) < 0)
782               memcpy (iter->string, v->s, iter->src->width);
783             iter->int1 = 1;
784             break;
785           case MIN:
786             iter->dbl[0] = min (iter->dbl[0], v->f);
787             iter->int1 = 1;
788             break;
789           case MIN | FSTRING:
790             if (memcmp (iter->string, v->s, iter->src->width) > 0)
791               memcpy (iter->string, v->s, iter->src->width);
792             iter->int1 = 1;
793             break;
794           case FGT:
795           case PGT:
796             if (v->f > iter->arg[0].f)
797               iter->dbl[0] += weight;
798             iter->dbl[1] += weight;
799             break;
800           case FGT | FSTRING:
801           case PGT | FSTRING:
802             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
803               iter->dbl[0] += weight;
804             iter->dbl[1] += weight;
805             break;
806           case FLT:
807           case PLT:
808             if (v->f < iter->arg[0].f)
809               iter->dbl[0] += weight;
810             iter->dbl[1] += weight;
811             break;
812           case FLT | FSTRING:
813           case PLT | FSTRING:
814             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
815               iter->dbl[0] += weight;
816             iter->dbl[1] += weight;
817             break;
818           case FIN:
819           case PIN:
820             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
821               iter->dbl[0] += weight;
822             iter->dbl[1] += weight;
823             break;
824           case FIN | FSTRING:
825           case PIN | FSTRING:
826             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
827                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
828               iter->dbl[0] += weight;
829             iter->dbl[1] += weight;
830             break;
831           case FOUT:
832           case POUT:
833             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
834               iter->dbl[0] += weight;
835             iter->dbl[1] += weight;
836             break;
837           case FOUT | FSTRING:
838           case POUT | FSTRING:
839             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
840                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
841               iter->dbl[0] += weight;
842             iter->dbl[1] += weight;
843             break;
844           case N:
845           case N | FSTRING:
846             iter->dbl[0] += weight;
847             break;
848           case NU:
849           case NU | FSTRING:
850             iter->int1++;
851             break;
852           case FIRST:
853             if (iter->int1 == 0)
854               {
855                 iter->dbl[0] = v->f;
856                 iter->int1 = 1;
857               }
858             break;
859           case FIRST | FSTRING:
860             if (iter->int1 == 0)
861               {
862                 memcpy (iter->string, v->s, iter->src->width);
863                 iter->int1 = 1;
864               }
865             break;
866           case LAST:
867             iter->dbl[0] = v->f;
868             iter->int1 = 1;
869             break;
870           case LAST | FSTRING:
871             memcpy (iter->string, v->s, iter->src->width);
872             iter->int1 = 1;
873             break;
874           case NMISS:
875           case NMISS | FSTRING:
876           case NUMISS:
877           case NUMISS | FSTRING:
878             /* Our value is not missing or it would have been
879                caught earlier.  Nothing to do. */
880             break;
881           default:
882             assert (0);
883           }
884     } else {
885       switch (iter->function)
886         {
887         case N_NO_VARS:
888           iter->dbl[0] += weight;
889           break;
890         case NU_NO_VARS:
891           iter->int1++;
892           break;
893         default:
894           assert (0);
895         }
896     }
897 }
898
899 /* We've come to a record that differs from the previous in one or
900    more of the break variables.  Make an output record from the
901    accumulated statistics in the OUTPUT case. */
902 static void 
903 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
904 {
905   {
906     int value_idx = 0;
907     int i;
908
909     for (i = 0; i < agr->break_var_cnt; i++) 
910       {
911         struct variable *v = agr->break_vars[i];
912         memcpy (case_data_rw (output, value_idx),
913                 case_data (&agr->break_case, v->fv),
914                 sizeof (union value) * v->nv);
915         value_idx += v->nv; 
916       }
917   }
918   
919   {
920     struct agr_var *i;
921   
922     for (i = agr->agr_vars; i; i = i->next)
923       {
924         union value *v = case_data_rw (output, i->dest->fv);
925
926         if (agr->missing == COLUMNWISE && i->missing != 0
927             && (i->function & FUNC) != N && (i->function & FUNC) != NU
928             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
929           {
930             if (i->dest->type == ALPHA)
931               memset (v->s, ' ', i->dest->width);
932             else
933               v->f = SYSMIS;
934             continue;
935           }
936         
937         switch (i->function)
938           {
939           case SUM:
940             v->f = i->int1 ? i->dbl[0] : SYSMIS;
941             break;
942           case MEAN:
943             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
944             break;
945           case SD:
946             {
947               double variance;
948
949               /* FIXME: we should use two passes. */
950               moments1_calculate (i->moments, NULL, NULL, &variance,
951                                  NULL, NULL);
952               if (variance != SYSMIS)
953                 v->f = sqrt (variance);
954               else
955                 v->f = SYSMIS; 
956             }
957             break;
958           case MAX:
959           case MIN:
960             v->f = i->int1 ? i->dbl[0] : SYSMIS;
961             break;
962           case MAX | FSTRING:
963           case MIN | FSTRING:
964             if (i->int1)
965               memcpy (v->s, i->string, i->dest->width);
966             else
967               memset (v->s, ' ', i->dest->width);
968             break;
969           case FGT:
970           case FGT | FSTRING:
971           case FLT:
972           case FLT | FSTRING:
973           case FIN:
974           case FIN | FSTRING:
975           case FOUT:
976           case FOUT | FSTRING:
977             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
978             break;
979           case PGT:
980           case PGT | FSTRING:
981           case PLT:
982           case PLT | FSTRING:
983           case PIN:
984           case PIN | FSTRING:
985           case POUT:
986           case POUT | FSTRING:
987             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
988             break;
989           case N:
990           case N | FSTRING:
991             v->f = i->dbl[0];
992             break;
993           case NU:
994           case NU | FSTRING:
995             v->f = i->int1;
996             break;
997           case FIRST:
998           case LAST:
999             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1000             break;
1001           case FIRST | FSTRING:
1002           case LAST | FSTRING:
1003             if (i->int1)
1004               memcpy (v->s, i->string, i->dest->width);
1005             else
1006               memset (v->s, ' ', i->dest->width);
1007             break;
1008           case N_NO_VARS:
1009             v->f = i->dbl[0];
1010             break;
1011           case NU_NO_VARS:
1012             v->f = i->int1;
1013             break;
1014           case NMISS:
1015           case NMISS | FSTRING:
1016             v->f = i->dbl[0];
1017             break;
1018           case NUMISS:
1019           case NUMISS | FSTRING:
1020             v->f = i->int1;
1021             break;
1022           default:
1023             assert (0);
1024           }
1025       }
1026   }
1027 }
1028
1029 /* Resets the state for all the aggregate functions. */
1030 static void
1031 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1032 {
1033   struct agr_var *iter;
1034
1035   case_destroy (&agr->break_case);
1036   case_clone (&agr->break_case, input);
1037
1038   for (iter = agr->agr_vars; iter; iter = iter->next)
1039     {
1040       iter->missing = 0;
1041       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1042       iter->int1 = iter->int2 = 0;
1043       switch (iter->function)
1044         {
1045         case MIN:
1046           iter->dbl[0] = DBL_MAX;
1047           break;
1048         case MIN | FSTRING:
1049           memset (iter->string, 255, iter->src->width);
1050           break;
1051         case MAX:
1052           iter->dbl[0] = -DBL_MAX;
1053           break;
1054         case MAX | FSTRING:
1055           memset (iter->string, 0, iter->src->width);
1056           break;
1057         case SD:
1058           if (iter->moments == NULL)
1059             iter->moments = moments1_create (MOMENT_VARIANCE);
1060           else
1061             moments1_clear (iter->moments);
1062           break;
1063         default:
1064           break;
1065         }
1066     }
1067 }
1068 \f
1069 /* Aggregate each case as it comes through.  Cases which aren't needed
1070    are dropped.
1071    Returns true if successful, false if an I/O error occurred. */
1072 static bool
1073 agr_to_active_file (struct ccase *c, void *agr_)
1074 {
1075   struct agr_proc *agr = agr_;
1076
1077   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1078     return agr->sink->class->write (agr->sink, &agr->agr_case);
1079
1080   return true;
1081 }
1082
1083 /* Aggregate the current case and output it if we passed a
1084    breakpoint. */
1085 static bool
1086 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1087 {
1088   struct agr_proc *agr = agr_;
1089
1090   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1091     return any_writer_write (agr->writer, &agr->agr_case);
1092   
1093   return true;
1094 }