Fix memory leak in data_in
[pspp] / src / language / stats / aggregate.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2008, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include <data/any-writer.h>
22 #include <data/case.h>
23 #include <data/casegrouper.h>
24 #include <data/casereader.h>
25 #include <data/casewriter.h>
26 #include <data/dictionary.h>
27 #include <data/file-handle-def.h>
28 #include <data/format.h>
29 #include <data/procedure.h>
30 #include <data/settings.h>
31 #include <data/subcase.h>
32 #include <data/sys-file-writer.h>
33 #include <data/variable.h>
34 #include <language/command.h>
35 #include <language/data-io/file-handle.h>
36 #include <language/lexer/lexer.h>
37 #include <language/lexer/variable-parser.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/assertion.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/pool.h>
43 #include <libpspp/str.h>
44 #include <math/moments.h>
45 #include <math/sort.h>
46 #include <math/statistic.h>
47 #include <math/percentiles.h>
48
49 #include "minmax.h"
50 #include "xalloc.h"
51
52 #include "gettext.h"
53 #define _(msgid) gettext (msgid)
54
55 /* Argument for AGGREGATE function. */
56 union agr_argument
57   {
58     double f;                           /* Numeric. */
59     char *c;                            /* Short or long string. */
60   };
61
62 /* Specifies how to make an aggregate variable. */
63 struct agr_var
64   {
65     struct agr_var *next;               /* Next in list. */
66
67     /* Collected during parsing. */
68     const struct variable *src; /* Source variable. */
69     struct variable *dest;      /* Target variable. */
70     int function;               /* Function. */
71     enum mv_class exclude;      /* Classes of missing values to exclude. */
72     union agr_argument arg[2];  /* Arguments. */
73
74     /* Accumulated during AGGREGATE execution. */
75     double dbl[3];
76     int int1, int2;
77     char *string;
78     bool saw_missing;
79     struct moments1 *moments;
80     double cc;
81
82     struct variable *subject;
83     struct variable *weight;
84     struct casewriter *writer;
85   };
86
87 /* Aggregation functions. */
88 enum
89   {
90     NONE, SUM, MEAN, MEDIAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
91     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
92     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
93     FUNC = 0x1f, /* Function mask. */
94     FSTRING = 1<<5, /* String function bit. */
95   };
96
97 /* Attributes of an aggregation function. */
98 struct agr_func
99   {
100     const char *name;           /* Aggregation function name. */
101     size_t n_args;              /* Number of arguments. */
102     enum val_type alpha_type;   /* When given ALPHA arguments, output type. */
103     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
104   };
105
106 /* Attributes of aggregation functions. */
107 static const struct agr_func agr_func_tab[] =
108   {
109     {"<NONE>",  0, -1,          {0, 0, 0}},
110     {"SUM",     0, -1,          {FMT_F, 8, 2}},
111     {"MEAN",    0, -1,          {FMT_F, 8, 2}},
112     {"MEDIAN",  0, -1,          {FMT_F, 8, 2}},
113     {"SD",      0, -1,          {FMT_F, 8, 2}},
114     {"MAX",     0, VAL_STRING,  {-1, -1, -1}},
115     {"MIN",     0, VAL_STRING,  {-1, -1, -1}},
116     {"PGT",     1, VAL_NUMERIC, {FMT_F, 5, 1}},
117     {"PLT",     1, VAL_NUMERIC, {FMT_F, 5, 1}},
118     {"PIN",     2, VAL_NUMERIC, {FMT_F, 5, 1}},
119     {"POUT",    2, VAL_NUMERIC, {FMT_F, 5, 1}},
120     {"FGT",     1, VAL_NUMERIC, {FMT_F, 5, 3}},
121     {"FLT",     1, VAL_NUMERIC, {FMT_F, 5, 3}},
122     {"FIN",     2, VAL_NUMERIC, {FMT_F, 5, 3}},
123     {"FOUT",    2, VAL_NUMERIC, {FMT_F, 5, 3}},
124     {"N",       0, VAL_NUMERIC, {FMT_F, 7, 0}},
125     {"NU",      0, VAL_NUMERIC, {FMT_F, 7, 0}},
126     {"NMISS",   0, VAL_NUMERIC, {FMT_F, 7, 0}},
127     {"NUMISS",  0, VAL_NUMERIC, {FMT_F, 7, 0}},
128     {"FIRST",   0, VAL_STRING,  {-1, -1, -1}},
129     {"LAST",    0, VAL_STRING,  {-1, -1, -1}},
130     {NULL,      0, -1,          {-1, -1, -1}},
131     {"N",       0, VAL_NUMERIC, {FMT_F, 7, 0}},
132     {"NU",      0, VAL_NUMERIC, {FMT_F, 7, 0}},
133   };
134
135 /* Missing value types. */
136 enum missing_treatment
137   {
138     ITEMWISE,           /* Missing values item by item. */
139     COLUMNWISE          /* Missing values column by column. */
140   };
141
142 /* An entire AGGREGATE procedure. */
143 struct agr_proc
144   {
145     /* Break variables. */
146     struct subcase sort;                /* Sort criteria (break variables). */
147     const struct variable **break_vars;       /* Break variables. */
148     size_t break_var_cnt;               /* Number of break variables. */
149     struct ccase *break_case;           /* Last values of break variables. */
150
151     enum missing_treatment missing;     /* How to treat missing values. */
152     struct agr_var *agr_vars;           /* First aggregate variable. */
153     struct dictionary *dict;            /* Aggregate dictionary. */
154     const struct dictionary *src_dict;  /* Dict of the source */
155     int case_cnt;                       /* Counts aggregated cases. */
156   };
157
158 static void initialize_aggregate_info (struct agr_proc *,
159                                        const struct ccase *);
160
161 static void accumulate_aggregate_info (struct agr_proc *,
162                                        const struct ccase *);
163 /* Prototypes. */
164 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
165                                        struct agr_proc *);
166 static void agr_destroy (struct agr_proc *);
167 static void dump_aggregate_info (struct agr_proc *agr,
168                                  struct casewriter *output);
169 \f
170 /* Parsing. */
171
172 /* Parses and executes the AGGREGATE procedure. */
173 int
174 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
175 {
176   struct dictionary *dict = dataset_dict (ds);
177   struct agr_proc agr;
178   struct file_handle *out_file = NULL;
179   struct casereader *input = NULL, *group;
180   struct casegrouper *grouper;
181   struct casewriter *output = NULL;
182
183   bool copy_documents = false;
184   bool presorted = false;
185   bool saw_direction;
186   bool ok;
187
188   memset(&agr, 0 , sizeof (agr));
189   agr.missing = ITEMWISE;
190   agr.break_case = NULL;
191
192   agr.dict = dict_create ();
193   agr.src_dict = dict;
194   subcase_init_empty (&agr.sort);
195   dict_set_label (agr.dict, dict_get_label (dict));
196   dict_set_documents (agr.dict, dict_get_documents (dict));
197
198   /* OUTFILE subcommand must be first. */
199   lex_match (lexer, '/');
200   if (!lex_force_match_id (lexer, "OUTFILE"))
201     goto error;
202   lex_match (lexer, '=');
203   if (!lex_match (lexer, '*'))
204     {
205       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
206       if (out_file == NULL)
207         goto error;
208     }
209
210   /* Read most of the subcommands. */
211   for (;;)
212     {
213       lex_match (lexer, '/');
214
215       if (lex_match_id (lexer, "MISSING"))
216         {
217           lex_match (lexer, '=');
218           if (!lex_match_id (lexer, "COLUMNWISE"))
219             {
220               lex_error (lexer, _("while expecting COLUMNWISE"));
221               goto error;
222             }
223           agr.missing = COLUMNWISE;
224         }
225       else if (lex_match_id (lexer, "DOCUMENT"))
226         copy_documents = true;
227       else if (lex_match_id (lexer, "PRESORTED"))
228         presorted = true;
229       else if (lex_match_id (lexer, "BREAK"))
230         {
231           int i;
232
233           lex_match (lexer, '=');
234           if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
235                                     &saw_direction))
236             goto error;
237           agr.break_var_cnt = subcase_get_n_fields (&agr.sort);
238
239           for (i = 0; i < agr.break_var_cnt; i++)
240             dict_clone_var_assert (agr.dict, agr.break_vars[i]);
241
242           /* BREAK must follow the options. */
243           break;
244         }
245       else
246         {
247           lex_error (lexer, _("expecting BREAK"));
248           goto error;
249         }
250     }
251   if (presorted && saw_direction)
252     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
253                "with (A) or (D) has no effect.  Output data will be sorted "
254                "the same way as the input data."));
255
256   /* Read in the aggregate functions. */
257   lex_match (lexer, '/');
258   if (!parse_aggregate_functions (lexer, dict, &agr))
259     goto error;
260
261   /* Delete documents. */
262   if (!copy_documents)
263     dict_clear_documents (agr.dict);
264
265   /* Cancel SPLIT FILE. */
266   dict_set_split_vars (agr.dict, NULL, 0);
267
268   /* Initialize. */
269   agr.case_cnt = 0;
270
271   if (out_file == NULL)
272     {
273       /* The active file will be replaced by the aggregated data,
274          so TEMPORARY is moot. */
275       proc_cancel_temporary_transformations (ds);
276       proc_discard_output (ds);
277       output = autopaging_writer_create (dict_get_proto (agr.dict));
278     }
279   else
280     {
281       output = any_writer_open (out_file, agr.dict);
282       if (output == NULL)
283         goto error;
284     }
285
286   input = proc_open (ds);
287   if (!subcase_is_empty (&agr.sort) && !presorted)
288     {
289       input = sort_execute (input, &agr.sort);
290       subcase_clear (&agr.sort);
291     }
292
293   for (grouper = casegrouper_create_vars (input, agr.break_vars,
294                                           agr.break_var_cnt);
295        casegrouper_get_next_group (grouper, &group);
296        casereader_destroy (group))
297     {
298       struct ccase *c = casereader_peek (group, 0);
299       if (c == NULL)
300         {
301           casereader_destroy (group);
302           continue;
303         }
304       initialize_aggregate_info (&agr, c);
305       case_unref (c);
306
307       for (; (c = casereader_read (group)) != NULL; case_unref (c))
308         accumulate_aggregate_info (&agr, c);
309       dump_aggregate_info (&agr, output);
310     }
311   if (!casegrouper_destroy (grouper))
312     goto error;
313
314   if (!proc_commit (ds))
315     {
316       input = NULL;
317       goto error;
318     }
319   input = NULL;
320
321   if (out_file == NULL)
322     {
323       struct casereader *next_input = casewriter_make_reader (output);
324       if (next_input == NULL)
325         goto error;
326
327       proc_set_active_file (ds, next_input, agr.dict);
328       agr.dict = NULL;
329     }
330   else
331     {
332       ok = casewriter_destroy (output);
333       output = NULL;
334       if (!ok)
335         goto error;
336     }
337
338   agr_destroy (&agr);
339   fh_unref (out_file);
340   return CMD_SUCCESS;
341
342 error:
343   if (input != NULL)
344     proc_commit (ds);
345   casewriter_destroy (output);
346   agr_destroy (&agr);
347   fh_unref (out_file);
348   return CMD_CASCADING_FAILURE;
349 }
350
351 /* Parse all the aggregate functions. */
352 static bool
353 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
354                            struct agr_proc *agr)
355 {
356   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
357
358   /* Parse everything. */
359   tail = NULL;
360   for (;;)
361     {
362       char **dest;
363       char **dest_label;
364       size_t n_dest;
365       struct string function_name;
366
367       enum mv_class exclude;
368       const struct agr_func *function;
369       int func_index;
370
371       union agr_argument arg[2];
372
373       const struct variable **src;
374       size_t n_src;
375
376       size_t i;
377
378       dest = NULL;
379       dest_label = NULL;
380       n_dest = 0;
381       src = NULL;
382       function = NULL;
383       n_src = 0;
384       arg[0].c = NULL;
385       arg[1].c = NULL;
386       ds_init_empty (&function_name);
387
388       /* Parse the list of target variables. */
389       while (!lex_match (lexer, '='))
390         {
391           size_t n_dest_prev = n_dest;
392
393           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
394                                      (PV_APPEND | PV_SINGLE | PV_NO_SCRATCH
395                                       | PV_NO_DUPLICATE)))
396             goto error;
397
398           /* Assign empty labels. */
399           {
400             int j;
401
402             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
403             for (j = n_dest_prev; j < n_dest; j++)
404               dest_label[j] = NULL;
405           }
406
407
408
409           if (lex_token (lexer) == T_STRING)
410             {
411               struct string label;
412               ds_init_string (&label, lex_tokstr (lexer));
413
414               ds_truncate (&label, 255);
415               dest_label[n_dest - 1] = ds_xstrdup (&label);
416               lex_get (lexer);
417               ds_destroy (&label);
418             }
419         }
420
421       /* Get the name of the aggregation function. */
422       if (lex_token (lexer) != T_ID)
423         {
424           lex_error (lexer, _("expecting aggregation function"));
425           goto error;
426         }
427
428       exclude = MV_ANY;
429
430       ds_assign_string (&function_name, lex_tokstr (lexer));
431
432       ds_chomp (&function_name, '.');
433
434       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
435         exclude = MV_SYSTEM;
436
437       for (function = agr_func_tab; function->name; function++)
438         if (!strcasecmp (function->name, ds_cstr (&function_name)))
439           break;
440       if (NULL == function->name)
441         {
442           msg (SE, _("Unknown aggregation function %s."),
443                ds_cstr (&function_name));
444           goto error;
445         }
446       ds_destroy (&function_name);
447       func_index = function - agr_func_tab;
448       lex_get (lexer);
449
450       /* Check for leading lparen. */
451       if (!lex_match (lexer, '('))
452         {
453           if (func_index == N)
454             func_index = N_NO_VARS;
455           else if (func_index == NU)
456             func_index = NU_NO_VARS;
457           else
458             {
459               lex_error (lexer, _("expecting `('"));
460               goto error;
461             }
462         }
463       else
464         {
465           /* Parse list of source variables. */
466           {
467             int pv_opts = PV_NO_SCRATCH;
468
469             if (func_index == SUM || func_index == MEAN || func_index == SD)
470               pv_opts |= PV_NUMERIC;
471             else if (function->n_args)
472               pv_opts |= PV_SAME_TYPE;
473
474             if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
475               goto error;
476           }
477
478           /* Parse function arguments, for those functions that
479              require arguments. */
480           if (function->n_args != 0)
481             for (i = 0; i < function->n_args; i++)
482               {
483                 int type;
484
485                 lex_match (lexer, ',');
486                 if (lex_token (lexer) == T_STRING)
487                   {
488                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
489                     type = VAL_STRING;
490                   }
491                 else if (lex_is_number (lexer))
492                   {
493                     arg[i].f = lex_tokval (lexer);
494                     type = VAL_NUMERIC;
495                   }
496                 else
497                   {
498                     msg (SE, _("Missing argument %zu to %s."),
499                          i + 1, function->name);
500                     goto error;
501                   }
502
503                 lex_get (lexer);
504
505                 if (type != var_get_type (src[0]))
506                   {
507                     msg (SE, _("Arguments to %s must be of same type as "
508                                "source variables."),
509                          function->name);
510                     goto error;
511                   }
512               }
513
514           /* Trailing rparen. */
515           if (!lex_match (lexer, ')'))
516             {
517               lex_error (lexer, _("expecting `)'"));
518               goto error;
519             }
520
521           /* Now check that the number of source variables match
522              the number of target variables.  If we check earlier
523              than this, the user can get very misleading error
524              message, i.e. `AGGREGATE x=SUM(y t).' will get this
525              error message when a proper message would be more
526              like `unknown variable t'. */
527           if (n_src != n_dest)
528             {
529               msg (SE, _("Number of source variables (%zu) does not match "
530                          "number of target variables (%zu)."),
531                     n_src, n_dest);
532               goto error;
533             }
534
535           if ((func_index == PIN || func_index == POUT
536               || func_index == FIN || func_index == FOUT)
537               && (var_is_numeric (src[0])
538                   ? arg[0].f > arg[1].f
539                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
540             {
541               union agr_argument t = arg[0];
542               arg[0] = arg[1];
543               arg[1] = t;
544
545               msg (SW, _("The value arguments passed to the %s function "
546                          "are out-of-order.  They will be treated as if "
547                          "they had been specified in the correct order."),
548                    function->name);
549             }
550         }
551
552       /* Finally add these to the linked list of aggregation
553          variables. */
554       for (i = 0; i < n_dest; i++)
555         {
556           struct agr_var *v = xzalloc (sizeof *v);
557
558           /* Add variable to chain. */
559           if (agr->agr_vars != NULL)
560             tail->next = v;
561           else
562             agr->agr_vars = v;
563           tail = v;
564           tail->next = NULL;
565           v->moments = NULL;
566
567           /* Create the target variable in the aggregate
568              dictionary. */
569           {
570             struct variable *destvar;
571
572             v->function = func_index;
573
574             if (src)
575               {
576                 v->src = src[i];
577
578                 if (var_is_alpha (src[i]))
579                   {
580                     v->function |= FSTRING;
581                     v->string = xmalloc (var_get_width (src[i]));
582                   }
583
584                 if (function->alpha_type == VAL_STRING)
585                   destvar = dict_clone_var_as (agr->dict, v->src, dest[i]);
586                 else
587                   {
588                     assert (var_is_numeric (v->src)
589                             || function->alpha_type == VAL_NUMERIC);
590                     destvar = dict_create_var (agr->dict, dest[i], 0);
591                     if (destvar != NULL)
592                       {
593                         struct fmt_spec f;
594                         if ((func_index == N || func_index == NMISS)
595                             && dict_get_weight (dict) != NULL)
596                           f = fmt_for_output (FMT_F, 8, 2);
597                         else
598                           f = function->format;
599                         var_set_both_formats (destvar, &f);
600                       }
601                   }
602               } else {
603                 struct fmt_spec f;
604                 v->src = NULL;
605                 destvar = dict_create_var (agr->dict, dest[i], 0);
606                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
607                   f = fmt_for_output (FMT_F, 8, 2);
608                 else
609                   f = function->format;
610                 var_set_both_formats (destvar, &f);
611               }
612
613             if (!destvar)
614               {
615                 msg (SE, _("Variable name %s is not unique within the "
616                            "aggregate file dictionary, which contains "
617                            "the aggregate variables and the break "
618                            "variables."),
619                      dest[i]);
620                 goto error;
621               }
622
623             free (dest[i]);
624             if (dest_label[i])
625               var_set_label (destvar, dest_label[i]);
626
627             v->dest = destvar;
628           }
629
630           v->exclude = exclude;
631
632           if (v->src != NULL)
633             {
634               int j;
635
636               if (var_is_numeric (v->src))
637                 for (j = 0; j < function->n_args; j++)
638                   v->arg[j].f = arg[j].f;
639               else
640                 for (j = 0; j < function->n_args; j++)
641                   v->arg[j].c = xstrdup (arg[j].c);
642             }
643         }
644
645       if (src != NULL && var_is_alpha (src[0]))
646         for (i = 0; i < function->n_args; i++)
647           {
648             free (arg[i].c);
649             arg[i].c = NULL;
650           }
651
652       free (src);
653       free (dest);
654       free (dest_label);
655
656       if (!lex_match (lexer, '/'))
657         {
658           if (lex_token (lexer) == '.')
659             return true;
660
661           lex_error (lexer, "expecting end of command");
662           return false;
663         }
664       continue;
665
666     error:
667       ds_destroy (&function_name);
668       for (i = 0; i < n_dest; i++)
669         {
670           free (dest[i]);
671           free (dest_label[i]);
672         }
673       free (dest);
674       free (dest_label);
675       free (arg[0].c);
676       free (arg[1].c);
677       if (src && n_src && var_is_alpha (src[0]))
678         for (i = 0; i < function->n_args; i++)
679           {
680             free (arg[i].c);
681             arg[i].c = NULL;
682           }
683       free (src);
684
685       return false;
686     }
687 }
688
689 /* Destroys AGR. */
690 static void
691 agr_destroy (struct agr_proc *agr)
692 {
693   struct agr_var *iter, *next;
694
695   subcase_destroy (&agr->sort);
696   free (agr->break_vars);
697   case_unref (agr->break_case);
698   for (iter = agr->agr_vars; iter; iter = next)
699     {
700       next = iter->next;
701
702       if (iter->function & FSTRING)
703         {
704           size_t n_args;
705           size_t i;
706
707           n_args = agr_func_tab[iter->function & FUNC].n_args;
708           for (i = 0; i < n_args; i++)
709             free (iter->arg[i].c);
710           free (iter->string);
711         }
712       else if (iter->function == SD)
713         moments1_destroy (iter->moments);
714
715       dict_destroy_internal_var (iter->subject);
716       dict_destroy_internal_var (iter->weight);
717
718       free (iter);
719     }
720   if (agr->dict != NULL)
721     dict_destroy (agr->dict);
722 }
723 \f
724 /* Execution. */
725
726 /* Accumulates aggregation data from the case INPUT. */
727 static void
728 accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
729 {
730   struct agr_var *iter;
731   double weight;
732   bool bad_warn = true;
733
734   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
735
736   for (iter = agr->agr_vars; iter; iter = iter->next)
737     if (iter->src)
738       {
739         const union value *v = case_data (input, iter->src);
740         int src_width = var_get_width (iter->src);
741
742         if (var_is_value_missing (iter->src, v, iter->exclude))
743           {
744             switch (iter->function)
745               {
746               case NMISS:
747               case NMISS | FSTRING:
748                 iter->dbl[0] += weight;
749                 break;
750               case NUMISS:
751               case NUMISS | FSTRING:
752                 iter->int1++;
753                 break;
754               }
755             iter->saw_missing = true;
756             continue;
757           }
758
759         /* This is horrible.  There are too many possibilities. */
760         switch (iter->function)
761           {
762           case SUM:
763             iter->dbl[0] += v->f * weight;
764             iter->int1 = 1;
765             break;
766           case MEAN:
767             iter->dbl[0] += v->f * weight;
768             iter->dbl[1] += weight;
769             break;
770           case MEDIAN:
771             {
772               double wv ;
773               struct ccase *cout;
774
775               cout = case_create (casewriter_get_proto (iter->writer));
776
777               case_data_rw (cout, iter->subject)->f
778                 = case_data (input, iter->src)->f;
779
780               wv = dict_get_case_weight (agr->src_dict, input, NULL);
781
782               case_data_rw (cout, iter->weight)->f = wv;
783
784               iter->cc += wv;
785
786               casewriter_write (iter->writer, cout);
787             }
788             break;
789           case SD:
790             moments1_add (iter->moments, v->f, weight);
791             break;
792           case MAX:
793             iter->dbl[0] = MAX (iter->dbl[0], v->f);
794             iter->int1 = 1;
795             break;
796           case MAX | FSTRING:
797             if (memcmp (iter->string, value_str (v, src_width), src_width) < 0)
798               memcpy (iter->string, value_str (v, src_width), src_width);
799             iter->int1 = 1;
800             break;
801           case MIN:
802             iter->dbl[0] = MIN (iter->dbl[0], v->f);
803             iter->int1 = 1;
804             break;
805           case MIN | FSTRING:
806             if (memcmp (iter->string, value_str (v, src_width), src_width) > 0)
807               memcpy (iter->string, value_str (v, src_width), src_width);
808             iter->int1 = 1;
809             break;
810           case FGT:
811           case PGT:
812             if (v->f > iter->arg[0].f)
813               iter->dbl[0] += weight;
814             iter->dbl[1] += weight;
815             break;
816           case FGT | FSTRING:
817           case PGT | FSTRING:
818             if (memcmp (iter->arg[0].c,
819                         value_str (v, src_width), src_width) < 0)
820               iter->dbl[0] += weight;
821             iter->dbl[1] += weight;
822             break;
823           case FLT:
824           case PLT:
825             if (v->f < iter->arg[0].f)
826               iter->dbl[0] += weight;
827             iter->dbl[1] += weight;
828             break;
829           case FLT | FSTRING:
830           case PLT | FSTRING:
831             if (memcmp (iter->arg[0].c,
832                         value_str (v, src_width), src_width) > 0)
833               iter->dbl[0] += weight;
834             iter->dbl[1] += weight;
835             break;
836           case FIN:
837           case PIN:
838             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
839               iter->dbl[0] += weight;
840             iter->dbl[1] += weight;
841             break;
842           case FIN | FSTRING:
843           case PIN | FSTRING:
844             if (memcmp (iter->arg[0].c,
845                         value_str (v, src_width), src_width) <= 0
846                 && memcmp (iter->arg[1].c,
847                            value_str (v, src_width), src_width) >= 0)
848               iter->dbl[0] += weight;
849             iter->dbl[1] += weight;
850             break;
851           case FOUT:
852           case POUT:
853             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
854               iter->dbl[0] += weight;
855             iter->dbl[1] += weight;
856             break;
857           case FOUT | FSTRING:
858           case POUT | FSTRING:
859             if (memcmp (iter->arg[0].c,
860                         value_str (v, src_width), src_width) > 0
861                 || memcmp (iter->arg[1].c,
862                            value_str (v, src_width), src_width) < 0)
863               iter->dbl[0] += weight;
864             iter->dbl[1] += weight;
865             break;
866           case N:
867           case N | FSTRING:
868             iter->dbl[0] += weight;
869             break;
870           case NU:
871           case NU | FSTRING:
872             iter->int1++;
873             break;
874           case FIRST:
875             if (iter->int1 == 0)
876               {
877                 iter->dbl[0] = v->f;
878                 iter->int1 = 1;
879               }
880             break;
881           case FIRST | FSTRING:
882             if (iter->int1 == 0)
883               {
884                 memcpy (iter->string, value_str (v, src_width), src_width);
885                 iter->int1 = 1;
886               }
887             break;
888           case LAST:
889             iter->dbl[0] = v->f;
890             iter->int1 = 1;
891             break;
892           case LAST | FSTRING:
893             memcpy (iter->string, value_str (v, src_width), src_width);
894             iter->int1 = 1;
895             break;
896           case NMISS:
897           case NMISS | FSTRING:
898           case NUMISS:
899           case NUMISS | FSTRING:
900             /* Our value is not missing or it would have been
901                caught earlier.  Nothing to do. */
902             break;
903           default:
904             NOT_REACHED ();
905           }
906     } else {
907       switch (iter->function)
908         {
909         case N_NO_VARS:
910           iter->dbl[0] += weight;
911           break;
912         case NU_NO_VARS:
913           iter->int1++;
914           break;
915         default:
916           NOT_REACHED ();
917         }
918     }
919 }
920
921 /* Writes an aggregated record to OUTPUT. */
922 static void
923 dump_aggregate_info (struct agr_proc *agr, struct casewriter *output)
924 {
925   struct ccase *c = case_create (dict_get_proto (agr->dict));
926
927   {
928     int value_idx = 0;
929     int i;
930
931     for (i = 0; i < agr->break_var_cnt; i++)
932       {
933         const struct variable *v = agr->break_vars[i];
934         value_copy (case_data_rw_idx (c, value_idx),
935                     case_data (agr->break_case, v),
936                     var_get_width (v));
937         value_idx++;
938       }
939   }
940
941   {
942     struct agr_var *i;
943
944     for (i = agr->agr_vars; i; i = i->next)
945       {
946         union value *v = case_data_rw (c, i->dest);
947         int width = var_get_width (i->dest);
948
949         if (agr->missing == COLUMNWISE && i->saw_missing
950             && (i->function & FUNC) != N && (i->function & FUNC) != NU
951             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
952           {
953             value_set_missing (v, width);
954             casewriter_destroy (i->writer);
955             continue;
956           }
957
958         switch (i->function)
959           {
960           case SUM:
961             v->f = i->int1 ? i->dbl[0] : SYSMIS;
962             break;
963           case MEAN:
964             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
965             break;
966           case MEDIAN:
967             {
968               struct casereader *sorted_reader;
969               struct percentile *median = percentile_create (0.5, i->cc);
970               struct order_stats *os = &median->parent;
971
972               sorted_reader = casewriter_make_reader (i->writer);
973
974               order_stats_accumulate (&os, 1,
975                                       sorted_reader,
976                                       i->weight,
977                                       i->subject,
978                                       i->exclude);
979
980               v->f = percentile_calculate (median, PC_HAVERAGE);
981
982               statistic_destroy (&median->parent.parent);
983             }
984             break;
985           case SD:
986             {
987               double variance;
988
989               /* FIXME: we should use two passes. */
990               moments1_calculate (i->moments, NULL, NULL, &variance,
991                                  NULL, NULL);
992               if (variance != SYSMIS)
993                 v->f = sqrt (variance);
994               else
995                 v->f = SYSMIS;
996             }
997             break;
998           case MAX:
999           case MIN:
1000             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1001             break;
1002           case MAX | FSTRING:
1003           case MIN | FSTRING:
1004             if (i->int1)
1005               memcpy (value_str_rw (v, width), i->string, width);
1006             else
1007               value_set_missing (v, width);
1008             break;
1009           case FGT:
1010           case FGT | FSTRING:
1011           case FLT:
1012           case FLT | FSTRING:
1013           case FIN:
1014           case FIN | FSTRING:
1015           case FOUT:
1016           case FOUT | FSTRING:
1017             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1018             break;
1019           case PGT:
1020           case PGT | FSTRING:
1021           case PLT:
1022           case PLT | FSTRING:
1023           case PIN:
1024           case PIN | FSTRING:
1025           case POUT:
1026           case POUT | FSTRING:
1027             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1028             break;
1029           case N:
1030           case N | FSTRING:
1031             v->f = i->dbl[0];
1032             break;
1033           case NU:
1034           case NU | FSTRING:
1035             v->f = i->int1;
1036             break;
1037           case FIRST:
1038           case LAST:
1039             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1040             break;
1041           case FIRST | FSTRING:
1042           case LAST | FSTRING:
1043             if (i->int1)
1044               memcpy (value_str_rw (v, width), i->string, width);
1045             else
1046               value_set_missing (v, width);
1047             break;
1048           case N_NO_VARS:
1049             v->f = i->dbl[0];
1050             break;
1051           case NU_NO_VARS:
1052             v->f = i->int1;
1053             break;
1054           case NMISS:
1055           case NMISS | FSTRING:
1056             v->f = i->dbl[0];
1057             break;
1058           case NUMISS:
1059           case NUMISS | FSTRING:
1060             v->f = i->int1;
1061             break;
1062           default:
1063             NOT_REACHED ();
1064           }
1065       }
1066   }
1067
1068   casewriter_write (output, c);
1069 }
1070
1071 /* Resets the state for all the aggregate functions. */
1072 static void
1073 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1074 {
1075   struct agr_var *iter;
1076
1077   case_unref (agr->break_case);
1078   agr->break_case = case_ref (input);
1079
1080   for (iter = agr->agr_vars; iter; iter = iter->next)
1081     {
1082       iter->saw_missing = false;
1083       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1084       iter->int1 = iter->int2 = 0;
1085       switch (iter->function)
1086         {
1087         case MIN:
1088           iter->dbl[0] = DBL_MAX;
1089           break;
1090         case MIN | FSTRING:
1091           memset (iter->string, 255, var_get_width (iter->src));
1092           break;
1093         case MAX:
1094           iter->dbl[0] = -DBL_MAX;
1095           break;
1096         case MAX | FSTRING:
1097           memset (iter->string, 0, var_get_width (iter->src));
1098           break;
1099         case MEDIAN:
1100           {
1101             struct caseproto *proto;
1102             struct subcase ordering;
1103
1104             proto = caseproto_create ();
1105             proto = caseproto_add_width (proto, 0);
1106             proto = caseproto_add_width (proto, 0);
1107
1108             if ( ! iter->subject)
1109               iter->subject = dict_create_internal_var (0, 0);
1110
1111             if ( ! iter->weight)
1112               iter->weight = dict_create_internal_var (1, 0);
1113
1114             subcase_init_var (&ordering, iter->subject, SC_ASCEND);
1115             iter->writer = sort_create_writer (&ordering, proto);
1116             subcase_destroy (&ordering);
1117             caseproto_unref (proto);
1118
1119             iter->cc = 0;
1120           }
1121           break;
1122         case SD:
1123           if (iter->moments == NULL)
1124             iter->moments = moments1_create (MOMENT_VARIANCE);
1125           else
1126             moments1_clear (iter->moments);
1127           break;
1128         default:
1129           break;
1130         }
1131     }
1132 }