Added abstract factory to create casefiles. Updated procedures to use
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <stdlib.h>
22
23 #include <data/any-writer.h>
24 #include <data/case-sink.h>
25 #include <data/case.h>
26 #include <data/casefile.h>
27 #include <data/dictionary.h>
28 #include <data/file-handle-def.h>
29 #include <data/format.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "minmax.h"
50
51 #include "gettext.h"
52 #define _(msgid) gettext (msgid)
53
54 /* Argument for AGGREGATE function. */
55 union agr_argument
56   {
57     double f;                           /* Numeric. */
58     char *c;                            /* Short or long string. */
59   };
60
61 /* Specifies how to make an aggregate variable. */
62 struct agr_var
63   {
64     struct agr_var *next;               /* Next in list. */
65
66     /* Collected during parsing. */
67     struct variable *src;       /* Source variable. */
68     struct variable *dest;      /* Target variable. */
69     int function;               /* Function. */
70     int include_missing;        /* 1=Include user-missing values. */
71     union agr_argument arg[2];  /* Arguments. */
72
73     /* Accumulated during AGGREGATE execution. */
74     double dbl[3];
75     int int1, int2;
76     char *string;
77     int missing;
78     struct moments1 *moments;
79   };
80
81 /* Aggregation functions. */
82 enum
83   {
84     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
85     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
86     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
87     FUNC = 0x1f, /* Function mask. */
88     FSTRING = 1<<5, /* String function bit. */
89   };
90
91 /* Attributes of an aggregation function. */
92 struct agr_func
93   {
94     const char *name;           /* Aggregation function name. */
95     size_t n_args;              /* Number of arguments. */
96     enum var_type alpha_type;   /* When given ALPHA arguments, output type. */
97     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
98   };
99
100 /* Attributes of aggregation functions. */
101 static const struct agr_func agr_func_tab[] = 
102   {
103     {"<NONE>",  0, -1,          {0, 0, 0}},
104     {"SUM",     0, -1,          {FMT_F, 8, 2}},
105     {"MEAN",    0, -1,          {FMT_F, 8, 2}},
106     {"SD",      0, -1,          {FMT_F, 8, 2}},
107     {"MAX",     0, VAR_STRING,  {-1, -1, -1}}, 
108     {"MIN",     0, VAR_STRING,  {-1, -1, -1}}, 
109     {"PGT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},      
110     {"PLT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},       
111     {"PIN",     2, VAR_NUMERIC, {FMT_F, 5, 1}},       
112     {"POUT",    2, VAR_NUMERIC, {FMT_F, 5, 1}},       
113     {"FGT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
114     {"FLT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
115     {"FIN",     2, VAR_NUMERIC, {FMT_F, 5, 3}},       
116     {"FOUT",    2, VAR_NUMERIC, {FMT_F, 5, 3}},       
117     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},       
118     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},       
119     {"NMISS",   0, VAR_NUMERIC, {FMT_F, 7, 0}},       
120     {"NUMISS",  0, VAR_NUMERIC, {FMT_F, 7, 0}},       
121     {"FIRST",   0, VAR_STRING,  {-1, -1, -1}}, 
122     {"LAST",    0, VAR_STRING,  {-1, -1, -1}},
123     {NULL,      0, -1,          {-1, -1, -1}},
124     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},
125     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},
126   };
127
128 /* Missing value types. */
129 enum missing_treatment
130   {
131     ITEMWISE,           /* Missing values item by item. */
132     COLUMNWISE          /* Missing values column by column. */
133   };
134
135 /* An entire AGGREGATE procedure. */
136 struct agr_proc 
137   {
138     /* We have either an output file or a sink. */
139     struct any_writer *writer;          /* Output file, or null if none. */
140     struct case_sink *sink;             /* Sink, or null if none. */
141
142     /* Break variables. */
143     struct sort_criteria *sort;         /* Sort criteria. */
144     struct variable **break_vars;       /* Break variables. */
145     size_t break_var_cnt;               /* Number of break variables. */
146     struct ccase break_case;            /* Last values of break variables. */
147
148     enum missing_treatment missing;     /* How to treat missing values. */
149     struct agr_var *agr_vars;           /* First aggregate variable. */
150     struct dictionary *dict;            /* Aggregate dictionary. */
151     const struct dictionary *src_dict;  /* Dict of the source */
152     int case_cnt;                       /* Counts aggregated cases. */
153     struct ccase agr_case;              /* Aggregate case for output. */
154   };
155
156 static void initialize_aggregate_info (struct agr_proc *,
157                                        const struct ccase *);
158
159 /* Prototypes. */
160 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
161                                        struct agr_proc *);
162 static void agr_destroy (struct agr_proc *);
163 static bool aggregate_single_case (struct agr_proc *agr,
164                                    const struct ccase *input,
165                                    struct ccase *output);
166 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
167 \f
168 /* Parsing. */
169
170 /* Parses and executes the AGGREGATE procedure. */
171 int
172 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
173 {
174   struct dictionary *dict = dataset_dict (ds);
175   struct agr_proc agr;
176   struct file_handle *out_file = NULL;
177
178   bool copy_documents = false;
179   bool presorted = false;
180   bool saw_direction;
181
182   memset(&agr, 0 , sizeof (agr));
183   agr.missing = ITEMWISE;
184   case_nullify (&agr.break_case);
185
186   agr.dict = dict_create ();
187   agr.src_dict = dict;
188   dict_set_label (agr.dict, dict_get_label (dict));
189   dict_set_documents (agr.dict, dict_get_documents (dict));
190
191   /* OUTFILE subcommand must be first. */
192   if (!lex_force_match_id (lexer, "OUTFILE"))
193     goto error;
194   lex_match (lexer, '=');
195   if (!lex_match (lexer, '*'))
196     {
197       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
198       if (out_file == NULL)
199         goto error;
200     }
201   
202   /* Read most of the subcommands. */
203   for (;;)
204     {
205       lex_match (lexer, '/');
206       
207       if (lex_match_id (lexer, "MISSING"))
208         {
209           lex_match (lexer, '=');
210           if (!lex_match_id (lexer, "COLUMNWISE"))
211             {
212               lex_error (lexer, _("while expecting COLUMNWISE"));
213               goto error;
214             }
215           agr.missing = COLUMNWISE;
216         }
217       else if (lex_match_id (lexer, "DOCUMENT"))
218         copy_documents = true;
219       else if (lex_match_id (lexer, "PRESORTED"))
220         presorted = true;
221       else if (lex_match_id (lexer, "BREAK"))
222         {
223           int i;
224
225           lex_match (lexer, '=');
226           agr.sort = sort_parse_criteria (lexer, dict,
227                                           &agr.break_vars, &agr.break_var_cnt,
228                                           &saw_direction, NULL);
229           if (agr.sort == NULL)
230             goto error;
231           
232           for (i = 0; i < agr.break_var_cnt; i++)
233             dict_clone_var_assert (agr.dict, agr.break_vars[i],
234                                    var_get_name (agr.break_vars[i]));
235
236           /* BREAK must follow the options. */
237           break;
238         }
239       else
240         {
241           lex_error (lexer, _("expecting BREAK"));
242           goto error;
243         }
244     }
245   if (presorted && saw_direction)
246     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
247                "with (A) or (D) has no effect.  Output data will be sorted "
248                "the same way as the input data."));
249       
250   /* Read in the aggregate functions. */
251   lex_match (lexer, '/');
252   if (!parse_aggregate_functions (lexer, dict, &agr))
253     goto error;
254
255   /* Delete documents. */
256   if (!copy_documents)
257     dict_set_documents (agr.dict, NULL);
258
259   /* Cancel SPLIT FILE. */
260   dict_set_split_vars (agr.dict, NULL, 0);
261   
262   /* Initialize. */
263   agr.case_cnt = 0;
264   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
265
266   /* Output to active file or external file? */
267   if (out_file == NULL) 
268     {
269       struct ccase *c;
270       
271       /* The active file will be replaced by the aggregated data,
272          so TEMPORARY is moot. */
273       proc_cancel_temporary_transformations (ds);
274
275       if (agr.sort != NULL && !presorted) 
276         {
277           if (!sort_active_file_in_place (ds, agr.sort))
278             goto error;
279         }
280
281       agr.sink = create_case_sink (&storage_sink_class, agr.dict,
282                                    dataset_get_casefile_factory (ds),
283                                    NULL);
284       if (agr.sink->class->open != NULL)
285         agr.sink->class->open (agr.sink);
286       proc_set_sink (ds, 
287                      create_case_sink (&null_sink_class, dict,
288                                        dataset_get_casefile_factory (ds),
289                                        NULL));
290       proc_open (ds);
291       while (proc_read (ds, &c))
292         if (aggregate_single_case (&agr, c, &agr.agr_case)) 
293           if (!agr.sink->class->write (agr.sink, &agr.agr_case)) 
294             {
295               proc_close (ds);
296               goto error; 
297             }
298       if (!proc_close (ds))
299         goto error;
300
301       if (agr.case_cnt > 0) 
302         {
303           dump_aggregate_info (&agr, &agr.agr_case);
304           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
305             goto error;
306         }
307       discard_variables (ds);
308       dict_destroy (dict);
309       dataset_set_dict (ds, agr.dict);
310       agr.dict = NULL;
311       proc_set_source (ds, agr.sink->class->make_source (agr.sink));
312       free_case_sink (agr.sink);
313     }
314   else
315     {
316       agr.writer = any_writer_open (out_file, agr.dict);
317       if (agr.writer == NULL)
318         goto error;
319       
320       if (agr.sort != NULL && !presorted) 
321         {
322           /* Sorting is needed. */
323           struct casefile *dst;
324           struct casereader *reader;
325           struct ccase c;
326           bool ok = true;
327           
328           dst = sort_active_file_to_casefile (ds, agr.sort);
329           if (dst == NULL)
330             goto error;
331           reader = casefile_get_destructive_reader (dst);
332           while (ok && casereader_read_xfer (reader, &c)) 
333             {
334               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
335                 ok = any_writer_write (agr.writer, &agr.agr_case);
336               case_destroy (&c);
337             }
338           casereader_destroy (reader);
339           if (ok)
340             ok = !casefile_error (dst);
341           casefile_destroy (dst);
342           if (!ok)
343             goto error;
344         }
345       else 
346         {
347           /* Active file is already sorted. */
348           struct ccase *c;
349           
350           proc_open (ds);
351           while (proc_read (ds, &c))
352             if (aggregate_single_case (&agr, c, &agr.agr_case)) 
353               if (!any_writer_write (agr.writer, &agr.agr_case)) 
354                 {
355                   proc_close (ds);
356                   goto error;
357                 }
358           if (!proc_close (ds))
359             goto error;
360         }
361       
362       if (agr.case_cnt > 0) 
363         {
364           dump_aggregate_info (&agr, &agr.agr_case);
365           any_writer_write (agr.writer, &agr.agr_case);
366         }
367       if (any_writer_error (agr.writer))
368         goto error;
369     }
370   
371   agr_destroy (&agr);
372   return CMD_SUCCESS;
373
374 error:
375   agr_destroy (&agr);
376   return CMD_CASCADING_FAILURE;
377 }
378
379 /* Parse all the aggregate functions. */
380 static bool
381 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, struct agr_proc *agr)
382 {
383   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
384
385   /* Parse everything. */
386   tail = NULL;
387   for (;;)
388     {
389       char **dest;
390       char **dest_label;
391       size_t n_dest;
392       struct string function_name;
393
394       int include_missing;
395       const struct agr_func *function;
396       int func_index;
397
398       union agr_argument arg[2];
399
400       struct variable **src;
401       size_t n_src;
402
403       size_t i;
404
405       dest = NULL;
406       dest_label = NULL;
407       n_dest = 0;
408       src = NULL;
409       function = NULL;
410       n_src = 0;
411       arg[0].c = NULL;
412       arg[1].c = NULL;
413
414       /* Parse the list of target variables. */
415       while (!lex_match (lexer, '='))
416         {
417           size_t n_dest_prev = n_dest;
418           
419           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
420                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
421             goto error;
422
423           /* Assign empty labels. */
424           {
425             int j;
426
427             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
428             for (j = n_dest_prev; j < n_dest; j++)
429               dest_label[j] = NULL;
430           }
431
432
433           
434           if (lex_token (lexer) == T_STRING)
435             {
436               struct string label;
437               ds_init_string (&label, lex_tokstr (lexer));
438
439               ds_truncate (&label, 255);
440               dest_label[n_dest - 1] = ds_xstrdup (&label);
441               lex_get (lexer);
442               ds_destroy (&label);
443             }
444         }
445
446       /* Get the name of the aggregation function. */
447       if (lex_token (lexer) != T_ID)
448         {
449           lex_error (lexer, _("expecting aggregation function"));
450           goto error;
451         }
452
453       include_missing = 0;
454
455       ds_init_string (&function_name, lex_tokstr (lexer));
456
457       ds_chomp (&function_name, '.');
458
459       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
460           include_missing = 1;
461
462       for (function = agr_func_tab; function->name; function++)
463         if (!strcasecmp (function->name, ds_cstr (&function_name)))
464           break;
465       if (NULL == function->name)
466         {
467           msg (SE, _("Unknown aggregation function %s."), 
468                ds_cstr (&function_name));
469           goto error;
470         }
471       ds_destroy (&function_name);
472       func_index = function - agr_func_tab;
473       lex_get (lexer);
474
475       /* Check for leading lparen. */
476       if (!lex_match (lexer, '('))
477         {
478           if (func_index == N)
479             func_index = N_NO_VARS;
480           else if (func_index == NU)
481             func_index = NU_NO_VARS;
482           else
483             {
484               lex_error (lexer, _("expecting `('"));
485               goto error;
486             }
487         }
488       else
489         {
490           /* Parse list of source variables. */
491           {
492             int pv_opts = PV_NO_SCRATCH;
493
494             if (func_index == SUM || func_index == MEAN || func_index == SD)
495               pv_opts |= PV_NUMERIC;
496             else if (function->n_args)
497               pv_opts |= PV_SAME_TYPE;
498
499             if (!parse_variables (lexer, dict, &src, &n_src, pv_opts))
500               goto error;
501           }
502
503           /* Parse function arguments, for those functions that
504              require arguments. */
505           if (function->n_args != 0)
506             for (i = 0; i < function->n_args; i++)
507               {
508                 int type;
509             
510                 lex_match (lexer, ',');
511                 if (lex_token (lexer) == T_STRING)
512                   {
513                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
514                     type = VAR_STRING;
515                   }
516                 else if (lex_is_number (lexer))
517                   {
518                     arg[i].f = lex_tokval (lexer);
519                     type = VAR_NUMERIC;
520                   }
521                 else
522                   {
523                     msg (SE, _("Missing argument %d to %s."), i + 1,
524                          function->name);
525                     goto error;
526                   }
527             
528                 lex_get (lexer);
529
530                 if (type != var_get_type (src[0]))
531                   {
532                     msg (SE, _("Arguments to %s must be of same type as "
533                                "source variables."),
534                          function->name);
535                     goto error;
536                   }
537               }
538
539           /* Trailing rparen. */
540           if (!lex_match (lexer, ')'))
541             {
542               lex_error (lexer, _("expecting `)'"));
543               goto error;
544             }
545           
546           /* Now check that the number of source variables match
547              the number of target variables.  If we check earlier
548              than this, the user can get very misleading error
549              message, i.e. `AGGREGATE x=SUM(y t).' will get this
550              error message when a proper message would be more
551              like `unknown variable t'. */
552           if (n_src != n_dest)
553             {
554               msg (SE, _("Number of source variables (%u) does not match "
555                          "number of target variables (%u)."),
556                    (unsigned) n_src, (unsigned) n_dest);
557               goto error;
558             }
559
560           if ((func_index == PIN || func_index == POUT
561               || func_index == FIN || func_index == FOUT) 
562               && (var_is_numeric (src[0])
563                   ? arg[0].f > arg[1].f
564                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
565             {
566               union agr_argument t = arg[0];
567               arg[0] = arg[1];
568               arg[1] = t;
569                   
570               msg (SW, _("The value arguments passed to the %s function "
571                          "are out-of-order.  They will be treated as if "
572                          "they had been specified in the correct order."),
573                    function->name);
574             }
575         }
576         
577       /* Finally add these to the linked list of aggregation
578          variables. */
579       for (i = 0; i < n_dest; i++)
580         {
581           struct agr_var *v = xmalloc (sizeof *v);
582
583           /* Add variable to chain. */
584           if (agr->agr_vars != NULL)
585             tail->next = v;
586           else
587             agr->agr_vars = v;
588           tail = v;
589           tail->next = NULL;
590           v->moments = NULL;
591           
592           /* Create the target variable in the aggregate
593              dictionary. */
594           {
595             struct variable *destvar;
596             
597             v->function = func_index;
598
599             if (src)
600               {
601                 v->src = src[i];
602                 
603                 if (var_is_alpha (src[i]))
604                   {
605                     v->function |= FSTRING;
606                     v->string = xmalloc (var_get_width (src[i]));
607                   }
608
609                 if (function->alpha_type == VAR_STRING)
610                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
611                 else
612                   {
613                     assert (var_is_numeric (v->src)
614                             || function->alpha_type == VAR_NUMERIC);
615                     destvar = dict_create_var (agr->dict, dest[i], 0);
616                     if (destvar != NULL) 
617                       {
618                         struct fmt_spec f;
619                         if ((func_index == N || func_index == NMISS)
620                             && dict_get_weight (dict) != NULL)
621                           f = fmt_for_output (FMT_F, 8, 2); 
622                         else
623                           f = function->format;
624                         var_set_both_formats (destvar, &f);
625                       }
626                   }
627               } else {
628                 struct fmt_spec f;
629                 v->src = NULL;
630                 destvar = dict_create_var (agr->dict, dest[i], 0);
631                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
632                   f = fmt_for_output (FMT_F, 8, 2); 
633                 else
634                   f = function->format;
635                 var_set_both_formats (destvar, &f);
636               }
637           
638             if (!destvar)
639               {
640                 msg (SE, _("Variable name %s is not unique within the "
641                            "aggregate file dictionary, which contains "
642                            "the aggregate variables and the break "
643                            "variables."),
644                      dest[i]);
645                 goto error;
646               }
647
648             free (dest[i]);
649             if (dest_label[i])
650               var_set_label (destvar, dest_label[i]);
651
652             v->dest = destvar;
653           }
654           
655           v->include_missing = include_missing;
656
657           if (v->src != NULL)
658             {
659               int j;
660
661               if (var_is_numeric (v->src))
662                 for (j = 0; j < function->n_args; j++)
663                   v->arg[j].f = arg[j].f;
664               else
665                 for (j = 0; j < function->n_args; j++)
666                   v->arg[j].c = xstrdup (arg[j].c);
667             }
668         }
669       
670       if (src != NULL && var_is_alpha (src[0]))
671         for (i = 0; i < function->n_args; i++)
672           {
673             free (arg[i].c);
674             arg[i].c = NULL;
675           }
676
677       free (src);
678       free (dest);
679       free (dest_label);
680
681       if (!lex_match (lexer, '/'))
682         {
683           if (lex_token (lexer) == '.')
684             return true;
685
686           lex_error (lexer, "expecting end of command");
687           return false;
688         }
689       continue;
690       
691     error:
692       ds_destroy (&function_name);
693       for (i = 0; i < n_dest; i++)
694         {
695           free (dest[i]);
696           free (dest_label[i]);
697         }
698       free (dest);
699       free (dest_label);
700       free (arg[0].c);
701       free (arg[1].c);
702       if (src && n_src && var_is_alpha (src[0]))
703         for (i = 0; i < function->n_args; i++)
704           {
705             free (arg[i].c);
706             arg[i].c = NULL;
707           }
708       free (src);
709         
710       return false;
711     }
712 }
713
714 /* Destroys AGR. */
715 static void
716 agr_destroy (struct agr_proc *agr)
717 {
718   struct agr_var *iter, *next;
719
720   any_writer_close (agr->writer);
721   if (agr->sort != NULL)
722     sort_destroy_criteria (agr->sort);
723   free (agr->break_vars);
724   case_destroy (&agr->break_case);
725   for (iter = agr->agr_vars; iter; iter = next)
726     {
727       next = iter->next;
728
729       if (iter->function & FSTRING)
730         {
731           size_t n_args;
732           size_t i;
733
734           n_args = agr_func_tab[iter->function & FUNC].n_args;
735           for (i = 0; i < n_args; i++)
736             free (iter->arg[i].c);
737           free (iter->string);
738         }
739       else if (iter->function == SD)
740         moments1_destroy (iter->moments);
741       free (iter);
742     }
743   if (agr->dict != NULL)
744     dict_destroy (agr->dict);
745
746   case_destroy (&agr->agr_case);
747 }
748 \f
749 /* Execution. */
750
751 static void accumulate_aggregate_info (struct agr_proc *,
752                                        const struct ccase *);
753 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
754
755 /* Processes a single case INPUT for aggregation.  If output is
756    warranted, writes it to OUTPUT and returns true.
757    Otherwise, returns false and OUTPUT is unmodified. */
758 static bool
759 aggregate_single_case (struct agr_proc *agr,
760                        const struct ccase *input, struct ccase *output)
761 {
762   bool finished_group = false;
763   
764   if (agr->case_cnt++ == 0)
765     initialize_aggregate_info (agr, input);
766   else if (case_compare (&agr->break_case, input,
767                          agr->break_vars, agr->break_var_cnt))
768     {
769       dump_aggregate_info (agr, output);
770       finished_group = true;
771
772       initialize_aggregate_info (agr, input);
773     }
774
775   accumulate_aggregate_info (agr, input);
776   return finished_group;
777 }
778
779 /* Accumulates aggregation data from the case INPUT. */
780 static void 
781 accumulate_aggregate_info (struct agr_proc *agr,
782                            const struct ccase *input)
783 {
784   struct agr_var *iter;
785   double weight;
786   bool bad_warn = true;
787
788   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
789
790   for (iter = agr->agr_vars; iter; iter = iter->next)
791     if (iter->src)
792       {
793         const union value *v = case_data (input, iter->src);
794         int src_width = var_get_width (iter->src);
795
796         if (iter->include_missing
797             ? var_is_numeric (iter->src) && v->f == SYSMIS
798             : var_is_value_missing (iter->src, v))
799           {
800             switch (iter->function)
801               {
802               case NMISS:
803               case NMISS | FSTRING:
804                 iter->dbl[0] += weight;
805                 break;
806               case NUMISS:
807               case NUMISS | FSTRING:
808                 iter->int1++;
809                 break;
810               }
811             iter->missing = 1;
812             continue;
813           }
814         
815         /* This is horrible.  There are too many possibilities. */
816         switch (iter->function)
817           {
818           case SUM:
819             iter->dbl[0] += v->f * weight;
820             iter->int1 = 1;
821             break;
822           case MEAN:
823             iter->dbl[0] += v->f * weight;
824             iter->dbl[1] += weight;
825             break;
826           case SD:
827             moments1_add (iter->moments, v->f, weight);
828             break;
829           case MAX:
830             iter->dbl[0] = MAX (iter->dbl[0], v->f);
831             iter->int1 = 1;
832             break;
833           case MAX | FSTRING:
834             if (memcmp (iter->string, v->s, src_width) < 0)
835               memcpy (iter->string, v->s, src_width);
836             iter->int1 = 1;
837             break;
838           case MIN:
839             iter->dbl[0] = MIN (iter->dbl[0], v->f);
840             iter->int1 = 1;
841             break;
842           case MIN | FSTRING:
843             if (memcmp (iter->string, v->s, src_width) > 0)
844               memcpy (iter->string, v->s, src_width);
845             iter->int1 = 1;
846             break;
847           case FGT:
848           case PGT:
849             if (v->f > iter->arg[0].f)
850               iter->dbl[0] += weight;
851             iter->dbl[1] += weight;
852             break;
853           case FGT | FSTRING:
854           case PGT | FSTRING:
855             if (memcmp (iter->arg[0].c, v->s, src_width) < 0)
856               iter->dbl[0] += weight;
857             iter->dbl[1] += weight;
858             break;
859           case FLT:
860           case PLT:
861             if (v->f < iter->arg[0].f)
862               iter->dbl[0] += weight;
863             iter->dbl[1] += weight;
864             break;
865           case FLT | FSTRING:
866           case PLT | FSTRING:
867             if (memcmp (iter->arg[0].c, v->s, src_width) > 0)
868               iter->dbl[0] += weight;
869             iter->dbl[1] += weight;
870             break;
871           case FIN:
872           case PIN:
873             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
874               iter->dbl[0] += weight;
875             iter->dbl[1] += weight;
876             break;
877           case FIN | FSTRING:
878           case PIN | FSTRING:
879             if (memcmp (iter->arg[0].c, v->s, src_width) <= 0
880                 && memcmp (iter->arg[1].c, v->s, src_width) >= 0)
881               iter->dbl[0] += weight;
882             iter->dbl[1] += weight;
883             break;
884           case FOUT:
885           case POUT:
886             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
887               iter->dbl[0] += weight;
888             iter->dbl[1] += weight;
889             break;
890           case FOUT | FSTRING:
891           case POUT | FSTRING:
892             if (memcmp (iter->arg[0].c, v->s, src_width) > 0
893                 || memcmp (iter->arg[1].c, v->s, src_width) < 0)
894               iter->dbl[0] += weight;
895             iter->dbl[1] += weight;
896             break;
897           case N:
898           case N | FSTRING:
899             iter->dbl[0] += weight;
900             break;
901           case NU:
902           case NU | FSTRING:
903             iter->int1++;
904             break;
905           case FIRST:
906             if (iter->int1 == 0)
907               {
908                 iter->dbl[0] = v->f;
909                 iter->int1 = 1;
910               }
911             break;
912           case FIRST | FSTRING:
913             if (iter->int1 == 0)
914               {
915                 memcpy (iter->string, v->s, src_width);
916                 iter->int1 = 1;
917               }
918             break;
919           case LAST:
920             iter->dbl[0] = v->f;
921             iter->int1 = 1;
922             break;
923           case LAST | FSTRING:
924             memcpy (iter->string, v->s, src_width);
925             iter->int1 = 1;
926             break;
927           case NMISS:
928           case NMISS | FSTRING:
929           case NUMISS:
930           case NUMISS | FSTRING:
931             /* Our value is not missing or it would have been
932                caught earlier.  Nothing to do. */
933             break;
934           default:
935             NOT_REACHED ();
936           }
937     } else {
938       switch (iter->function)
939         {
940         case N_NO_VARS:
941           iter->dbl[0] += weight;
942           break;
943         case NU_NO_VARS:
944           iter->int1++;
945           break;
946         default:
947           NOT_REACHED ();
948         }
949     }
950 }
951
952 /* We've come to a record that differs from the previous in one or
953    more of the break variables.  Make an output record from the
954    accumulated statistics in the OUTPUT case. */
955 static void 
956 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
957 {
958   {
959     int value_idx = 0;
960     int i;
961
962     for (i = 0; i < agr->break_var_cnt; i++) 
963       {
964         struct variable *v = agr->break_vars[i];
965         size_t value_cnt = var_get_value_cnt (v);
966         memcpy (case_data_rw_idx (output, value_idx),
967                 case_data (&agr->break_case, v),
968                 sizeof (union value) * value_cnt);
969         value_idx += value_cnt; 
970       }
971   }
972   
973   {
974     struct agr_var *i;
975   
976     for (i = agr->agr_vars; i; i = i->next)
977       {
978         union value *v = case_data_rw (output, i->dest);
979
980         if (agr->missing == COLUMNWISE && i->missing != 0
981             && (i->function & FUNC) != N && (i->function & FUNC) != NU
982             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
983           {
984             if (var_is_alpha (i->dest))
985               memset (v->s, ' ', var_get_width (i->dest));
986             else
987               v->f = SYSMIS;
988             continue;
989           }
990         
991         switch (i->function)
992           {
993           case SUM:
994             v->f = i->int1 ? i->dbl[0] : SYSMIS;
995             break;
996           case MEAN:
997             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
998             break;
999           case SD:
1000             {
1001               double variance;
1002
1003               /* FIXME: we should use two passes. */
1004               moments1_calculate (i->moments, NULL, NULL, &variance,
1005                                  NULL, NULL);
1006               if (variance != SYSMIS)
1007                 v->f = sqrt (variance);
1008               else
1009                 v->f = SYSMIS; 
1010             }
1011             break;
1012           case MAX:
1013           case MIN:
1014             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1015             break;
1016           case MAX | FSTRING:
1017           case MIN | FSTRING:
1018             if (i->int1)
1019               memcpy (v->s, i->string, var_get_width (i->dest));
1020             else
1021               memset (v->s, ' ', var_get_width (i->dest));
1022             break;
1023           case FGT:
1024           case FGT | FSTRING:
1025           case FLT:
1026           case FLT | FSTRING:
1027           case FIN:
1028           case FIN | FSTRING:
1029           case FOUT:
1030           case FOUT | FSTRING:
1031             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1032             break;
1033           case PGT:
1034           case PGT | FSTRING:
1035           case PLT:
1036           case PLT | FSTRING:
1037           case PIN:
1038           case PIN | FSTRING:
1039           case POUT:
1040           case POUT | FSTRING:
1041             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1042             break;
1043           case N:
1044           case N | FSTRING:
1045             v->f = i->dbl[0];
1046             break;
1047           case NU:
1048           case NU | FSTRING:
1049             v->f = i->int1;
1050             break;
1051           case FIRST:
1052           case LAST:
1053             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1054             break;
1055           case FIRST | FSTRING:
1056           case LAST | FSTRING:
1057             if (i->int1)
1058               memcpy (v->s, i->string, var_get_width (i->dest));
1059             else
1060               memset (v->s, ' ', var_get_width (i->dest));
1061             break;
1062           case N_NO_VARS:
1063             v->f = i->dbl[0];
1064             break;
1065           case NU_NO_VARS:
1066             v->f = i->int1;
1067             break;
1068           case NMISS:
1069           case NMISS | FSTRING:
1070             v->f = i->dbl[0];
1071             break;
1072           case NUMISS:
1073           case NUMISS | FSTRING:
1074             v->f = i->int1;
1075             break;
1076           default:
1077             NOT_REACHED ();
1078           }
1079       }
1080   }
1081 }
1082
1083 /* Resets the state for all the aggregate functions. */
1084 static void
1085 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1086 {
1087   struct agr_var *iter;
1088
1089   case_destroy (&agr->break_case);
1090   case_clone (&agr->break_case, input);
1091
1092   for (iter = agr->agr_vars; iter; iter = iter->next)
1093     {
1094       iter->missing = 0;
1095       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1096       iter->int1 = iter->int2 = 0;
1097       switch (iter->function)
1098         {
1099         case MIN:
1100           iter->dbl[0] = DBL_MAX;
1101           break;
1102         case MIN | FSTRING:
1103           memset (iter->string, 255, var_get_width (iter->src));
1104           break;
1105         case MAX:
1106           iter->dbl[0] = -DBL_MAX;
1107           break;
1108         case MAX | FSTRING:
1109           memset (iter->string, 0, var_get_width (iter->src));
1110           break;
1111         case SD:
1112           if (iter->moments == NULL)
1113             iter->moments = moments1_create (MOMENT_VARIANCE);
1114           else
1115             moments1_clear (iter->moments);
1116           break;
1117         default:
1118           break;
1119         }
1120     }
1121 }