Encapsulated lexer and updated calling functions accordingly.
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "gettext.h"
50 #define _(msgid) gettext (msgid)
51
52 /* Argument for AGGREGATE function. */
53 union agr_argument
54   {
55     double f;                           /* Numeric. */
56     char *c;                            /* Short or long string. */
57   };
58
59 /* Specifies how to make an aggregate variable. */
60 struct agr_var
61   {
62     struct agr_var *next;               /* Next in list. */
63
64     /* Collected during parsing. */
65     struct variable *src;       /* Source variable. */
66     struct variable *dest;      /* Target variable. */
67     int function;               /* Function. */
68     int include_missing;        /* 1=Include user-missing values. */
69     union agr_argument arg[2];  /* Arguments. */
70
71     /* Accumulated during AGGREGATE execution. */
72     double dbl[3];
73     int int1, int2;
74     char *string;
75     int missing;
76     struct moments1 *moments;
77   };
78
79 /* Aggregation functions. */
80 enum
81   {
82     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
83     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
84     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
85     FUNC = 0x1f, /* Function mask. */
86     FSTRING = 1<<5, /* String function bit. */
87   };
88
89 /* Attributes of an aggregation function. */
90 struct agr_func
91   {
92     const char *name;           /* Aggregation function name. */
93     size_t n_args;              /* Number of arguments. */
94     int alpha_type;             /* When given ALPHA arguments, output type. */
95     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
96   };
97
98 /* Attributes of aggregation functions. */
99 static const struct agr_func agr_func_tab[] = 
100   {
101     {"<NONE>",  0, -1,      {0, 0, 0}},
102     {"SUM",     0, -1,      {FMT_F, 8, 2}},
103     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
104     {"SD",      0, -1,      {FMT_F, 8, 2}},
105     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
106     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
107     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
108     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
109     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
110     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
111     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
112     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
113     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
114     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
115     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
116     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
117     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
118     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
119     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
120     {"LAST",    0, ALPHA,   {-1, -1, -1}},
121     {NULL,      0, -1,      {-1, -1, -1}},
122     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
123     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
124   };
125
126 /* Missing value types. */
127 enum missing_treatment
128   {
129     ITEMWISE,           /* Missing values item by item. */
130     COLUMNWISE          /* Missing values column by column. */
131   };
132
133 /* An entire AGGREGATE procedure. */
134 struct agr_proc 
135   {
136     /* We have either an output file or a sink. */
137     struct any_writer *writer;          /* Output file, or null if none. */
138     struct case_sink *sink;             /* Sink, or null if none. */
139
140     /* Break variables. */
141     struct sort_criteria *sort;         /* Sort criteria. */
142     struct variable **break_vars;       /* Break variables. */
143     size_t break_var_cnt;               /* Number of break variables. */
144     struct ccase break_case;            /* Last values of break variables. */
145
146     enum missing_treatment missing;     /* How to treat missing values. */
147     struct agr_var *agr_vars;           /* First aggregate variable. */
148     struct dictionary *dict;            /* Aggregate dictionary. */
149     const struct dictionary *src_dict;  /* Dict of the source */
150     int case_cnt;                       /* Counts aggregated cases. */
151     struct ccase agr_case;              /* Aggregate case for output. */
152   };
153
154 static void initialize_aggregate_info (struct agr_proc *,
155                                        const struct ccase *);
156
157 /* Prototypes. */
158 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
159                                        struct agr_proc *);
160 static void agr_destroy (struct agr_proc *);
161 static bool aggregate_single_case (struct agr_proc *agr,
162                                    const struct ccase *input,
163                                    struct ccase *output);
164 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
165
166 /* Aggregating to the active file. */
167 static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
168
169 /* Aggregating to a system file. */
170 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
171 \f
172 /* Parsing. */
173
174 /* Parses and executes the AGGREGATE procedure. */
175 int
176 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
177 {
178   struct dictionary *dict = dataset_dict (ds);
179   struct agr_proc agr;
180   struct file_handle *out_file = NULL;
181
182   bool copy_documents = false;
183   bool presorted = false;
184   bool saw_direction;
185
186   memset(&agr, 0 , sizeof (agr));
187   agr.missing = ITEMWISE;
188   case_nullify (&agr.break_case);
189   
190   agr.dict = dict_create ();
191   agr.src_dict = dict;
192   dict_set_label (agr.dict, dict_get_label (dict));
193   dict_set_documents (agr.dict, dict_get_documents (dict));
194
195   /* OUTFILE subcommand must be first. */
196   if (!lex_force_match_id (lexer, "OUTFILE"))
197     goto error;
198   lex_match (lexer, '=');
199   if (!lex_match (lexer, '*'))
200     {
201       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
202       if (out_file == NULL)
203         goto error;
204     }
205   
206   /* Read most of the subcommands. */
207   for (;;)
208     {
209       lex_match (lexer, '/');
210       
211       if (lex_match_id (lexer, "MISSING"))
212         {
213           lex_match (lexer, '=');
214           if (!lex_match_id (lexer, "COLUMNWISE"))
215             {
216               lex_error (lexer, _("while expecting COLUMNWISE"));
217               goto error;
218             }
219           agr.missing = COLUMNWISE;
220         }
221       else if (lex_match_id (lexer, "DOCUMENT"))
222         copy_documents = true;
223       else if (lex_match_id (lexer, "PRESORTED"))
224         presorted = true;
225       else if (lex_match_id (lexer, "BREAK"))
226         {
227           int i;
228
229           lex_match (lexer, '=');
230           agr.sort = sort_parse_criteria (lexer, dict,
231                                           &agr.break_vars, &agr.break_var_cnt,
232                                           &saw_direction, NULL);
233           if (agr.sort == NULL)
234             goto error;
235           
236           for (i = 0; i < agr.break_var_cnt; i++)
237             dict_clone_var_assert (agr.dict, agr.break_vars[i],
238                                    agr.break_vars[i]->name);
239
240           /* BREAK must follow the options. */
241           break;
242         }
243       else
244         {
245           lex_error (lexer, _("expecting BREAK"));
246           goto error;
247         }
248     }
249   if (presorted && saw_direction)
250     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
251                "with (A) or (D) has no effect.  Output data will be sorted "
252                "the same way as the input data."));
253       
254   /* Read in the aggregate functions. */
255   lex_match (lexer, '/');
256   if (!parse_aggregate_functions (lexer, dict, &agr))
257     goto error;
258
259   /* Delete documents. */
260   if (!copy_documents)
261     dict_set_documents (agr.dict, NULL);
262
263   /* Cancel SPLIT FILE. */
264   dict_set_split_vars (agr.dict, NULL, 0);
265   
266   /* Initialize. */
267   agr.case_cnt = 0;
268   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
269
270   /* Output to active file or external file? */
271   if (out_file == NULL) 
272     {
273       /* The active file will be replaced by the aggregated data,
274          so TEMPORARY is moot. */
275       proc_cancel_temporary_transformations (ds);
276
277       if (agr.sort != NULL && !presorted) 
278         {
279           if (!sort_active_file_in_place (ds, agr.sort))
280             goto error;
281         }
282
283       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
284       if (agr.sink->class->open != NULL)
285         agr.sink->class->open (agr.sink);
286       proc_set_sink (ds, 
287                      create_case_sink (&null_sink_class, 
288                                        dict, NULL));
289       if (!procedure (ds, agr_to_active_file, &agr))
290         goto error;
291       if (agr.case_cnt > 0) 
292         {
293           dump_aggregate_info (&agr, &agr.agr_case);
294           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
295             goto error;
296         }
297       discard_variables (ds);
298       dict_destroy (dict);
299       dataset_set_dict (ds, agr.dict);
300       agr.dict = NULL;
301       proc_set_source (ds, 
302                        agr.sink->class->make_source (agr.sink));
303       free_case_sink (agr.sink);
304     }
305   else
306     {
307       agr.writer = any_writer_open (out_file, agr.dict);
308       if (agr.writer == NULL)
309         goto error;
310       
311       if (agr.sort != NULL && !presorted) 
312         {
313           /* Sorting is needed. */
314           struct casefile *dst;
315           struct casereader *reader;
316           struct ccase c;
317           bool ok = true;
318           
319           dst = sort_active_file_to_casefile (ds, agr.sort);
320           if (dst == NULL)
321             goto error;
322           reader = casefile_get_destructive_reader (dst);
323           while (ok && casereader_read_xfer (reader, &c)) 
324             {
325               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
326                 ok = any_writer_write (agr.writer, &agr.agr_case);
327               case_destroy (&c);
328             }
329           casereader_destroy (reader);
330           if (ok)
331             ok = !casefile_error (dst);
332           casefile_destroy (dst);
333           if (!ok)
334             goto error;
335         }
336       else 
337         {
338           /* Active file is already sorted. */
339           if (!procedure (ds, presorted_agr_to_sysfile, &agr))
340             goto error;
341         }
342       
343       if (agr.case_cnt > 0) 
344         {
345           dump_aggregate_info (&agr, &agr.agr_case);
346           any_writer_write (agr.writer, &agr.agr_case);
347         }
348       if (any_writer_error (agr.writer))
349         goto error;
350     }
351   
352   agr_destroy (&agr);
353   return CMD_SUCCESS;
354
355 error:
356   agr_destroy (&agr);
357   return CMD_CASCADING_FAILURE;
358 }
359
360 /* Parse all the aggregate functions. */
361 static bool
362 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, struct agr_proc *agr)
363 {
364   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
365
366   /* Parse everything. */
367   tail = NULL;
368   for (;;)
369     {
370       char **dest;
371       char **dest_label;
372       size_t n_dest;
373       struct string function_name;
374
375       int include_missing;
376       const struct agr_func *function;
377       int func_index;
378
379       union agr_argument arg[2];
380
381       struct variable **src;
382       size_t n_src;
383
384       size_t i;
385
386       dest = NULL;
387       dest_label = NULL;
388       n_dest = 0;
389       src = NULL;
390       function = NULL;
391       n_src = 0;
392       arg[0].c = NULL;
393       arg[1].c = NULL;
394
395       /* Parse the list of target variables. */
396       while (!lex_match (lexer, '='))
397         {
398           size_t n_dest_prev = n_dest;
399           
400           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
401                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
402             goto error;
403
404           /* Assign empty labels. */
405           {
406             int j;
407
408             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
409             for (j = n_dest_prev; j < n_dest; j++)
410               dest_label[j] = NULL;
411           }
412
413
414           
415           if (lex_token (lexer) == T_STRING)
416             {
417               struct string label;
418               ds_init_string (&label, lex_tokstr (lexer));
419
420               ds_truncate (&label, 255);
421               dest_label[n_dest - 1] = ds_xstrdup (&label);
422               lex_get (lexer);
423               ds_destroy (&label);
424             }
425         }
426
427       /* Get the name of the aggregation function. */
428       if (lex_token (lexer) != T_ID)
429         {
430           lex_error (lexer, _("expecting aggregation function"));
431           goto error;
432         }
433
434       include_missing = 0;
435
436       ds_init_string (&function_name, lex_tokstr (lexer));
437
438       ds_chomp (&function_name, '.');
439
440       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
441           include_missing = 1;
442
443       for (function = agr_func_tab; function->name; function++)
444         if (!strcasecmp (function->name, ds_cstr (&function_name)))
445           break;
446       if (NULL == function->name)
447         {
448           msg (SE, _("Unknown aggregation function %s."), 
449                ds_cstr (&function_name));
450           goto error;
451         }
452       ds_destroy (&function_name);
453       func_index = function - agr_func_tab;
454       lex_get (lexer);
455
456       /* Check for leading lparen. */
457       if (!lex_match (lexer, '('))
458         {
459           if (func_index == N)
460             func_index = N_NO_VARS;
461           else if (func_index == NU)
462             func_index = NU_NO_VARS;
463           else
464             {
465               lex_error (lexer, _("expecting `('"));
466               goto error;
467             }
468         }
469       else
470         {
471           /* Parse list of source variables. */
472           {
473             int pv_opts = PV_NO_SCRATCH;
474
475             if (func_index == SUM || func_index == MEAN || func_index == SD)
476               pv_opts |= PV_NUMERIC;
477             else if (function->n_args)
478               pv_opts |= PV_SAME_TYPE;
479
480             if (!parse_variables (lexer, dict, &src, &n_src, pv_opts))
481               goto error;
482           }
483
484           /* Parse function arguments, for those functions that
485              require arguments. */
486           if (function->n_args != 0)
487             for (i = 0; i < function->n_args; i++)
488               {
489                 int type;
490             
491                 lex_match (lexer, ',');
492                 if (lex_token (lexer) == T_STRING)
493                   {
494                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
495                     type = ALPHA;
496                   }
497                 else if (lex_is_number (lexer))
498                   {
499                     arg[i].f = lex_tokval (lexer);
500                     type = NUMERIC;
501                   } else {
502                     msg (SE, _("Missing argument %d to %s."), i + 1,
503                          function->name);
504                     goto error;
505                   }
506             
507                 lex_get (lexer);
508
509                 if (type != src[0]->type)
510                   {
511                     msg (SE, _("Arguments to %s must be of same type as "
512                                "source variables."),
513                          function->name);
514                     goto error;
515                   }
516               }
517
518           /* Trailing rparen. */
519           if (!lex_match (lexer, ')'))
520             {
521               lex_error (lexer, _("expecting `)'"));
522               goto error;
523             }
524           
525           /* Now check that the number of source variables match
526              the number of target variables.  If we check earlier
527              than this, the user can get very misleading error
528              message, i.e. `AGGREGATE x=SUM(y t).' will get this
529              error message when a proper message would be more
530              like `unknown variable t'. */
531           if (n_src != n_dest)
532             {
533               msg (SE, _("Number of source variables (%u) does not match "
534                          "number of target variables (%u)."),
535                    (unsigned) n_src, (unsigned) n_dest);
536               goto error;
537             }
538
539           if ((func_index == PIN || func_index == POUT
540               || func_index == FIN || func_index == FOUT) 
541               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
542                   || (src[0]->type == ALPHA
543                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
544             {
545               union agr_argument t = arg[0];
546               arg[0] = arg[1];
547               arg[1] = t;
548                   
549               msg (SW, _("The value arguments passed to the %s function "
550                          "are out-of-order.  They will be treated as if "
551                          "they had been specified in the correct order."),
552                    function->name);
553             }
554         }
555         
556       /* Finally add these to the linked list of aggregation
557          variables. */
558       for (i = 0; i < n_dest; i++)
559         {
560           struct agr_var *v = xmalloc (sizeof *v);
561
562           /* Add variable to chain. */
563           if (agr->agr_vars != NULL)
564             tail->next = v;
565           else
566             agr->agr_vars = v;
567           tail = v;
568           tail->next = NULL;
569           v->moments = NULL;
570           
571           /* Create the target variable in the aggregate
572              dictionary. */
573           {
574             struct variable *destvar;
575             
576             v->function = func_index;
577
578             if (src)
579               {
580                 v->src = src[i];
581                 
582                 if (src[i]->type == ALPHA)
583                   {
584                     v->function |= FSTRING;
585                     v->string = xmalloc (src[i]->width);
586                   }
587
588                 if (function->alpha_type == ALPHA)
589                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
590                 else
591                   {
592                     assert (v->src->type == NUMERIC
593                             || function->alpha_type == NUMERIC);
594                     destvar = dict_create_var (agr->dict, dest[i], 0);
595                     if (destvar != NULL) 
596                       {
597                         struct fmt_spec f;
598                         if ((func_index == N || func_index == NMISS)
599                             && dict_get_weight (dict) != NULL)
600                           f = fmt_for_output (FMT_F, 8, 2); 
601                         else
602                           f = function->format;
603                         destvar->print = destvar->write = f;
604                       }
605                   }
606               } else {
607                 struct fmt_spec f;
608                 v->src = NULL;
609                 destvar = dict_create_var (agr->dict, dest[i], 0);
610                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
611                   f = fmt_for_output (FMT_F, 8, 2); 
612                 else
613                   f = function->format;
614                 destvar->print = destvar->write = f;
615               }
616           
617             if (!destvar)
618               {
619                 msg (SE, _("Variable name %s is not unique within the "
620                            "aggregate file dictionary, which contains "
621                            "the aggregate variables and the break "
622                            "variables."),
623                      dest[i]);
624                 goto error;
625               }
626
627             free (dest[i]);
628             if (dest_label[i])
629               {
630                 destvar->label = dest_label[i];
631                 dest_label[i] = NULL;
632               }
633
634             v->dest = destvar;
635           }
636           
637           v->include_missing = include_missing;
638
639           if (v->src != NULL)
640             {
641               int j;
642
643               if (v->src->type == NUMERIC)
644                 for (j = 0; j < function->n_args; j++)
645                   v->arg[j].f = arg[j].f;
646               else
647                 for (j = 0; j < function->n_args; j++)
648                   v->arg[j].c = xstrdup (arg[j].c);
649             }
650         }
651       
652       if (src != NULL && src[0]->type == ALPHA)
653         for (i = 0; i < function->n_args; i++)
654           {
655             free (arg[i].c);
656             arg[i].c = NULL;
657           }
658
659       free (src);
660       free (dest);
661       free (dest_label);
662
663       if (!lex_match (lexer, '/'))
664         {
665           if (lex_token (lexer) == '.')
666             return true;
667
668           lex_error (lexer, "expecting end of command");
669           return false;
670         }
671       continue;
672       
673     error:
674       ds_destroy (&function_name);
675       for (i = 0; i < n_dest; i++)
676         {
677           free (dest[i]);
678           free (dest_label[i]);
679         }
680       free (dest);
681       free (dest_label);
682       free (arg[0].c);
683       free (arg[1].c);
684       if (src && n_src && src[0]->type == ALPHA)
685         for (i = 0; i < function->n_args; i++)
686           {
687             free (arg[i].c);
688             arg[i].c = NULL;
689           }
690       free (src);
691         
692       return false;
693     }
694 }
695
696 /* Destroys AGR. */
697 static void
698 agr_destroy (struct agr_proc *agr)
699 {
700   struct agr_var *iter, *next;
701
702   any_writer_close (agr->writer);
703   if (agr->sort != NULL)
704     sort_destroy_criteria (agr->sort);
705   free (agr->break_vars);
706   case_destroy (&agr->break_case);
707   for (iter = agr->agr_vars; iter; iter = next)
708     {
709       next = iter->next;
710
711       if (iter->function & FSTRING)
712         {
713           size_t n_args;
714           size_t i;
715
716           n_args = agr_func_tab[iter->function & FUNC].n_args;
717           for (i = 0; i < n_args; i++)
718             free (iter->arg[i].c);
719           free (iter->string);
720         }
721       else if (iter->function == SD)
722         moments1_destroy (iter->moments);
723       free (iter);
724     }
725   if (agr->dict != NULL)
726     dict_destroy (agr->dict);
727
728   case_destroy (&agr->agr_case);
729 }
730 \f
731 /* Execution. */
732
733 static void accumulate_aggregate_info (struct agr_proc *,
734                                        const struct ccase *);
735 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
736
737 /* Processes a single case INPUT for aggregation.  If output is
738    warranted, writes it to OUTPUT and returns true.
739    Otherwise, returns false and OUTPUT is unmodified. */
740 static bool
741 aggregate_single_case (struct agr_proc *agr,
742                        const struct ccase *input, struct ccase *output)
743 {
744   bool finished_group = false;
745   
746   if (agr->case_cnt++ == 0)
747     initialize_aggregate_info (agr, input);
748   else if (case_compare (&agr->break_case, input,
749                          agr->break_vars, agr->break_var_cnt))
750     {
751       dump_aggregate_info (agr, output);
752       finished_group = true;
753
754       initialize_aggregate_info (agr, input);
755     }
756
757   accumulate_aggregate_info (agr, input);
758   return finished_group;
759 }
760
761 /* Accumulates aggregation data from the case INPUT. */
762 static void 
763 accumulate_aggregate_info (struct agr_proc *agr,
764                            const struct ccase *input)
765 {
766   struct agr_var *iter;
767   double weight;
768   bool bad_warn = true;
769
770   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
771
772   for (iter = agr->agr_vars; iter; iter = iter->next)
773     if (iter->src)
774       {
775         const union value *v = case_data (input, iter->src->fv);
776
777         if ((!iter->include_missing
778              && mv_is_value_missing (&iter->src->miss, v))
779             || (iter->include_missing && iter->src->type == NUMERIC
780                 && v->f == SYSMIS))
781           {
782             switch (iter->function)
783               {
784               case NMISS:
785               case NMISS | FSTRING:
786                 iter->dbl[0] += weight;
787                 break;
788               case NUMISS:
789               case NUMISS | FSTRING:
790                 iter->int1++;
791                 break;
792               }
793             iter->missing = 1;
794             continue;
795           }
796         
797         /* This is horrible.  There are too many possibilities. */
798         switch (iter->function)
799           {
800           case SUM:
801             iter->dbl[0] += v->f * weight;
802             iter->int1 = 1;
803             break;
804           case MEAN:
805             iter->dbl[0] += v->f * weight;
806             iter->dbl[1] += weight;
807             break;
808           case SD:
809             moments1_add (iter->moments, v->f, weight);
810             break;
811           case MAX:
812             iter->dbl[0] = max (iter->dbl[0], v->f);
813             iter->int1 = 1;
814             break;
815           case MAX | FSTRING:
816             if (memcmp (iter->string, v->s, iter->src->width) < 0)
817               memcpy (iter->string, v->s, iter->src->width);
818             iter->int1 = 1;
819             break;
820           case MIN:
821             iter->dbl[0] = min (iter->dbl[0], v->f);
822             iter->int1 = 1;
823             break;
824           case MIN | FSTRING:
825             if (memcmp (iter->string, v->s, iter->src->width) > 0)
826               memcpy (iter->string, v->s, iter->src->width);
827             iter->int1 = 1;
828             break;
829           case FGT:
830           case PGT:
831             if (v->f > iter->arg[0].f)
832               iter->dbl[0] += weight;
833             iter->dbl[1] += weight;
834             break;
835           case FGT | FSTRING:
836           case PGT | FSTRING:
837             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
838               iter->dbl[0] += weight;
839             iter->dbl[1] += weight;
840             break;
841           case FLT:
842           case PLT:
843             if (v->f < iter->arg[0].f)
844               iter->dbl[0] += weight;
845             iter->dbl[1] += weight;
846             break;
847           case FLT | FSTRING:
848           case PLT | FSTRING:
849             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
850               iter->dbl[0] += weight;
851             iter->dbl[1] += weight;
852             break;
853           case FIN:
854           case PIN:
855             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
856               iter->dbl[0] += weight;
857             iter->dbl[1] += weight;
858             break;
859           case FIN | FSTRING:
860           case PIN | FSTRING:
861             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
862                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
863               iter->dbl[0] += weight;
864             iter->dbl[1] += weight;
865             break;
866           case FOUT:
867           case POUT:
868             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
869               iter->dbl[0] += weight;
870             iter->dbl[1] += weight;
871             break;
872           case FOUT | FSTRING:
873           case POUT | FSTRING:
874             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
875                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
876               iter->dbl[0] += weight;
877             iter->dbl[1] += weight;
878             break;
879           case N:
880           case N | FSTRING:
881             iter->dbl[0] += weight;
882             break;
883           case NU:
884           case NU | FSTRING:
885             iter->int1++;
886             break;
887           case FIRST:
888             if (iter->int1 == 0)
889               {
890                 iter->dbl[0] = v->f;
891                 iter->int1 = 1;
892               }
893             break;
894           case FIRST | FSTRING:
895             if (iter->int1 == 0)
896               {
897                 memcpy (iter->string, v->s, iter->src->width);
898                 iter->int1 = 1;
899               }
900             break;
901           case LAST:
902             iter->dbl[0] = v->f;
903             iter->int1 = 1;
904             break;
905           case LAST | FSTRING:
906             memcpy (iter->string, v->s, iter->src->width);
907             iter->int1 = 1;
908             break;
909           case NMISS:
910           case NMISS | FSTRING:
911           case NUMISS:
912           case NUMISS | FSTRING:
913             /* Our value is not missing or it would have been
914                caught earlier.  Nothing to do. */
915             break;
916           default:
917             NOT_REACHED ();
918           }
919     } else {
920       switch (iter->function)
921         {
922         case N_NO_VARS:
923           iter->dbl[0] += weight;
924           break;
925         case NU_NO_VARS:
926           iter->int1++;
927           break;
928         default:
929           NOT_REACHED ();
930         }
931     }
932 }
933
934 /* We've come to a record that differs from the previous in one or
935    more of the break variables.  Make an output record from the
936    accumulated statistics in the OUTPUT case. */
937 static void 
938 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
939 {
940   {
941     int value_idx = 0;
942     int i;
943
944     for (i = 0; i < agr->break_var_cnt; i++) 
945       {
946         struct variable *v = agr->break_vars[i];
947         memcpy (case_data_rw (output, value_idx),
948                 case_data (&agr->break_case, v->fv),
949                 sizeof (union value) * v->nv);
950         value_idx += v->nv; 
951       }
952   }
953   
954   {
955     struct agr_var *i;
956   
957     for (i = agr->agr_vars; i; i = i->next)
958       {
959         union value *v = case_data_rw (output, i->dest->fv);
960
961         if (agr->missing == COLUMNWISE && i->missing != 0
962             && (i->function & FUNC) != N && (i->function & FUNC) != NU
963             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
964           {
965             if (i->dest->type == ALPHA)
966               memset (v->s, ' ', i->dest->width);
967             else
968               v->f = SYSMIS;
969             continue;
970           }
971         
972         switch (i->function)
973           {
974           case SUM:
975             v->f = i->int1 ? i->dbl[0] : SYSMIS;
976             break;
977           case MEAN:
978             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
979             break;
980           case SD:
981             {
982               double variance;
983
984               /* FIXME: we should use two passes. */
985               moments1_calculate (i->moments, NULL, NULL, &variance,
986                                  NULL, NULL);
987               if (variance != SYSMIS)
988                 v->f = sqrt (variance);
989               else
990                 v->f = SYSMIS; 
991             }
992             break;
993           case MAX:
994           case MIN:
995             v->f = i->int1 ? i->dbl[0] : SYSMIS;
996             break;
997           case MAX | FSTRING:
998           case MIN | FSTRING:
999             if (i->int1)
1000               memcpy (v->s, i->string, i->dest->width);
1001             else
1002               memset (v->s, ' ', i->dest->width);
1003             break;
1004           case FGT:
1005           case FGT | FSTRING:
1006           case FLT:
1007           case FLT | FSTRING:
1008           case FIN:
1009           case FIN | FSTRING:
1010           case FOUT:
1011           case FOUT | FSTRING:
1012             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1013             break;
1014           case PGT:
1015           case PGT | FSTRING:
1016           case PLT:
1017           case PLT | FSTRING:
1018           case PIN:
1019           case PIN | FSTRING:
1020           case POUT:
1021           case POUT | FSTRING:
1022             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1023             break;
1024           case N:
1025           case N | FSTRING:
1026             v->f = i->dbl[0];
1027             break;
1028           case NU:
1029           case NU | FSTRING:
1030             v->f = i->int1;
1031             break;
1032           case FIRST:
1033           case LAST:
1034             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1035             break;
1036           case FIRST | FSTRING:
1037           case LAST | FSTRING:
1038             if (i->int1)
1039               memcpy (v->s, i->string, i->dest->width);
1040             else
1041               memset (v->s, ' ', i->dest->width);
1042             break;
1043           case N_NO_VARS:
1044             v->f = i->dbl[0];
1045             break;
1046           case NU_NO_VARS:
1047             v->f = i->int1;
1048             break;
1049           case NMISS:
1050           case NMISS | FSTRING:
1051             v->f = i->dbl[0];
1052             break;
1053           case NUMISS:
1054           case NUMISS | FSTRING:
1055             v->f = i->int1;
1056             break;
1057           default:
1058             NOT_REACHED ();
1059           }
1060       }
1061   }
1062 }
1063
1064 /* Resets the state for all the aggregate functions. */
1065 static void
1066 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1067 {
1068   struct agr_var *iter;
1069
1070   case_destroy (&agr->break_case);
1071   case_clone (&agr->break_case, input);
1072
1073   for (iter = agr->agr_vars; iter; iter = iter->next)
1074     {
1075       iter->missing = 0;
1076       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1077       iter->int1 = iter->int2 = 0;
1078       switch (iter->function)
1079         {
1080         case MIN:
1081           iter->dbl[0] = DBL_MAX;
1082           break;
1083         case MIN | FSTRING:
1084           memset (iter->string, 255, iter->src->width);
1085           break;
1086         case MAX:
1087           iter->dbl[0] = -DBL_MAX;
1088           break;
1089         case MAX | FSTRING:
1090           memset (iter->string, 0, iter->src->width);
1091           break;
1092         case SD:
1093           if (iter->moments == NULL)
1094             iter->moments = moments1_create (MOMENT_VARIANCE);
1095           else
1096             moments1_clear (iter->moments);
1097           break;
1098         default:
1099           break;
1100         }
1101     }
1102 }
1103 \f
1104 /* Aggregate each case as it comes through.  Cases which aren't needed
1105    are dropped.
1106    Returns true if successful, false if an I/O error occurred. */
1107 static bool
1108 agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
1109 {
1110   struct agr_proc *agr = agr_;
1111
1112   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1113     return agr->sink->class->write (agr->sink, &agr->agr_case);
1114
1115   return true;
1116 }
1117
1118 /* Aggregate the current case and output it if we passed a
1119    breakpoint. */
1120 static bool
1121 presorted_agr_to_sysfile (const struct ccase *c, void *agr_, 
1122                           const struct dataset *ds UNUSED) 
1123 {
1124   struct agr_proc *agr = agr_;
1125
1126   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1127     return any_writer_write (agr->writer, &agr->agr_case);
1128   
1129   return true;
1130 }