First step in making struct variable opaque: the boring mechanical
[pspp] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "minmax.h"
50
51 #include "gettext.h"
52 #define _(msgid) gettext (msgid)
53
54 /* Argument for AGGREGATE function. */
55 union agr_argument
56   {
57     double f;                           /* Numeric. */
58     char *c;                            /* Short or long string. */
59   };
60
61 /* Specifies how to make an aggregate variable. */
62 struct agr_var
63   {
64     struct agr_var *next;               /* Next in list. */
65
66     /* Collected during parsing. */
67     struct variable *src;       /* Source variable. */
68     struct variable *dest;      /* Target variable. */
69     int function;               /* Function. */
70     int include_missing;        /* 1=Include user-missing values. */
71     union agr_argument arg[2];  /* Arguments. */
72
73     /* Accumulated during AGGREGATE execution. */
74     double dbl[3];
75     int int1, int2;
76     char *string;
77     int missing;
78     struct moments1 *moments;
79   };
80
81 /* Aggregation functions. */
82 enum
83   {
84     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
85     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
86     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
87     FUNC = 0x1f, /* Function mask. */
88     FSTRING = 1<<5, /* String function bit. */
89   };
90
91 /* Attributes of an aggregation function. */
92 struct agr_func
93   {
94     const char *name;           /* Aggregation function name. */
95     size_t n_args;              /* Number of arguments. */
96     int alpha_type;             /* When given ALPHA arguments, output type. */
97     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
98   };
99
100 /* Attributes of aggregation functions. */
101 static const struct agr_func agr_func_tab[] = 
102   {
103     {"<NONE>",  0, -1,      {0, 0, 0}},
104     {"SUM",     0, -1,      {FMT_F, 8, 2}},
105     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
106     {"SD",      0, -1,      {FMT_F, 8, 2}},
107     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
108     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
109     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
110     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
111     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
112     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
113     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
114     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
115     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
116     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
117     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
118     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
119     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
120     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
121     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
122     {"LAST",    0, ALPHA,   {-1, -1, -1}},
123     {NULL,      0, -1,      {-1, -1, -1}},
124     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
125     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
126   };
127
128 /* Missing value types. */
129 enum missing_treatment
130   {
131     ITEMWISE,           /* Missing values item by item. */
132     COLUMNWISE          /* Missing values column by column. */
133   };
134
135 /* An entire AGGREGATE procedure. */
136 struct agr_proc 
137   {
138     /* We have either an output file or a sink. */
139     struct any_writer *writer;          /* Output file, or null if none. */
140     struct case_sink *sink;             /* Sink, or null if none. */
141
142     /* Break variables. */
143     struct sort_criteria *sort;         /* Sort criteria. */
144     struct variable **break_vars;       /* Break variables. */
145     size_t break_var_cnt;               /* Number of break variables. */
146     struct ccase break_case;            /* Last values of break variables. */
147
148     enum missing_treatment missing;     /* How to treat missing values. */
149     struct agr_var *agr_vars;           /* First aggregate variable. */
150     struct dictionary *dict;            /* Aggregate dictionary. */
151     const struct dictionary *src_dict;  /* Dict of the source */
152     int case_cnt;                       /* Counts aggregated cases. */
153     struct ccase agr_case;              /* Aggregate case for output. */
154   };
155
156 static void initialize_aggregate_info (struct agr_proc *,
157                                        const struct ccase *);
158
159 /* Prototypes. */
160 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
161                                        struct agr_proc *);
162 static void agr_destroy (struct agr_proc *);
163 static bool aggregate_single_case (struct agr_proc *agr,
164                                    const struct ccase *input,
165                                    struct ccase *output);
166 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
167
168 /* Aggregating to the active file. */
169 static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
170
171 /* Aggregating to a system file. */
172 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
173 \f
174 /* Parsing. */
175
176 /* Parses and executes the AGGREGATE procedure. */
177 int
178 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
179 {
180   struct dictionary *dict = dataset_dict (ds);
181   struct agr_proc agr;
182   struct file_handle *out_file = NULL;
183
184   bool copy_documents = false;
185   bool presorted = false;
186   bool saw_direction;
187
188   memset(&agr, 0 , sizeof (agr));
189   agr.missing = ITEMWISE;
190   case_nullify (&agr.break_case);
191   
192   agr.dict = dict_create ();
193   agr.src_dict = dict;
194   dict_set_label (agr.dict, dict_get_label (dict));
195   dict_set_documents (agr.dict, dict_get_documents (dict));
196
197   /* OUTFILE subcommand must be first. */
198   if (!lex_force_match_id (lexer, "OUTFILE"))
199     goto error;
200   lex_match (lexer, '=');
201   if (!lex_match (lexer, '*'))
202     {
203       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
204       if (out_file == NULL)
205         goto error;
206     }
207   
208   /* Read most of the subcommands. */
209   for (;;)
210     {
211       lex_match (lexer, '/');
212       
213       if (lex_match_id (lexer, "MISSING"))
214         {
215           lex_match (lexer, '=');
216           if (!lex_match_id (lexer, "COLUMNWISE"))
217             {
218               lex_error (lexer, _("while expecting COLUMNWISE"));
219               goto error;
220             }
221           agr.missing = COLUMNWISE;
222         }
223       else if (lex_match_id (lexer, "DOCUMENT"))
224         copy_documents = true;
225       else if (lex_match_id (lexer, "PRESORTED"))
226         presorted = true;
227       else if (lex_match_id (lexer, "BREAK"))
228         {
229           int i;
230
231           lex_match (lexer, '=');
232           agr.sort = sort_parse_criteria (lexer, dict,
233                                           &agr.break_vars, &agr.break_var_cnt,
234                                           &saw_direction, NULL);
235           if (agr.sort == NULL)
236             goto error;
237           
238           for (i = 0; i < agr.break_var_cnt; i++)
239             dict_clone_var_assert (agr.dict, agr.break_vars[i],
240                                    var_get_name (agr.break_vars[i]));
241
242           /* BREAK must follow the options. */
243           break;
244         }
245       else
246         {
247           lex_error (lexer, _("expecting BREAK"));
248           goto error;
249         }
250     }
251   if (presorted && saw_direction)
252     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
253                "with (A) or (D) has no effect.  Output data will be sorted "
254                "the same way as the input data."));
255       
256   /* Read in the aggregate functions. */
257   lex_match (lexer, '/');
258   if (!parse_aggregate_functions (lexer, dict, &agr))
259     goto error;
260
261   /* Delete documents. */
262   if (!copy_documents)
263     dict_set_documents (agr.dict, NULL);
264
265   /* Cancel SPLIT FILE. */
266   dict_set_split_vars (agr.dict, NULL, 0);
267   
268   /* Initialize. */
269   agr.case_cnt = 0;
270   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
271
272   /* Output to active file or external file? */
273   if (out_file == NULL) 
274     {
275       /* The active file will be replaced by the aggregated data,
276          so TEMPORARY is moot. */
277       proc_cancel_temporary_transformations (ds);
278
279       if (agr.sort != NULL && !presorted) 
280         {
281           if (!sort_active_file_in_place (ds, agr.sort))
282             goto error;
283         }
284
285       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
286       if (agr.sink->class->open != NULL)
287         agr.sink->class->open (agr.sink);
288       proc_set_sink (ds, 
289                      create_case_sink (&null_sink_class, 
290                                        dict, NULL));
291       if (!procedure (ds, agr_to_active_file, &agr))
292         goto error;
293       if (agr.case_cnt > 0) 
294         {
295           dump_aggregate_info (&agr, &agr.agr_case);
296           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
297             goto error;
298         }
299       discard_variables (ds);
300       dict_destroy (dict);
301       dataset_set_dict (ds, agr.dict);
302       agr.dict = NULL;
303       proc_set_source (ds, 
304                        agr.sink->class->make_source (agr.sink));
305       free_case_sink (agr.sink);
306     }
307   else
308     {
309       agr.writer = any_writer_open (out_file, agr.dict);
310       if (agr.writer == NULL)
311         goto error;
312       
313       if (agr.sort != NULL && !presorted) 
314         {
315           /* Sorting is needed. */
316           struct casefile *dst;
317           struct casereader *reader;
318           struct ccase c;
319           bool ok = true;
320           
321           dst = sort_active_file_to_casefile (ds, agr.sort);
322           if (dst == NULL)
323             goto error;
324           reader = casefile_get_destructive_reader (dst);
325           while (ok && casereader_read_xfer (reader, &c)) 
326             {
327               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
328                 ok = any_writer_write (agr.writer, &agr.agr_case);
329               case_destroy (&c);
330             }
331           casereader_destroy (reader);
332           if (ok)
333             ok = !casefile_error (dst);
334           casefile_destroy (dst);
335           if (!ok)
336             goto error;
337         }
338       else 
339         {
340           /* Active file is already sorted. */
341           if (!procedure (ds, presorted_agr_to_sysfile, &agr))
342             goto error;
343         }
344       
345       if (agr.case_cnt > 0) 
346         {
347           dump_aggregate_info (&agr, &agr.agr_case);
348           any_writer_write (agr.writer, &agr.agr_case);
349         }
350       if (any_writer_error (agr.writer))
351         goto error;
352     }
353   
354   agr_destroy (&agr);
355   return CMD_SUCCESS;
356
357 error:
358   agr_destroy (&agr);
359   return CMD_CASCADING_FAILURE;
360 }
361
362 /* Parse all the aggregate functions. */
363 static bool
364 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, struct agr_proc *agr)
365 {
366   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
367
368   /* Parse everything. */
369   tail = NULL;
370   for (;;)
371     {
372       char **dest;
373       char **dest_label;
374       size_t n_dest;
375       struct string function_name;
376
377       int include_missing;
378       const struct agr_func *function;
379       int func_index;
380
381       union agr_argument arg[2];
382
383       struct variable **src;
384       size_t n_src;
385
386       size_t i;
387
388       dest = NULL;
389       dest_label = NULL;
390       n_dest = 0;
391       src = NULL;
392       function = NULL;
393       n_src = 0;
394       arg[0].c = NULL;
395       arg[1].c = NULL;
396
397       /* Parse the list of target variables. */
398       while (!lex_match (lexer, '='))
399         {
400           size_t n_dest_prev = n_dest;
401           
402           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
403                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
404             goto error;
405
406           /* Assign empty labels. */
407           {
408             int j;
409
410             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
411             for (j = n_dest_prev; j < n_dest; j++)
412               dest_label[j] = NULL;
413           }
414
415
416           
417           if (lex_token (lexer) == T_STRING)
418             {
419               struct string label;
420               ds_init_string (&label, lex_tokstr (lexer));
421
422               ds_truncate (&label, 255);
423               dest_label[n_dest - 1] = ds_xstrdup (&label);
424               lex_get (lexer);
425               ds_destroy (&label);
426             }
427         }
428
429       /* Get the name of the aggregation function. */
430       if (lex_token (lexer) != T_ID)
431         {
432           lex_error (lexer, _("expecting aggregation function"));
433           goto error;
434         }
435
436       include_missing = 0;
437
438       ds_init_string (&function_name, lex_tokstr (lexer));
439
440       ds_chomp (&function_name, '.');
441
442       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
443           include_missing = 1;
444
445       for (function = agr_func_tab; function->name; function++)
446         if (!strcasecmp (function->name, ds_cstr (&function_name)))
447           break;
448       if (NULL == function->name)
449         {
450           msg (SE, _("Unknown aggregation function %s."), 
451                ds_cstr (&function_name));
452           goto error;
453         }
454       ds_destroy (&function_name);
455       func_index = function - agr_func_tab;
456       lex_get (lexer);
457
458       /* Check for leading lparen. */
459       if (!lex_match (lexer, '('))
460         {
461           if (func_index == N)
462             func_index = N_NO_VARS;
463           else if (func_index == NU)
464             func_index = NU_NO_VARS;
465           else
466             {
467               lex_error (lexer, _("expecting `('"));
468               goto error;
469             }
470         }
471       else
472         {
473           /* Parse list of source variables. */
474           {
475             int pv_opts = PV_NO_SCRATCH;
476
477             if (func_index == SUM || func_index == MEAN || func_index == SD)
478               pv_opts |= PV_NUMERIC;
479             else if (function->n_args)
480               pv_opts |= PV_SAME_TYPE;
481
482             if (!parse_variables (lexer, dict, &src, &n_src, pv_opts))
483               goto error;
484           }
485
486           /* Parse function arguments, for those functions that
487              require arguments. */
488           if (function->n_args != 0)
489             for (i = 0; i < function->n_args; i++)
490               {
491                 int type;
492             
493                 lex_match (lexer, ',');
494                 if (lex_token (lexer) == T_STRING)
495                   {
496                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
497                     type = ALPHA;
498                   }
499                 else if (lex_is_number (lexer))
500                   {
501                     arg[i].f = lex_tokval (lexer);
502                     type = NUMERIC;
503                   } else {
504                     msg (SE, _("Missing argument %d to %s."), i + 1,
505                          function->name);
506                     goto error;
507                   }
508             
509                 lex_get (lexer);
510
511                 if (type != var_get_type (src[0]))
512                   {
513                     msg (SE, _("Arguments to %s must be of same type as "
514                                "source variables."),
515                          function->name);
516                     goto error;
517                   }
518               }
519
520           /* Trailing rparen. */
521           if (!lex_match (lexer, ')'))
522             {
523               lex_error (lexer, _("expecting `)'"));
524               goto error;
525             }
526           
527           /* Now check that the number of source variables match
528              the number of target variables.  If we check earlier
529              than this, the user can get very misleading error
530              message, i.e. `AGGREGATE x=SUM(y t).' will get this
531              error message when a proper message would be more
532              like `unknown variable t'. */
533           if (n_src != n_dest)
534             {
535               msg (SE, _("Number of source variables (%u) does not match "
536                          "number of target variables (%u)."),
537                    (unsigned) n_src, (unsigned) n_dest);
538               goto error;
539             }
540
541           if ((func_index == PIN || func_index == POUT
542               || func_index == FIN || func_index == FOUT) 
543               && (var_is_numeric (src[0])
544                   ? arg[0].f > arg[1].f
545                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
546             {
547               union agr_argument t = arg[0];
548               arg[0] = arg[1];
549               arg[1] = t;
550                   
551               msg (SW, _("The value arguments passed to the %s function "
552                          "are out-of-order.  They will be treated as if "
553                          "they had been specified in the correct order."),
554                    function->name);
555             }
556         }
557         
558       /* Finally add these to the linked list of aggregation
559          variables. */
560       for (i = 0; i < n_dest; i++)
561         {
562           struct agr_var *v = xmalloc (sizeof *v);
563
564           /* Add variable to chain. */
565           if (agr->agr_vars != NULL)
566             tail->next = v;
567           else
568             agr->agr_vars = v;
569           tail = v;
570           tail->next = NULL;
571           v->moments = NULL;
572           
573           /* Create the target variable in the aggregate
574              dictionary. */
575           {
576             struct variable *destvar;
577             
578             v->function = func_index;
579
580             if (src)
581               {
582                 v->src = src[i];
583                 
584                 if (var_is_alpha (src[i]))
585                   {
586                     v->function |= FSTRING;
587                     v->string = xmalloc (var_get_width (src[i]));
588                   }
589
590                 if (function->alpha_type == ALPHA)
591                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
592                 else
593                   {
594                     assert (var_is_numeric (v->src)
595                             || function->alpha_type == NUMERIC);
596                     destvar = dict_create_var (agr->dict, dest[i], 0);
597                     if (destvar != NULL) 
598                       {
599                         struct fmt_spec f;
600                         if ((func_index == N || func_index == NMISS)
601                             && dict_get_weight (dict) != NULL)
602                           f = fmt_for_output (FMT_F, 8, 2); 
603                         else
604                           f = function->format;
605                         var_set_both_formats (destvar, &f);
606                       }
607                   }
608               } else {
609                 struct fmt_spec f;
610                 v->src = NULL;
611                 destvar = dict_create_var (agr->dict, dest[i], 0);
612                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
613                   f = fmt_for_output (FMT_F, 8, 2); 
614                 else
615                   f = function->format;
616                 var_set_both_formats (destvar, &f);
617               }
618           
619             if (!destvar)
620               {
621                 msg (SE, _("Variable name %s is not unique within the "
622                            "aggregate file dictionary, which contains "
623                            "the aggregate variables and the break "
624                            "variables."),
625                      dest[i]);
626                 goto error;
627               }
628
629             free (dest[i]);
630             if (dest_label[i])
631               var_set_label (destvar, dest_label[i]);
632
633             v->dest = destvar;
634           }
635           
636           v->include_missing = include_missing;
637
638           if (v->src != NULL)
639             {
640               int j;
641
642               if (var_is_numeric (v->src))
643                 for (j = 0; j < function->n_args; j++)
644                   v->arg[j].f = arg[j].f;
645               else
646                 for (j = 0; j < function->n_args; j++)
647                   v->arg[j].c = xstrdup (arg[j].c);
648             }
649         }
650       
651       if (src != NULL && var_is_alpha (src[0]))
652         for (i = 0; i < function->n_args; i++)
653           {
654             free (arg[i].c);
655             arg[i].c = NULL;
656           }
657
658       free (src);
659       free (dest);
660       free (dest_label);
661
662       if (!lex_match (lexer, '/'))
663         {
664           if (lex_token (lexer) == '.')
665             return true;
666
667           lex_error (lexer, "expecting end of command");
668           return false;
669         }
670       continue;
671       
672     error:
673       ds_destroy (&function_name);
674       for (i = 0; i < n_dest; i++)
675         {
676           free (dest[i]);
677           free (dest_label[i]);
678         }
679       free (dest);
680       free (dest_label);
681       free (arg[0].c);
682       free (arg[1].c);
683       if (src && n_src && var_is_alpha (src[0]))
684         for (i = 0; i < function->n_args; i++)
685           {
686             free (arg[i].c);
687             arg[i].c = NULL;
688           }
689       free (src);
690         
691       return false;
692     }
693 }
694
695 /* Destroys AGR. */
696 static void
697 agr_destroy (struct agr_proc *agr)
698 {
699   struct agr_var *iter, *next;
700
701   any_writer_close (agr->writer);
702   if (agr->sort != NULL)
703     sort_destroy_criteria (agr->sort);
704   free (agr->break_vars);
705   case_destroy (&agr->break_case);
706   for (iter = agr->agr_vars; iter; iter = next)
707     {
708       next = iter->next;
709
710       if (iter->function & FSTRING)
711         {
712           size_t n_args;
713           size_t i;
714
715           n_args = agr_func_tab[iter->function & FUNC].n_args;
716           for (i = 0; i < n_args; i++)
717             free (iter->arg[i].c);
718           free (iter->string);
719         }
720       else if (iter->function == SD)
721         moments1_destroy (iter->moments);
722       free (iter);
723     }
724   if (agr->dict != NULL)
725     dict_destroy (agr->dict);
726
727   case_destroy (&agr->agr_case);
728 }
729 \f
730 /* Execution. */
731
732 static void accumulate_aggregate_info (struct agr_proc *,
733                                        const struct ccase *);
734 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
735
736 /* Processes a single case INPUT for aggregation.  If output is
737    warranted, writes it to OUTPUT and returns true.
738    Otherwise, returns false and OUTPUT is unmodified. */
739 static bool
740 aggregate_single_case (struct agr_proc *agr,
741                        const struct ccase *input, struct ccase *output)
742 {
743   bool finished_group = false;
744   
745   if (agr->case_cnt++ == 0)
746     initialize_aggregate_info (agr, input);
747   else if (case_compare (&agr->break_case, input,
748                          agr->break_vars, agr->break_var_cnt))
749     {
750       dump_aggregate_info (agr, output);
751       finished_group = true;
752
753       initialize_aggregate_info (agr, input);
754     }
755
756   accumulate_aggregate_info (agr, input);
757   return finished_group;
758 }
759
760 /* Accumulates aggregation data from the case INPUT. */
761 static void 
762 accumulate_aggregate_info (struct agr_proc *agr,
763                            const struct ccase *input)
764 {
765   struct agr_var *iter;
766   double weight;
767   bool bad_warn = true;
768
769   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
770
771   for (iter = agr->agr_vars; iter; iter = iter->next)
772     if (iter->src)
773       {
774         const union value *v = case_data (input, iter->src->fv);
775         int src_width = var_get_width (iter->src);
776
777         if (iter->include_missing
778             ? var_is_numeric (iter->src) && v->f == SYSMIS
779             : var_is_value_missing (iter->src, v))
780           {
781             switch (iter->function)
782               {
783               case NMISS:
784               case NMISS | FSTRING:
785                 iter->dbl[0] += weight;
786                 break;
787               case NUMISS:
788               case NUMISS | FSTRING:
789                 iter->int1++;
790                 break;
791               }
792             iter->missing = 1;
793             continue;
794           }
795         
796         /* This is horrible.  There are too many possibilities. */
797         switch (iter->function)
798           {
799           case SUM:
800             iter->dbl[0] += v->f * weight;
801             iter->int1 = 1;
802             break;
803           case MEAN:
804             iter->dbl[0] += v->f * weight;
805             iter->dbl[1] += weight;
806             break;
807           case SD:
808             moments1_add (iter->moments, v->f, weight);
809             break;
810           case MAX:
811             iter->dbl[0] = MAX (iter->dbl[0], v->f);
812             iter->int1 = 1;
813             break;
814           case MAX | FSTRING:
815             if (memcmp (iter->string, v->s, src_width) < 0)
816               memcpy (iter->string, v->s, src_width);
817             iter->int1 = 1;
818             break;
819           case MIN:
820             iter->dbl[0] = MIN (iter->dbl[0], v->f);
821             iter->int1 = 1;
822             break;
823           case MIN | FSTRING:
824             if (memcmp (iter->string, v->s, src_width) > 0)
825               memcpy (iter->string, v->s, src_width);
826             iter->int1 = 1;
827             break;
828           case FGT:
829           case PGT:
830             if (v->f > iter->arg[0].f)
831               iter->dbl[0] += weight;
832             iter->dbl[1] += weight;
833             break;
834           case FGT | FSTRING:
835           case PGT | FSTRING:
836             if (memcmp (iter->arg[0].c, v->s, src_width) < 0)
837               iter->dbl[0] += weight;
838             iter->dbl[1] += weight;
839             break;
840           case FLT:
841           case PLT:
842             if (v->f < iter->arg[0].f)
843               iter->dbl[0] += weight;
844             iter->dbl[1] += weight;
845             break;
846           case FLT | FSTRING:
847           case PLT | FSTRING:
848             if (memcmp (iter->arg[0].c, v->s, src_width) > 0)
849               iter->dbl[0] += weight;
850             iter->dbl[1] += weight;
851             break;
852           case FIN:
853           case PIN:
854             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
855               iter->dbl[0] += weight;
856             iter->dbl[1] += weight;
857             break;
858           case FIN | FSTRING:
859           case PIN | FSTRING:
860             if (memcmp (iter->arg[0].c, v->s, src_width) <= 0
861                 && memcmp (iter->arg[1].c, v->s, src_width) >= 0)
862               iter->dbl[0] += weight;
863             iter->dbl[1] += weight;
864             break;
865           case FOUT:
866           case POUT:
867             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
868               iter->dbl[0] += weight;
869             iter->dbl[1] += weight;
870             break;
871           case FOUT | FSTRING:
872           case POUT | FSTRING:
873             if (memcmp (iter->arg[0].c, v->s, src_width) > 0
874                 || memcmp (iter->arg[1].c, v->s, src_width) < 0)
875               iter->dbl[0] += weight;
876             iter->dbl[1] += weight;
877             break;
878           case N:
879           case N | FSTRING:
880             iter->dbl[0] += weight;
881             break;
882           case NU:
883           case NU | FSTRING:
884             iter->int1++;
885             break;
886           case FIRST:
887             if (iter->int1 == 0)
888               {
889                 iter->dbl[0] = v->f;
890                 iter->int1 = 1;
891               }
892             break;
893           case FIRST | FSTRING:
894             if (iter->int1 == 0)
895               {
896                 memcpy (iter->string, v->s, src_width);
897                 iter->int1 = 1;
898               }
899             break;
900           case LAST:
901             iter->dbl[0] = v->f;
902             iter->int1 = 1;
903             break;
904           case LAST | FSTRING:
905             memcpy (iter->string, v->s, src_width);
906             iter->int1 = 1;
907             break;
908           case NMISS:
909           case NMISS | FSTRING:
910           case NUMISS:
911           case NUMISS | FSTRING:
912             /* Our value is not missing or it would have been
913                caught earlier.  Nothing to do. */
914             break;
915           default:
916             NOT_REACHED ();
917           }
918     } else {
919       switch (iter->function)
920         {
921         case N_NO_VARS:
922           iter->dbl[0] += weight;
923           break;
924         case NU_NO_VARS:
925           iter->int1++;
926           break;
927         default:
928           NOT_REACHED ();
929         }
930     }
931 }
932
933 /* We've come to a record that differs from the previous in one or
934    more of the break variables.  Make an output record from the
935    accumulated statistics in the OUTPUT case. */
936 static void 
937 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
938 {
939   {
940     int value_idx = 0;
941     int i;
942
943     for (i = 0; i < agr->break_var_cnt; i++) 
944       {
945         struct variable *v = agr->break_vars[i];
946         size_t value_cnt = var_get_value_cnt (v);
947         memcpy (case_data_rw (output, value_idx),
948                 case_data (&agr->break_case, v->fv),
949                 sizeof (union value) * value_cnt);
950         value_idx += value_cnt; 
951       }
952   }
953   
954   {
955     struct agr_var *i;
956   
957     for (i = agr->agr_vars; i; i = i->next)
958       {
959         union value *v = case_data_rw (output, i->dest->fv);
960
961         if (agr->missing == COLUMNWISE && i->missing != 0
962             && (i->function & FUNC) != N && (i->function & FUNC) != NU
963             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
964           {
965             if (var_is_alpha (i->dest))
966               memset (v->s, ' ', var_get_width (i->dest));
967             else
968               v->f = SYSMIS;
969             continue;
970           }
971         
972         switch (i->function)
973           {
974           case SUM:
975             v->f = i->int1 ? i->dbl[0] : SYSMIS;
976             break;
977           case MEAN:
978             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
979             break;
980           case SD:
981             {
982               double variance;
983
984               /* FIXME: we should use two passes. */
985               moments1_calculate (i->moments, NULL, NULL, &variance,
986                                  NULL, NULL);
987               if (variance != SYSMIS)
988                 v->f = sqrt (variance);
989               else
990                 v->f = SYSMIS; 
991             }
992             break;
993           case MAX:
994           case MIN:
995             v->f = i->int1 ? i->dbl[0] : SYSMIS;
996             break;
997           case MAX | FSTRING:
998           case MIN | FSTRING:
999             if (i->int1)
1000               memcpy (v->s, i->string, var_get_width (i->dest));
1001             else
1002               memset (v->s, ' ', var_get_width (i->dest));
1003             break;
1004           case FGT:
1005           case FGT | FSTRING:
1006           case FLT:
1007           case FLT | FSTRING:
1008           case FIN:
1009           case FIN | FSTRING:
1010           case FOUT:
1011           case FOUT | FSTRING:
1012             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1013             break;
1014           case PGT:
1015           case PGT | FSTRING:
1016           case PLT:
1017           case PLT | FSTRING:
1018           case PIN:
1019           case PIN | FSTRING:
1020           case POUT:
1021           case POUT | FSTRING:
1022             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1023             break;
1024           case N:
1025           case N | FSTRING:
1026             v->f = i->dbl[0];
1027             break;
1028           case NU:
1029           case NU | FSTRING:
1030             v->f = i->int1;
1031             break;
1032           case FIRST:
1033           case LAST:
1034             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1035             break;
1036           case FIRST | FSTRING:
1037           case LAST | FSTRING:
1038             if (i->int1)
1039               memcpy (v->s, i->string, var_get_width (i->dest));
1040             else
1041               memset (v->s, ' ', var_get_width (i->dest));
1042             break;
1043           case N_NO_VARS:
1044             v->f = i->dbl[0];
1045             break;
1046           case NU_NO_VARS:
1047             v->f = i->int1;
1048             break;
1049           case NMISS:
1050           case NMISS | FSTRING:
1051             v->f = i->dbl[0];
1052             break;
1053           case NUMISS:
1054           case NUMISS | FSTRING:
1055             v->f = i->int1;
1056             break;
1057           default:
1058             NOT_REACHED ();
1059           }
1060       }
1061   }
1062 }
1063
1064 /* Resets the state for all the aggregate functions. */
1065 static void
1066 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1067 {
1068   struct agr_var *iter;
1069
1070   case_destroy (&agr->break_case);
1071   case_clone (&agr->break_case, input);
1072
1073   for (iter = agr->agr_vars; iter; iter = iter->next)
1074     {
1075       iter->missing = 0;
1076       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1077       iter->int1 = iter->int2 = 0;
1078       switch (iter->function)
1079         {
1080         case MIN:
1081           iter->dbl[0] = DBL_MAX;
1082           break;
1083         case MIN | FSTRING:
1084           memset (iter->string, 255, var_get_width (iter->src));
1085           break;
1086         case MAX:
1087           iter->dbl[0] = -DBL_MAX;
1088           break;
1089         case MAX | FSTRING:
1090           memset (iter->string, 0, var_get_width (iter->src));
1091           break;
1092         case SD:
1093           if (iter->moments == NULL)
1094             iter->moments = moments1_create (MOMENT_VARIANCE);
1095           else
1096             moments1_clear (iter->moments);
1097           break;
1098         default:
1099           break;
1100         }
1101     }
1102 }
1103 \f
1104 /* Aggregate each case as it comes through.  Cases which aren't needed
1105    are dropped.
1106    Returns true if successful, false if an I/O error occurred. */
1107 static bool
1108 agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
1109 {
1110   struct agr_proc *agr = agr_;
1111
1112   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1113     return agr->sink->class->write (agr->sink, &agr->agr_case);
1114
1115   return true;
1116 }
1117
1118 /* Aggregate the current case and output it if we passed a
1119    breakpoint. */
1120 static bool
1121 presorted_agr_to_sysfile (const struct ccase *c, void *agr_, 
1122                           const struct dataset *ds UNUSED) 
1123 {
1124   struct agr_proc *agr = agr_;
1125
1126   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1127     return any_writer_write (agr->writer, &agr->agr_case);
1128   
1129   return true;
1130 }