Move all command implementations into a single 'commands' directory.
[pspp] / src / language / commands / aggregate.c
diff --git a/src/language/commands/aggregate.c b/src/language/commands/aggregate.c
new file mode 100644 (file)
index 0000000..fac71b1
--- /dev/null
@@ -0,0 +1,1133 @@
+/* PSPP - a program for statistical analysis.
+   Copyright (C) 1997-9, 2000, 2006, 2008, 2009, 2010, 2011, 2012, 2014 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
+
+#include <config.h>
+
+#include "language/commands/aggregate.h"
+
+#include <stdlib.h>
+
+#include "data/any-writer.h"
+#include "data/case.h"
+#include "data/casegrouper.h"
+#include "data/casereader.h"
+#include "data/casewriter.h"
+#include "data/dataset.h"
+#include "data/dictionary.h"
+#include "data/file-handle-def.h"
+#include "data/format.h"
+#include "data/settings.h"
+#include "data/subcase.h"
+#include "data/sys-file-writer.h"
+#include "data/variable.h"
+#include "language/command.h"
+#include "language/commands/file-handle.h"
+#include "language/lexer/lexer.h"
+#include "language/lexer/variable-parser.h"
+#include "language/commands/sort-criteria.h"
+#include "libpspp/assertion.h"
+#include "libpspp/i18n.h"
+#include "libpspp/message.h"
+#include "libpspp/misc.h"
+#include "libpspp/pool.h"
+#include "libpspp/str.h"
+#include "math/moments.h"
+#include "math/percentiles.h"
+#include "math/sort.h"
+#include "math/statistic.h"
+
+#include "gl/c-strcase.h"
+#include "gl/minmax.h"
+#include "gl/xalloc.h"
+
+#include "gettext.h"
+#define _(msgid) gettext (msgid)
+#define N_(msgid) msgid
+
+/* Argument for AGGREGATE function.
+
+   Only one of the members is used, so this could be a union, but it's simpler
+   to just have both. */
+struct agr_argument
+  {
+    double f;                           /* Numeric. */
+    struct substring s;                 /* String. */
+  };
+
+/* Specifies how to make an aggregate variable. */
+struct agr_var
+  {
+    /* Collected during parsing. */
+    const struct variable *src;        /* Source variable. */
+    struct variable *dest;     /* Target variable. */
+    enum agr_function function; /* Function. */
+    enum mv_class exclude;      /* Classes of missing values to exclude. */
+    struct agr_argument arg[2];        /* Arguments. */
+
+    /* Accumulated during AGGREGATE execution. */
+    double dbl;
+    double W;                   /* Total non-missing weight. */
+    int int1;
+    char *string;
+    bool saw_missing;
+    struct moments1 *moments;
+
+    struct variable *subject;
+    struct variable *weight;
+    struct casewriter *writer;
+  };
+
+/* Attributes of aggregation functions. */
+const struct agr_func agr_func_tab[] =
+  {
+#define AGRF(ENUM, NAME, DESCRIPTION, SRC_VARS, N_ARGS, ALPHA_TYPE, W, D) \
+    [ENUM] = { NAME, DESCRIPTION, SRC_VARS, N_ARGS, ALPHA_TYPE, \
+               { .type = (W) > 0 ? FMT_F : -1, .w = W, .d = D } },
+AGGREGATE_FUNCTIONS
+#undef AGRF
+    {NULL, NULL, AGR_SV_NO, 0, -1, {-1, -1, -1}},
+  };
+
+/* Missing value types. */
+enum missing_treatment
+  {
+    ITEMWISE,          /* Missing values item by item. */
+    COLUMNWISE         /* Missing values column by column. */
+  };
+
+/* An entire AGGREGATE procedure. */
+struct agr_proc
+  {
+    /* Break variables. */
+    struct subcase sort;                /* Sort criteria (break variables). */
+    const struct variable **break_vars;       /* Break variables. */
+    size_t break_n_vars;                /* Number of break variables. */
+
+    enum missing_treatment missing;     /* How to treat missing values. */
+    struct agr_var *agr_vars;           /* Aggregate variables. */
+    size_t n_agr_vars;
+    struct dictionary *dict;            /* Aggregate dictionary. */
+    const struct dictionary *src_dict;  /* Dict of the source */
+    int n_cases;                        /* Counts aggregated cases. */
+
+    bool add_variables;                 /* True iff the aggregated variables should
+                                          be appended to the existing dictionary */
+  };
+
+static void initialize_aggregate_info (struct agr_proc *);
+
+static void accumulate_aggregate_info (struct agr_proc *,
+                                       const struct ccase *);
+/* Prototypes. */
+static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
+                                      struct agr_proc *);
+static void agr_destroy (struct agr_proc *);
+static void dump_aggregate_info (const struct agr_proc *agr,
+                                 struct casewriter *output,
+                                const struct ccase *break_case);
+\f
+/* Parsing. */
+
+/* Parses and executes the AGGREGATE procedure. */
+int
+cmd_aggregate (struct lexer *lexer, struct dataset *ds)
+{
+  struct dictionary *dict = dataset_dict (ds);
+  struct agr_proc agr = {
+    .missing = ITEMWISE,
+    .src_dict = dict,
+  };
+  struct file_handle *out_file = NULL;
+  struct casereader *input = NULL;
+  struct casewriter *output = NULL;
+
+  bool copy_documents = false;
+  bool presorted = false;
+  int addvariables_ofs = 0;
+
+  /* OUTFILE subcommand must be first. */
+  if (lex_match_phrase (lexer, "/OUTFILE") || lex_match_id (lexer, "OUTFILE"))
+    {
+      lex_match (lexer, T_EQUALS);
+      if (!lex_match (lexer, T_ASTERISK))
+        {
+          out_file = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
+          if (out_file == NULL)
+            goto error;
+        }
+
+      if (!out_file && lex_match_id (lexer, "MODE"))
+        {
+          lex_match (lexer, T_EQUALS);
+          if (lex_match_id (lexer, "ADDVARIABLES"))
+            {
+              addvariables_ofs = lex_ofs (lexer) - 1;
+              agr.add_variables = true;
+              presorted = true;
+            }
+          else if (lex_match_id (lexer, "REPLACE"))
+            agr.add_variables = false;
+          else
+            {
+              lex_error_expecting (lexer, "ADDVARIABLES", "REPLACE");
+              goto error;
+            }
+        }
+    }
+  else
+    {
+      agr.add_variables = true;
+      presorted = true;
+    }
+
+  if (lex_match_phrase (lexer, "/MISSING"))
+    {
+      lex_match (lexer, T_EQUALS);
+      if (!lex_match_id (lexer, "COLUMNWISE"))
+        {
+          lex_error_expecting (lexer, "COLUMNWISE");
+          goto error;
+        }
+      agr.missing = COLUMNWISE;
+    }
+
+  int presorted_ofs = 0;
+  for (;;)
+    if (lex_match_phrase (lexer, "/DOCUMENT"))
+      copy_documents = true;
+    else if (lex_match_phrase (lexer, "/PRESORTED"))
+      {
+        presorted = true;
+        presorted_ofs = lex_ofs (lexer) - 1;
+      }
+    else
+      break;
+
+  if (agr.add_variables)
+    agr.dict = dict_clone (dict);
+  else
+    agr.dict = dict_create (dict_get_encoding (dict));
+
+  dict_set_label (agr.dict, dict_get_label (dict));
+  dict_set_documents (agr.dict, dict_get_documents (dict));
+
+  if (lex_match_phrase (lexer, "/BREAK"))
+    {
+      lex_match (lexer, T_EQUALS);
+      bool saw_direction;
+      int break_start = lex_ofs (lexer);
+      if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
+                                &saw_direction))
+        goto error;
+      int break_end = lex_ofs (lexer) - 1;
+      agr.break_n_vars = subcase_get_n_fields (&agr.sort);
+
+      if  (! agr.add_variables)
+        for (size_t i = 0; i < agr.break_n_vars; i++)
+          dict_clone_var_assert (agr.dict, agr.break_vars[i]);
+
+      if (presorted && saw_direction)
+        {
+          lex_ofs_msg (lexer, SW, break_start, break_end,
+                       _("When the input data is presorted, specifying "
+                         "sorting directions with (A) or (D) has no effect.  "
+                         "Output data will be sorted the same way as the "
+                         "input data."));
+          if (presorted_ofs)
+            lex_ofs_msg (lexer, SN, presorted_ofs, presorted_ofs,
+                         _("The PRESORTED subcommand state that the "
+                           "input data is presorted."));
+          else if (addvariables_ofs)
+            lex_ofs_msg (lexer, SN, addvariables_ofs, addvariables_ofs,
+                         _("ADDVARIABLES implies that the input data "
+                           "is presorted."));
+          else
+            msg (SN, _("The input data must be presorted because the "
+                       "OUTFILE subcommand is not specified."));
+        }
+    }
+
+  /* Read in the aggregate functions. */
+  if (!parse_aggregate_functions (lexer, dict, &agr))
+    goto error;
+
+  /* Delete documents. */
+  if (!copy_documents)
+    dict_clear_documents (agr.dict);
+
+  /* Cancel SPLIT FILE. */
+  dict_clear_split_vars (agr.dict);
+
+  /* Initialize. */
+  agr.n_cases = 0;
+
+  if (out_file == NULL)
+    {
+      /* The active dataset will be replaced by the aggregated data,
+         so TEMPORARY is moot. */
+      proc_cancel_temporary_transformations (ds);
+      proc_discard_output (ds);
+      output = autopaging_writer_create (dict_get_proto (agr.dict));
+    }
+  else
+    {
+      output = any_writer_open (out_file, agr.dict);
+      if (output == NULL)
+        goto error;
+    }
+
+  input = proc_open (ds);
+  if (!subcase_is_empty (&agr.sort) && !presorted)
+    {
+      input = sort_execute (input, &agr.sort);
+      subcase_clear (&agr.sort);
+    }
+
+  struct casegrouper *grouper;
+  struct casereader *group;
+  for (grouper = casegrouper_create_vars (input, agr.break_vars,
+                                          agr.break_n_vars);
+       casegrouper_get_next_group (grouper, &group);
+       casereader_destroy (group))
+    {
+      struct casereader *placeholder = NULL;
+      struct ccase *c = casereader_peek (group, 0);
+
+      if (c == NULL)
+        {
+          casereader_destroy (group);
+          continue;
+        }
+
+      initialize_aggregate_info (&agr);
+
+      if (agr.add_variables)
+       placeholder = casereader_clone (group);
+
+      {
+       struct ccase *cg;
+       for (; (cg = casereader_read (group)) != NULL; case_unref (cg))
+         accumulate_aggregate_info (&agr, cg);
+      }
+
+
+      if  (agr.add_variables)
+       {
+         struct ccase *cg;
+         for (; (cg = casereader_read (placeholder)) != NULL; case_unref (cg))
+           dump_aggregate_info (&agr, output, cg);
+
+         casereader_destroy (placeholder);
+       }
+      else
+       {
+         dump_aggregate_info (&agr, output, c);
+       }
+      case_unref (c);
+    }
+  if (!casegrouper_destroy (grouper))
+    goto error;
+
+  bool ok = proc_commit (ds);
+  input = NULL;
+  if (!ok)
+    goto error;
+
+  if (out_file == NULL)
+    {
+      struct casereader *next_input = casewriter_make_reader (output);
+      if (next_input == NULL)
+        goto error;
+
+      dataset_set_dict (ds, agr.dict);
+      dataset_set_source (ds, next_input);
+      agr.dict = NULL;
+    }
+  else
+    {
+      ok = casewriter_destroy (output);
+      output = NULL;
+      if (!ok)
+        goto error;
+    }
+
+  agr_destroy (&agr);
+  fh_unref (out_file);
+  return CMD_SUCCESS;
+
+error:
+  if (input != NULL)
+    proc_commit (ds);
+  casewriter_destroy (output);
+  agr_destroy (&agr);
+  fh_unref (out_file);
+  return CMD_CASCADING_FAILURE;
+}
+
+static bool
+parse_agr_func_name (struct lexer *lexer, int *func_index,
+                     enum mv_class *exclude)
+{
+  if (lex_token (lexer) != T_ID)
+    {
+      lex_error (lexer, _("Syntax error expecting aggregation function."));
+      return false;
+    }
+
+  struct substring name = lex_tokss (lexer);
+  *exclude = ss_chomp_byte (&name, '.') ? MV_SYSTEM : MV_ANY;
+
+  for (const struct agr_func *f = agr_func_tab; f->name; f++)
+    if (ss_equals_case (ss_cstr (f->name), name))
+      {
+        *func_index = f - agr_func_tab;
+        lex_get (lexer);
+        return true;
+      }
+  lex_error (lexer, _("Unknown aggregation function %s."), lex_tokcstr (lexer));
+  return false;
+}
+
+/* Parse all the aggregate functions. */
+static bool
+parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
+                          struct agr_proc *agr)
+{
+  if (!lex_force_match (lexer, T_SLASH))
+    return false;
+
+  size_t starting_n_vars = dict_get_n_vars (dict);
+  size_t allocated_agr_vars = 0;
+
+  /* Parse everything. */
+  for (;;)
+    {
+      char **dest = NULL;
+      char **dest_label = NULL;
+      size_t n_vars = 0;
+
+      struct agr_argument arg[2] = { { .f = 0 }, { .f = 0 } };
+
+      const struct variable **src = NULL;
+
+      /* Parse the list of target variables. */
+      int dst_start_ofs = lex_ofs (lexer);
+      while (!lex_match (lexer, T_EQUALS))
+       {
+         size_t n_vars_prev = n_vars;
+
+         if (!parse_DATA_LIST_vars (lexer, dict, &dest, &n_vars,
+                                     (PV_APPEND | PV_SINGLE | PV_NO_SCRATCH
+                                      | PV_NO_DUPLICATE)))
+           goto error;
+
+         /* Assign empty labels. */
+          dest_label = xnrealloc (dest_label, n_vars, sizeof *dest_label);
+          for (size_t j = n_vars_prev; j < n_vars; j++)
+            dest_label[j] = NULL;
+
+         if (lex_is_string (lexer))
+           {
+             dest_label[n_vars - 1] = xstrdup (lex_tokcstr (lexer));
+             lex_get (lexer);
+           }
+       }
+      int dst_end_ofs = lex_ofs (lexer) - 2;
+
+      /* Get the name of the aggregation function. */
+      int func_index;
+      enum mv_class exclude;
+      if (!parse_agr_func_name (lexer, &func_index, &exclude))
+        goto error;
+      const struct agr_func *function = &agr_func_tab[func_index];
+
+      /* Check for leading lparen. */
+      if (!lex_match (lexer, T_LPAREN))
+       {
+         if (function->src_vars == AGR_SV_YES)
+           {
+              bool ok UNUSED = lex_force_match (lexer, T_LPAREN);
+             goto error;
+           }
+       }
+      else
+        {
+         /* Parse list of source variables. */
+          int pv_opts = PV_NO_SCRATCH;
+          if (func_index == AGRF_SUM || func_index == AGRF_MEAN
+              || func_index == AGRF_MEDIAN || func_index == AGRF_SD)
+            pv_opts |= PV_NUMERIC;
+          else if (function->n_args)
+            pv_opts |= PV_SAME_TYPE;
+
+          int src_start_ofs = lex_ofs (lexer);
+          size_t n_src;
+          if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
+            goto error;
+          int src_end_ofs = lex_ofs (lexer) - 1;
+
+         /* Parse function arguments, for those functions that
+            require arguments. */
+          int args_start_ofs = 0;
+         if (function->n_args != 0)
+           for (size_t i = 0; i < function->n_args; i++)
+             {
+               lex_match (lexer, T_COMMA);
+
+               enum val_type type;
+               if (lex_is_string (lexer))
+                  type = VAL_STRING;
+                else if (lex_is_number (lexer))
+                  type = VAL_NUMERIC;
+                else
+                  {
+                   lex_error (lexer, _("Missing argument %zu to %s."),
+                               i + 1, function->name);
+                   goto error;
+                 }
+
+               if (type != var_get_type (src[0]))
+                 {
+                   msg (SE, _("Arguments to %s must be of same type as "
+                              "source variables."),
+                        function->name);
+                    if (type == VAL_NUMERIC)
+                      {
+                        lex_next_msg (lexer, SN, 0, 0,
+                                      _("The argument is numeric."));
+                        lex_ofs_msg (lexer, SN, src_start_ofs, src_end_ofs,
+                                     _("The variables have string type."));
+                      }
+                    else
+                      {
+                        lex_next_msg (lexer, SN, 0, 0,
+                                      _("The argument is a string."));
+                        lex_ofs_msg (lexer, SN, src_start_ofs, src_end_ofs,
+                                     _("The variables are numeric."));
+                      }
+                   goto error;
+                 }
+
+                if (i == 0)
+                  args_start_ofs = lex_ofs (lexer);
+               if (type == VAL_NUMERIC)
+                  arg[i].f = lex_tokval (lexer);
+                else
+                  arg[i].s = recode_substring_pool (dict_get_encoding (agr->dict),
+                                                    "UTF-8", lex_tokss (lexer),
+                                                    NULL);
+               lex_get (lexer);
+             }
+          int args_end_ofs = lex_ofs (lexer) - 1;
+
+         /* Trailing rparen. */
+         if (!lex_force_match (lexer, T_RPAREN))
+            goto error;
+
+         /* Now check that the number of source variables match
+            the number of target variables.  If we check earlier
+            than this, the user can get very misleading error
+            message, i.e. `AGGREGATE x=SUM(y t).' will get this
+            error message when a proper message would be more
+            like `unknown variable t'. */
+         if (n_src != n_vars)
+           {
+             msg (SE, _("Number of source variables (%zu) does not match "
+                        "number of target variables (%zu)."),
+                   n_src, n_vars);
+              lex_ofs_msg (lexer, SN, src_start_ofs, src_end_ofs,
+                           _("These are the source variables."));
+              lex_ofs_msg (lexer, SN, dst_start_ofs, dst_end_ofs,
+                           _("These are the target variables."));
+             goto error;
+           }
+
+          if ((func_index == AGRF_PIN || func_index == AGRF_POUT
+              || func_index == AGRF_FIN || func_index == AGRF_FOUT)
+              && (var_is_numeric (src[0])
+                  ? arg[0].f > arg[1].f
+                  : buf_compare_rpad (arg[0].s.string, arg[0].s.length,
+                                      arg[1].s.string, arg[1].s.length) > 0))
+            {
+              struct agr_argument tmp = arg[0];
+              arg[0] = arg[1];
+              arg[1] = tmp;
+
+              lex_ofs_msg (lexer, SW, args_start_ofs, args_end_ofs,
+                           _("The value arguments passed to the %s function "
+                             "are out of order.  They will be treated as if "
+                             "they had been specified in the correct order."),
+                           function->name);
+            }
+       }
+
+      /* Finally add these to the aggregation variables. */
+      for (size_t i = 0; i < n_vars; i++)
+       {
+          const struct variable *existing_var = dict_lookup_var (agr->dict,
+                                                                 dest[i]);
+          if (existing_var)
+            {
+              if (var_get_dict_index (existing_var) >= starting_n_vars)
+               lex_ofs_error (lexer, dst_start_ofs, dst_end_ofs,
+                               _("Duplicate target variable name %s."),
+                               dest[i]);
+              else if (agr->add_variables)
+               lex_ofs_error (lexer, dst_start_ofs, dst_end_ofs,
+                               _("Variable name %s duplicates the name of a "
+                                 "variable in the active file dictionary."),
+                               dest[i]);
+              else
+               lex_ofs_error (lexer, dst_start_ofs, dst_end_ofs,
+                               _("Variable name %s duplicates the name of a "
+                                 "break variable."), dest[i]);
+              goto error;
+            }
+
+         /* Add variable. */
+          if (agr->n_agr_vars >= allocated_agr_vars)
+            agr->agr_vars = x2nrealloc (agr->agr_vars, &allocated_agr_vars,
+                                        sizeof *agr->agr_vars);
+          struct agr_var *v = &agr->agr_vars[agr->n_agr_vars++];
+          *v = (struct agr_var) {
+            .exclude = exclude,
+            .moments = NULL,
+            .function = func_index,
+            .src = src ? src[i] : NULL,
+          };
+
+         /* Create the target variable in the aggregate dictionary. */
+          if (v->src && var_is_alpha (v->src))
+            v->string = xmalloc (var_get_width (v->src));
+
+          if (v->src && function->alpha_type == VAL_STRING)
+            v->dest = dict_clone_var_as_assert (agr->dict, v->src, dest[i]);
+          else
+            {
+              v->dest = dict_create_var_assert (agr->dict, dest[i], 0);
+
+              struct fmt_spec f;
+              if ((func_index == AGRF_N || func_index == AGRF_NMISS)
+                  && dict_get_weight (dict) != NULL)
+                f = fmt_for_output (FMT_F, 8, 2);
+              else
+                f = function->format;
+              var_set_both_formats (v->dest, &f);
+            }
+          if (dest_label[i])
+            var_set_label (v->dest, dest_label[i]);
+
+         if (v->src != NULL)
+            for (size_t j = 0; j < function->n_args; j++)
+              v->arg[j] = (struct agr_argument) {
+                .f = arg[j].f,
+                .s = arg[j].s.string ? ss_clone (arg[j].s) : ss_empty (),
+              };
+       }
+
+      ss_dealloc (&arg[0].s);
+      ss_dealloc (&arg[1].s);
+
+      free (src);
+      for (size_t i = 0; i < n_vars; i++)
+       {
+         free (dest[i]);
+         free (dest_label[i]);
+       }
+      free (dest);
+      free (dest_label);
+
+      if (!lex_match (lexer, T_SLASH))
+       {
+         if (lex_token (lexer) == T_ENDCMD)
+           return true;
+
+         lex_error (lexer, "Syntax error expecting end of command.");
+         return false;
+       }
+      continue;
+
+    error:
+      for (size_t i = 0; i < n_vars; i++)
+       {
+         free (dest[i]);
+         free (dest_label[i]);
+       }
+      free (dest);
+      free (dest_label);
+      ss_dealloc (&arg[0].s);
+      ss_dealloc (&arg[1].s);
+      free (src);
+
+      return false;
+    }
+}
+
+/* Destroys AGR. */
+static void
+agr_destroy (struct agr_proc *agr)
+{
+  subcase_uninit (&agr->sort);
+  free (agr->break_vars);
+  for (size_t i = 0; i < agr->n_agr_vars; i++)
+    {
+      struct agr_var *av = &agr->agr_vars[i];
+
+      ss_dealloc (&av->arg[0].s);
+      ss_dealloc (&av->arg[1].s);
+      free (av->string);
+
+      if (av->function == AGRF_SD)
+        moments1_destroy (av->moments);
+
+      dict_destroy_internal_var (av->subject);
+      dict_destroy_internal_var (av->weight);
+    }
+  free (agr->agr_vars);
+  if (agr->dict != NULL)
+    dict_unref (agr->dict);
+}
+\f
+/* Execution. */
+
+/* Accumulates aggregation data from the case INPUT. */
+static void
+accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
+{
+  bool bad_warn = true;
+  double weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
+  for (size_t i = 0; i < agr->n_agr_vars; i++)
+    {
+      struct agr_var *av = &agr->agr_vars[i];
+      if (av->src)
+        {
+          bool is_string = var_is_alpha (av->src);
+          const union value *v = case_data (input, av->src);
+          int src_width = var_get_width (av->src);
+          const struct substring vs = (src_width > 0
+                                       ? value_ss (v, src_width)
+                                       : ss_empty ());
+
+          if (var_is_value_missing (av->src, v) & av->exclude)
+            {
+              switch (av->function)
+                {
+                case AGRF_NMISS:
+                  av->dbl += weight;
+                  break;
+
+                case AGRF_NUMISS:
+                  av->int1++;
+                  break;
+
+                case AGRF_SUM:
+                case AGRF_MEAN:
+                case AGRF_MEDIAN:
+                case AGRF_SD:
+                case AGRF_MAX:
+                case AGRF_MIN:
+                case AGRF_PGT:
+                case AGRF_PLT:
+                case AGRF_PIN:
+                case AGRF_POUT:
+                case AGRF_FGT:
+                case AGRF_FLT:
+                case AGRF_FIN:
+                case AGRF_FOUT:
+                case AGRF_CGT:
+                case AGRF_CLT:
+                case AGRF_CIN:
+                case AGRF_COUT:
+                case AGRF_N:
+                case AGRF_NU:
+                case AGRF_FIRST:
+                case AGRF_LAST:
+                  break;
+                }
+              av->saw_missing = true;
+              continue;
+            }
+
+          /* This is horrible.  There are too many possibilities. */
+          av->W += weight;
+          switch (av->function)
+            {
+            case AGRF_SUM:
+              av->dbl += v->f * weight;
+              av->int1 = 1;
+              break;
+
+            case AGRF_MEAN:
+              av->dbl += v->f * weight;
+              break;
+
+            case AGRF_MEDIAN:
+              {
+                struct ccase *cout = case_create (casewriter_get_proto (av->writer));
+                *case_num_rw (cout, av->subject) = case_num (input, av->src);
+                *case_num_rw (cout, av->weight) = weight;
+                casewriter_write (av->writer, cout);
+              }
+              break;
+
+            case AGRF_SD:
+              moments1_add (av->moments, v->f, weight);
+              break;
+
+            case AGRF_MAX:
+              if (!is_string)
+                av->dbl = MAX (av->dbl, v->f);
+              else if (memcmp (av->string, v->s, src_width) < 0)
+                memcpy (av->string, v->s, src_width);
+              av->int1 = 1;
+              break;
+
+            case AGRF_MIN:
+              if (!is_string)
+                av->dbl = MIN (av->dbl, v->f);
+              else if (memcmp (av->string, v->s, src_width) > 0)
+                memcpy (av->string, v->s, src_width);
+              av->dbl = MIN (av->dbl, v->f);
+              av->int1 = 1;
+              break;
+
+            case AGRF_FGT:
+            case AGRF_PGT:
+            case AGRF_CGT:
+              if (is_string
+                  ? ss_compare_rpad (av->arg[0].s, vs) < 0
+                  : v->f > av->arg[0].f)
+                av->dbl += weight;
+              break;
+
+            case AGRF_FLT:
+            case AGRF_PLT:
+            case AGRF_CLT:
+              if (is_string
+                  ? ss_compare_rpad (av->arg[0].s, vs) > 0
+                  : v->f < av->arg[0].f)
+                av->dbl += weight;
+              break;
+
+            case AGRF_FIN:
+            case AGRF_PIN:
+            case AGRF_CIN:
+              if (is_string
+                  ? (ss_compare_rpad (av->arg[0].s, vs) <= 0
+                     && ss_compare_rpad (av->arg[1].s, vs) >= 0)
+                  : av->arg[0].f <= v->f && v->f <= av->arg[1].f)
+                av->dbl += weight;
+              break;
+
+            case AGRF_FOUT:
+            case AGRF_POUT:
+            case AGRF_COUT:
+              if (is_string
+                  ? (ss_compare_rpad (av->arg[0].s, vs) > 0
+                     || ss_compare_rpad (av->arg[1].s, vs) < 0)
+                  : av->arg[0].f > v->f || v->f > av->arg[1].f)
+                av->dbl += weight;
+              break;
+
+            case AGRF_N:
+              av->dbl += weight;
+              break;
+
+            case AGRF_NU:
+              av->int1++;
+              break;
+
+            case AGRF_FIRST:
+              if (av->int1 == 0)
+                {
+                  if (is_string)
+                    memcpy (av->string, v->s, src_width);
+                  else
+                    av->dbl = v->f;
+                  av->int1 = 1;
+                }
+              break;
+
+            case AGRF_LAST:
+              if (is_string)
+                memcpy (av->string, v->s, src_width);
+              else
+                av->dbl = v->f;
+              av->int1 = 1;
+              break;
+
+            case AGRF_NMISS:
+            case AGRF_NUMISS:
+              /* Our value is not missing or it would have been
+                 caught earlier.  Nothing to do. */
+              break;
+            }
+        }
+      else
+        {
+          av->W += weight;
+          switch (av->function)
+            {
+            case AGRF_N:
+              break;
+
+            case AGRF_NU:
+              av->int1++;
+              break;
+
+            case AGRF_SUM:
+            case AGRF_MEAN:
+            case AGRF_MEDIAN:
+            case AGRF_SD:
+            case AGRF_MAX:
+            case AGRF_MIN:
+            case AGRF_PGT:
+            case AGRF_PLT:
+            case AGRF_PIN:
+            case AGRF_POUT:
+            case AGRF_FGT:
+            case AGRF_FLT:
+            case AGRF_FIN:
+            case AGRF_FOUT:
+            case AGRF_CGT:
+            case AGRF_CLT:
+            case AGRF_CIN:
+            case AGRF_COUT:
+            case AGRF_NMISS:
+            case AGRF_NUMISS:
+            case AGRF_FIRST:
+            case AGRF_LAST:
+              NOT_REACHED ();
+            }
+        }
+    }
+}
+
+/* Writes an aggregated record to OUTPUT. */
+static void
+dump_aggregate_info (const struct agr_proc *agr, struct casewriter *output, const struct ccase *break_case)
+{
+  struct ccase *c = case_create (dict_get_proto (agr->dict));
+
+  if (agr->add_variables)
+    {
+      case_copy (c, 0, break_case, 0, dict_get_n_vars (agr->src_dict));
+    }
+  else
+    {
+      int value_idx = 0;
+
+      for (size_t i = 0; i < agr->break_n_vars; i++)
+       {
+         const struct variable *v = agr->break_vars[i];
+         value_copy (case_data_rw_idx (c, value_idx),
+                     case_data (break_case, v),
+                     var_get_width (v));
+         value_idx++;
+       }
+    }
+
+  for (size_t i = 0; i < agr->n_agr_vars; i++)
+    {
+      struct agr_var *av = &agr->agr_vars[i];
+      union value *v = case_data_rw (c, av->dest);
+      int width = var_get_width (av->dest);
+
+      if (agr->missing == COLUMNWISE && av->saw_missing
+          && av->function != AGRF_N
+          && av->function != AGRF_NU
+          && av->function != AGRF_NMISS
+          && av->function != AGRF_NUMISS)
+        {
+          value_set_missing (v, width);
+          casewriter_destroy (av->writer);
+          continue;
+        }
+
+      switch (av->function)
+        {
+        case AGRF_SUM:
+          v->f = av->int1 ? av->dbl : SYSMIS;
+          break;
+
+        case AGRF_MEAN:
+          v->f = av->W != 0.0 ? av->dbl / av->W : SYSMIS;
+          break;
+
+        case AGRF_MEDIAN:
+          {
+            if (av->writer)
+              {
+                struct percentile *median = percentile_create (0.5, av->W);
+                struct order_stats *os = &median->parent;
+                struct casereader *sorted_reader = casewriter_make_reader (av->writer);
+                av->writer = NULL;
+
+                order_stats_accumulate (&os, 1,
+                                        sorted_reader,
+                                        av->weight,
+                                        av->subject,
+                                        av->exclude);
+                av->dbl = percentile_calculate (median, PC_HAVERAGE);
+                statistic_destroy (&median->parent.parent);
+              }
+            v->f = av->dbl;
+          }
+          break;
+
+        case AGRF_SD:
+          {
+            double variance;
+
+            moments1_calculate (av->moments, NULL, NULL, &variance,
+                                NULL, NULL);
+            v->f = variance != SYSMIS ? sqrt (variance) : SYSMIS;
+          }
+          break;
+
+        case AGRF_MAX:
+        case AGRF_MIN:
+        case AGRF_FIRST:
+        case AGRF_LAST:
+          if (!width)
+            v->f = av->int1 ? av->dbl : SYSMIS;
+          else
+            {
+              if (av->int1)
+                memcpy (v->s, av->string, width);
+              else
+                value_set_missing (v, width);
+            }
+          break;
+
+        case AGRF_FGT:
+        case AGRF_FLT:
+        case AGRF_FIN:
+        case AGRF_FOUT:
+          v->f = av->W ? av->dbl / av->W : SYSMIS;
+          break;
+
+        case AGRF_PGT:
+        case AGRF_PLT:
+        case AGRF_PIN:
+        case AGRF_POUT:
+          v->f = av->W ? av->dbl / av->W * 100.0 : SYSMIS;
+          break;
+
+        case AGRF_CGT:
+        case AGRF_CLT:
+        case AGRF_CIN:
+        case AGRF_COUT:
+          v->f = av->dbl;
+          break;
+
+        case AGRF_N:
+          v->f = av->W;
+          break;
+
+        case AGRF_NU:
+        case AGRF_NUMISS:
+          v->f = av->int1;
+          break;
+
+        case AGRF_NMISS:
+          v->f = av->dbl;
+          break;
+        }
+    }
+
+  casewriter_write (output, c);
+}
+
+/* Resets the state for all the aggregate functions. */
+static void
+initialize_aggregate_info (struct agr_proc *agr)
+{
+  for (size_t i = 0; i < agr->n_agr_vars; i++)
+    {
+      struct agr_var *av = &agr->agr_vars[i];
+      av->saw_missing = false;
+      av->dbl = av->W = 0.0;
+      av->int1 = 0;
+
+      int width = av->src ? var_get_width (av->src) : 0;
+      switch (av->function)
+       {
+       case AGRF_MIN:
+          if (!width)
+            av->dbl = DBL_MAX;
+          else
+            memset (av->string, 255, width);
+         break;
+
+       case AGRF_MAX:
+          if (!width)
+            av->dbl = -DBL_MAX;
+         else
+            memset (av->string, 0, width);
+         break;
+
+       case AGRF_MEDIAN:
+         {
+            struct caseproto *proto = caseproto_create ();
+            proto = caseproto_add_width (proto, 0);
+            proto = caseproto_add_width (proto, 0);
+
+           if (! av->subject)
+             av->subject = dict_create_internal_var (0, 0);
+
+           if (! av->weight)
+             av->weight = dict_create_internal_var (1, 0);
+
+            struct subcase ordering;
+            subcase_init_var (&ordering, av->subject, SC_ASCEND);
+           av->writer = sort_create_writer (&ordering, proto);
+            subcase_uninit (&ordering);
+            caseproto_unref (proto);
+         }
+         break;
+
+        case AGRF_SD:
+          if (av->moments == NULL)
+            av->moments = moments1_create (MOMENT_VARIANCE);
+          else
+            moments1_clear (av->moments);
+          break;
+
+        case AGRF_SUM:
+        case AGRF_MEAN:
+        case AGRF_PGT:
+        case AGRF_PLT:
+        case AGRF_PIN:
+        case AGRF_POUT:
+        case AGRF_FGT:
+        case AGRF_FLT:
+        case AGRF_FIN:
+        case AGRF_FOUT:
+        case AGRF_CGT:
+        case AGRF_CLT:
+        case AGRF_CIN:
+        case AGRF_COUT:
+        case AGRF_N:
+        case AGRF_NU:
+        case AGRF_NMISS:
+        case AGRF_NUMISS:
+        case AGRF_FIRST:
+        case AGRF_LAST:
+          break;
+       }
+    }
+}