Change "union value" to dynamically allocate long strings.
[pspp-builds.git] / src / language / stats / aggregate.c
index a388b2d948a45d955b3e7b2b4cddae3b6db7e63f..0d181bd45e3999ab1c45b8267e713f6cf1aeb797 100644 (file)
@@ -1,35 +1,34 @@
-/* PSPP - computes sample statistics.
-   Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
-   Written by Ben Pfaff <blp@gnu.org>.
+/* PSPP - a program for statistical analysis.
+   Copyright (C) 1997-9, 2000, 2006, 2008, 2009 Free Software Foundation, Inc.
 
-   This program is free software; you can redistribute it and/or
-   modify it under the terms of the GNU General Public License as
-   published by the Free Software Foundation; either version 2 of the
-   License, or (at your option) any later version.
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
 
-   This program is distributed in the hope that it will be useful, but
-   WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   General Public License for more details.
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
 
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-   02110-1301, USA. */
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
 
 #include <config.h>
 
 #include <stdlib.h>
 
 #include <data/any-writer.h>
-#include <data/case-sink.h>
 #include <data/case.h>
-#include <data/casefile.h>
+#include <data/casegrouper.h>
+#include <data/casereader.h>
+#include <data/casewriter.h>
 #include <data/dictionary.h>
 #include <data/file-handle-def.h>
+#include <data/format.h>
 #include <data/procedure.h>
 #include <data/settings.h>
-#include <data/storage-stream.h>
+#include <data/subcase.h>
 #include <data/sys-file-writer.h>
 #include <data/variable.h>
 #include <language/command.h>
@@ -37,7 +36,6 @@
 #include <language/lexer/lexer.h>
 #include <language/lexer/variable-parser.h>
 #include <language/stats/sort-criteria.h>
-#include <libpspp/alloc.h>
 #include <libpspp/assertion.h>
 #include <libpspp/message.h>
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
 #include <math/moments.h>
 #include <math/sort.h>
+#include <math/statistic.h>
+#include <math/percentiles.h>
+
+#include "minmax.h"
+#include "xalloc.h"
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
@@ -62,24 +65,29 @@ struct agr_var
     struct agr_var *next;              /* Next in list. */
 
     /* Collected during parsing. */
-    struct variable *src;      /* Source variable. */
+    const struct variable *src;        /* Source variable. */
     struct variable *dest;     /* Target variable. */
     int function;              /* Function. */
-    int include_missing;       /* 1=Include user-missing values. */
+    enum mv_class exclude;      /* Classes of missing values to exclude. */
     union agr_argument arg[2]; /* Arguments. */
 
     /* Accumulated during AGGREGATE execution. */
     double dbl[3];
     int int1, int2;
     char *string;
-    int missing;
+    bool saw_missing;
     struct moments1 *moments;
+    double cc;
+
+    struct variable *subject;
+    struct variable *weight;
+    struct casewriter *writer;
   };
 
 /* Aggregation functions. */
 enum
   {
-    NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
+    NONE, SUM, MEAN, MEDIAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
     FUNC = 0x1f, /* Function mask. */
@@ -91,36 +99,37 @@ struct agr_func
   {
     const char *name;          /* Aggregation function name. */
     size_t n_args;              /* Number of arguments. */
-    int alpha_type;            /* When given ALPHA arguments, output type. */
+    enum val_type alpha_type;   /* When given ALPHA arguments, output type. */
     struct fmt_spec format;    /* Format spec if alpha_type != ALPHA. */
   };
 
 /* Attributes of aggregation functions. */
-static const struct agr_func agr_func_tab[] = 
+static const struct agr_func agr_func_tab[] =
   {
-    {"<NONE>",  0, -1,      {0, 0, 0}},
-    {"SUM",     0, -1,      {FMT_F, 8, 2}},
-    {"MEAN",   0, -1,      {FMT_F, 8, 2}},
-    {"SD",      0, -1,      {FMT_F, 8, 2}},
-    {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
-    {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
-    {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
-    {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
-    {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
-    {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
-    {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
-    {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
-    {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
-    {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
-    {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
-    {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
-    {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
-    {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
-    {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
-    {"LAST",    0, ALPHA,   {-1, -1, -1}},
-    {NULL,      0, -1,      {-1, -1, -1}},
-    {"N",       0, NUMERIC, {FMT_F, 7, 0}},
-    {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
+    {"<NONE>",  0, -1,          {0, 0, 0}},
+    {"SUM",     0, -1,          {FMT_F, 8, 2}},
+    {"MEAN",   0, -1,          {FMT_F, 8, 2}},
+    {"MEDIAN", 0, -1,          {FMT_F, 8, 2}},
+    {"SD",      0, -1,          {FMT_F, 8, 2}},
+    {"MAX",     0, VAL_STRING,  {-1, -1, -1}},
+    {"MIN",     0, VAL_STRING,  {-1, -1, -1}},
+    {"PGT",     1, VAL_NUMERIC, {FMT_F, 5, 1}},
+    {"PLT",     1, VAL_NUMERIC, {FMT_F, 5, 1}},
+    {"PIN",     2, VAL_NUMERIC, {FMT_F, 5, 1}},
+    {"POUT",    2, VAL_NUMERIC, {FMT_F, 5, 1}},
+    {"FGT",     1, VAL_NUMERIC, {FMT_F, 5, 3}},
+    {"FLT",     1, VAL_NUMERIC, {FMT_F, 5, 3}},
+    {"FIN",     2, VAL_NUMERIC, {FMT_F, 5, 3}},
+    {"FOUT",    2, VAL_NUMERIC, {FMT_F, 5, 3}},
+    {"N",       0, VAL_NUMERIC, {FMT_F, 7, 0}},
+    {"NU",      0, VAL_NUMERIC, {FMT_F, 7, 0}},
+    {"NMISS",   0, VAL_NUMERIC, {FMT_F, 7, 0}},
+    {"NUMISS",  0, VAL_NUMERIC, {FMT_F, 7, 0}},
+    {"FIRST",   0, VAL_STRING,  {-1, -1, -1}},
+    {"LAST",    0, VAL_STRING,  {-1, -1, -1}},
+    {NULL,      0, -1,          {-1, -1, -1}},
+    {"N",       0, VAL_NUMERIC, {FMT_F, 7, 0}},
+    {"NU",      0, VAL_NUMERIC, {FMT_F, 7, 0}},
   };
 
 /* Missing value types. */
@@ -131,114 +140,111 @@ enum missing_treatment
   };
 
 /* An entire AGGREGATE procedure. */
-struct agr_proc 
+struct agr_proc
   {
-    /* We have either an output file or a sink. */
-    struct any_writer *writer;          /* Output file, or null if none. */
-    struct case_sink *sink;             /* Sink, or null if none. */
-
     /* Break variables. */
-    struct sort_criteria *sort;         /* Sort criteria. */
-    struct variable **break_vars;       /* Break variables. */
+    struct subcase sort;                /* Sort criteria (break variables). */
+    const struct variable **break_vars;       /* Break variables. */
     size_t break_var_cnt;               /* Number of break variables. */
-    struct ccase break_case;            /* Last values of break variables. */
+    struct ccase *break_case;           /* Last values of break variables. */
 
     enum missing_treatment missing;     /* How to treat missing values. */
     struct agr_var *agr_vars;           /* First aggregate variable. */
     struct dictionary *dict;            /* Aggregate dictionary. */
+    const struct dictionary *src_dict;  /* Dict of the source */
     int case_cnt;                       /* Counts aggregated cases. */
-    struct ccase agr_case;              /* Aggregate case for output. */
   };
 
 static void initialize_aggregate_info (struct agr_proc *,
                                        const struct ccase *);
 
+static void accumulate_aggregate_info (struct agr_proc *,
+                                       const struct ccase *);
 /* Prototypes. */
-static bool parse_aggregate_functions (struct agr_proc *);
+static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
+                                      struct agr_proc *);
 static void agr_destroy (struct agr_proc *);
-static bool aggregate_single_case (struct agr_proc *agr,
-                                  const struct ccase *input,
-                                  struct ccase *output);
-static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
-
-/* Aggregating to the active file. */
-static bool agr_to_active_file (const struct ccase *, void *aux);
-
-/* Aggregating to a system file. */
-static bool presorted_agr_to_sysfile (const struct ccase *, void *aux);
+static void dump_aggregate_info (struct agr_proc *agr,
+                                 struct casewriter *output);
 \f
 /* Parsing. */
 
 /* Parses and executes the AGGREGATE procedure. */
 int
-cmd_aggregate (void)
+cmd_aggregate (struct lexer *lexer, struct dataset *ds)
 {
+  struct dictionary *dict = dataset_dict (ds);
   struct agr_proc agr;
   struct file_handle *out_file = NULL;
+  struct casereader *input = NULL, *group;
+  struct casegrouper *grouper;
+  struct casewriter *output = NULL;
 
   bool copy_documents = false;
   bool presorted = false;
   bool saw_direction;
+  bool ok;
 
   memset(&agr, 0 , sizeof (agr));
   agr.missing = ITEMWISE;
-  case_nullify (&agr.break_case);
-  
+  agr.break_case = NULL;
+
   agr.dict = dict_create ();
-  dict_set_label (agr.dict, dict_get_label (dataset_dict (current_dataset)));
-  dict_set_documents (agr.dict, dict_get_documents (dataset_dict (current_dataset)));
+  agr.src_dict = dict;
+  subcase_init_empty (&agr.sort);
+  dict_set_label (agr.dict, dict_get_label (dict));
+  dict_set_documents (agr.dict, dict_get_documents (dict));
 
   /* OUTFILE subcommand must be first. */
-  if (!lex_force_match_id ("OUTFILE"))
+  if (!lex_force_match_id (lexer, "OUTFILE"))
     goto error;
-  lex_match ('=');
-  if (!lex_match ('*'))
+  lex_match (lexer, '=');
+  if (!lex_match (lexer, '*'))
     {
-      out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
+      out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
       if (out_file == NULL)
         goto error;
     }
-  
+
   /* Read most of the subcommands. */
   for (;;)
     {
-      lex_match ('/');
-      
-      if (lex_match_id ("MISSING"))
+      lex_match (lexer, '/');
+
+      if (lex_match_id (lexer, "MISSING"))
        {
-         lex_match ('=');
-         if (!lex_match_id ("COLUMNWISE"))
+         lex_match (lexer, '=');
+         if (!lex_match_id (lexer, "COLUMNWISE"))
            {
-             lex_error (_("while expecting COLUMNWISE"));
+             lex_error (lexer, _("while expecting COLUMNWISE"));
               goto error;
            }
          agr.missing = COLUMNWISE;
        }
-      else if (lex_match_id ("DOCUMENT"))
+      else if (lex_match_id (lexer, "DOCUMENT"))
         copy_documents = true;
-      else if (lex_match_id ("PRESORTED"))
+      else if (lex_match_id (lexer, "PRESORTED"))
         presorted = true;
-      else if (lex_match_id ("BREAK"))
+      else if (lex_match_id (lexer, "BREAK"))
        {
           int i;
 
-         lex_match ('=');
-          agr.sort = sort_parse_criteria (dataset_dict (current_dataset),
-                                          &agr.break_vars, &agr.break_var_cnt,
-                                          &saw_direction, NULL);
-          if (agr.sort == NULL)
+         lex_match (lexer, '=');
+          if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
+                                    &saw_direction))
             goto error;
-         
+          agr.break_var_cnt = subcase_get_n_fields (&agr.sort);
+
           for (i = 0; i < agr.break_var_cnt; i++)
             dict_clone_var_assert (agr.dict, agr.break_vars[i],
-                                   agr.break_vars[i]->name);
+                                   var_get_name (agr.break_vars[i]));
 
           /* BREAK must follow the options. */
           break;
        }
       else
         {
-          lex_error (_("expecting BREAK"));
+          lex_error (lexer, _("expecting BREAK"));
           goto error;
         }
     }
@@ -246,116 +252,106 @@ cmd_aggregate (void)
     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
                "with (A) or (D) has no effect.  Output data will be sorted "
                "the same way as the input data."));
-      
+
   /* Read in the aggregate functions. */
-  lex_match ('/');
-  if (!parse_aggregate_functions (&agr))
+  lex_match (lexer, '/');
+  if (!parse_aggregate_functions (lexer, dict, &agr))
     goto error;
 
   /* Delete documents. */
   if (!copy_documents)
-    dict_set_documents (agr.dict, NULL);
+    dict_clear_documents (agr.dict);
 
   /* Cancel SPLIT FILE. */
   dict_set_split_vars (agr.dict, NULL, 0);
-  
+
   /* Initialize. */
   agr.case_cnt = 0;
-  case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
 
-  /* Output to active file or external file? */
-  if (out_file == NULL) 
+  if (out_file == NULL)
     {
       /* The active file will be replaced by the aggregated data,
          so TEMPORARY is moot. */
-      proc_cancel_temporary_transformations (current_dataset);
+      proc_cancel_temporary_transformations (ds);
+      proc_discard_output (ds);
+      output = autopaging_writer_create (dict_get_proto (agr.dict));
+    }
+  else
+    {
+      output = any_writer_open (out_file, agr.dict);
+      if (output == NULL)
+        goto error;
+    }
 
-      if (agr.sort != NULL && !presorted) 
+  input = proc_open (ds);
+  if (!subcase_is_empty (&agr.sort) && !presorted)
+    {
+      input = sort_execute (input, &agr.sort);
+      subcase_clear (&agr.sort);
+    }
+
+  for (grouper = casegrouper_create_vars (input, agr.break_vars,
+                                          agr.break_var_cnt);
+       casegrouper_get_next_group (grouper, &group);
+       casereader_destroy (group))
+    {
+      struct ccase *c = casereader_peek (group, 0);
+      if (c == NULL)
         {
-          if (!sort_active_file_in_place (agr.sort))
-            goto error;
+          casereader_destroy (group);
+          continue;
         }
+      initialize_aggregate_info (&agr, c);
+      case_unref (c);
+
+      for (; (c = casereader_read (group)) != NULL; case_unref (c))
+        accumulate_aggregate_info (&agr, c);
+      dump_aggregate_info (&agr, output);
+    }
+  if (!casegrouper_destroy (grouper))
+    goto error;
 
-      agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
-      if (agr.sink->class->open != NULL)
-        agr.sink->class->open (agr.sink);
-      proc_set_sink (current_dataset, 
-                    create_case_sink (&null_sink_class, 
-                                      dataset_dict (current_dataset), NULL));
-      if (!procedure (current_dataset,agr_to_active_file, &agr))
+  if (!proc_commit (ds))
+    {
+      input = NULL;
+      goto error;
+    }
+  input = NULL;
+
+  if (out_file == NULL)
+    {
+      struct casereader *next_input = casewriter_make_reader (output);
+      if (next_input == NULL)
         goto error;
-      if (agr.case_cnt > 0) 
-        {
-          dump_aggregate_info (&agr, &agr.agr_case);
-          if (!agr.sink->class->write (agr.sink, &agr.agr_case))
-            goto error;
-        }
-      discard_variables (current_dataset);
-      dict_destroy (dataset_dict (current_dataset));
-      dataset_set_dict (current_dataset, agr.dict);
+
+      proc_set_active_file (ds, next_input, agr.dict);
       agr.dict = NULL;
-      proc_set_source (current_dataset, 
-                      agr.sink->class->make_source (agr.sink));
-      free_case_sink (agr.sink);
     }
   else
     {
-      agr.writer = any_writer_open (out_file, agr.dict);
-      if (agr.writer == NULL)
-        goto error;
-      
-      if (agr.sort != NULL && !presorted) 
-        {
-          /* Sorting is needed. */
-          struct casefile *dst;
-          struct casereader *reader;
-          struct ccase c;
-          bool ok = true;
-          
-          dst = sort_active_file_to_casefile (agr.sort);
-          if (dst == NULL)
-            goto error;
-          reader = casefile_get_destructive_reader (dst);
-          while (ok && casereader_read_xfer (reader, &c)) 
-            {
-              if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
-                ok = any_writer_write (agr.writer, &agr.agr_case);
-              case_destroy (&c);
-            }
-          casereader_destroy (reader);
-          if (ok)
-            ok = !casefile_error (dst);
-          casefile_destroy (dst);
-          if (!ok)
-            goto error;
-        }
-      else 
-        {
-          /* Active file is already sorted. */
-          if (!procedure (current_dataset,presorted_agr_to_sysfile, &agr))
-            goto error;
-        }
-      
-      if (agr.case_cnt > 0) 
-        {
-          dump_aggregate_info (&agr, &agr.agr_case);
-          any_writer_write (agr.writer, &agr.agr_case);
-        }
-      if (any_writer_error (agr.writer))
+      ok = casewriter_destroy (output);
+      output = NULL;
+      if (!ok)
         goto error;
     }
-  
+
   agr_destroy (&agr);
+  fh_unref (out_file);
   return CMD_SUCCESS;
 
 error:
+  if (input != NULL)
+    proc_commit (ds);
+  casewriter_destroy (output);
   agr_destroy (&agr);
+  fh_unref (out_file);
   return CMD_CASCADING_FAILURE;
 }
 
 /* Parse all the aggregate functions. */
 static bool
-parse_aggregate_functions (struct agr_proc *agr)
+parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
+                          struct agr_proc *agr)
 {
   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
 
@@ -366,14 +362,15 @@ parse_aggregate_functions (struct agr_proc *agr)
       char **dest;
       char **dest_label;
       size_t n_dest;
+      struct string function_name;
 
-      int include_missing;
+      enum mv_class exclude;
       const struct agr_func *function;
       int func_index;
 
       union agr_argument arg[2];
 
-      struct variable **src;
+      const struct variable **src;
       size_t n_src;
 
       size_t i;
@@ -386,13 +383,14 @@ parse_aggregate_functions (struct agr_proc *agr)
       n_src = 0;
       arg[0].c = NULL;
       arg[1].c = NULL;
+      ds_init_empty (&function_name);
 
       /* Parse the list of target variables. */
-      while (!lex_match ('='))
+      while (!lex_match (lexer, '='))
        {
          size_t n_dest_prev = n_dest;
-         
-         if (!parse_DATA_LIST_vars (&dest, &n_dest,
+
+         if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
            goto error;
 
@@ -404,42 +402,52 @@ parse_aggregate_functions (struct agr_proc *agr)
            for (j = n_dest_prev; j < n_dest; j++)
              dest_label[j] = NULL;
          }
-         
-         if (token == T_STRING)
+
+
+
+         if (lex_token (lexer) == T_STRING)
            {
-             ds_truncate (&tokstr, 255);
-             dest_label[n_dest - 1] = ds_xstrdup (&tokstr);
-             lex_get ();
+             struct string label;
+             ds_init_string (&label, lex_tokstr (lexer));
+
+             ds_truncate (&label, 255);
+             dest_label[n_dest - 1] = ds_xstrdup (&label);
+             lex_get (lexer);
+             ds_destroy (&label);
            }
        }
 
       /* Get the name of the aggregation function. */
-      if (token != T_ID)
+      if (lex_token (lexer) != T_ID)
        {
-         lex_error (_("expecting aggregation function"));
+         lex_error (lexer, _("expecting aggregation function"));
          goto error;
        }
 
-      include_missing = 0;
-      if (tokid[strlen (tokid) - 1] == '.')
-       {
-         include_missing = 1;
-         tokid[strlen (tokid) - 1] = 0;
-       }
-      
+      exclude = MV_ANY;
+
+      ds_assign_string (&function_name, lex_tokstr (lexer));
+
+      ds_chomp (&function_name, '.');
+
+      if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
+        exclude = MV_SYSTEM;
+
       for (function = agr_func_tab; function->name; function++)
-       if (!strcasecmp (function->name, tokid))
+       if (!strcasecmp (function->name, ds_cstr (&function_name)))
          break;
       if (NULL == function->name)
        {
-         msg (SE, _("Unknown aggregation function %s."), tokid);
+         msg (SE, _("Unknown aggregation function %s."),
+              ds_cstr (&function_name));
          goto error;
        }
+      ds_destroy (&function_name);
       func_index = function - agr_func_tab;
-      lex_get ();
+      lex_get (lexer);
 
       /* Check for leading lparen. */
-      if (!lex_match ('('))
+      if (!lex_match (lexer, '('))
        {
          if (func_index == N)
            func_index = N_NO_VARS;
@@ -447,7 +455,7 @@ parse_aggregate_functions (struct agr_proc *agr)
            func_index = NU_NO_VARS;
          else
            {
-             lex_error (_("expecting `('"));
+             lex_error (lexer, _("expecting `('"));
              goto error;
            }
        }
@@ -462,7 +470,7 @@ parse_aggregate_functions (struct agr_proc *agr)
            else if (function->n_args)
              pv_opts |= PV_SAME_TYPE;
 
-           if (!parse_variables (dataset_dict (current_dataset), &src, &n_src, pv_opts))
+           if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
              goto error;
          }
 
@@ -472,26 +480,28 @@ parse_aggregate_functions (struct agr_proc *agr)
            for (i = 0; i < function->n_args; i++)
              {
                int type;
-           
-               lex_match (',');
-               if (token == T_STRING)
+
+               lex_match (lexer, ',');
+               if (lex_token (lexer) == T_STRING)
                  {
-                   arg[i].c = ds_xstrdup (&tokstr);
-                   type = ALPHA;
+                   arg[i].c = ds_xstrdup (lex_tokstr (lexer));
+                   type = VAL_STRING;
                  }
-               else if (lex_is_number ())
+               else if (lex_is_number (lexer))
                  {
-                   arg[i].f = tokval;
-                   type = NUMERIC;
-                 } else {
-                   msg (SE, _("Missing argument %d to %s."), i + 1,
-                         function->name);
+                   arg[i].f = lex_tokval (lexer);
+                   type = VAL_NUMERIC;
+                 }
+                else
+                  {
+                   msg (SE, _("Missing argument %zu to %s."),
+                         i + 1, function->name);
                    goto error;
                  }
-           
-               lex_get ();
 
-               if (type != src[0]->type)
+               lex_get (lexer);
+
+               if (type != var_get_type (src[0]))
                  {
                    msg (SE, _("Arguments to %s must be of same type as "
                               "source variables."),
@@ -501,12 +511,12 @@ parse_aggregate_functions (struct agr_proc *agr)
              }
 
          /* Trailing rparen. */
-         if (!lex_match(')'))
+         if (!lex_match (lexer, ')'))
            {
-             lex_error (_("expecting `)'"));
+             lex_error (lexer, _("expecting `)'"));
              goto error;
            }
-         
+
          /* Now check that the number of source variables match
             the number of target variables.  If we check earlier
             than this, the user can get very misleading error
@@ -515,34 +525,34 @@ parse_aggregate_functions (struct agr_proc *agr)
             like `unknown variable t'. */
          if (n_src != n_dest)
            {
-             msg (SE, _("Number of source variables (%u) does not match "
-                        "number of target variables (%u)."),
-                  (unsigned) n_src, (unsigned) n_dest);
+             msg (SE, _("Number of source variables (%zu) does not match "
+                        "number of target variables (%zu)."),
+                   n_src, n_dest);
              goto error;
            }
 
           if ((func_index == PIN || func_index == POUT
-              || func_index == FIN || func_index == FOUT) 
-              && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
-                  || (src[0]->type == ALPHA
-                      && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
+              || func_index == FIN || func_index == FOUT)
+              && (var_is_numeric (src[0])
+                  ? arg[0].f > arg[1].f
+                  : str_compare_rpad (arg[0].c, arg[1].c) > 0))
             {
               union agr_argument t = arg[0];
               arg[0] = arg[1];
               arg[1] = t;
-                  
+
               msg (SW, _("The value arguments passed to the %s function "
                          "are out-of-order.  They will be treated as if "
                          "they had been specified in the correct order."),
                    function->name);
             }
        }
-       
+
       /* Finally add these to the linked list of aggregation
          variables. */
       for (i = 0; i < n_dest; i++)
        {
-         struct agr_var *v = xmalloc (sizeof *v);
+         struct agr_var *v = xzalloc (sizeof *v);
 
          /* Add variable to chain. */
          if (agr->agr_vars != NULL)
@@ -552,50 +562,53 @@ parse_aggregate_functions (struct agr_proc *agr)
           tail = v;
          tail->next = NULL;
           v->moments = NULL;
-         
+
          /* Create the target variable in the aggregate
              dictionary. */
          {
            struct variable *destvar;
-           
+
            v->function = func_index;
 
            if (src)
              {
                v->src = src[i];
-               
-               if (src[i]->type == ALPHA)
+
+               if (var_is_alpha (src[i]))
                  {
                    v->function |= FSTRING;
-                   v->string = xmalloc (src[i]->width);
+                   v->string = xmalloc (var_get_width (src[i]));
                  }
 
-               if (function->alpha_type == ALPHA)
+               if (function->alpha_type == VAL_STRING)
                  destvar = dict_clone_var (agr->dict, v->src, dest[i]);
                else
                   {
-                    assert (v->src->type == NUMERIC
-                            || function->alpha_type == NUMERIC);
+                    assert (var_is_numeric (v->src)
+                            || function->alpha_type == VAL_NUMERIC);
                     destvar = dict_create_var (agr->dict, dest[i], 0);
-                    if (destvar != NULL) 
+                    if (destvar != NULL)
                       {
+                        struct fmt_spec f;
                         if ((func_index == N || func_index == NMISS)
-                            && dict_get_weight (dataset_dict (current_dataset)) != NULL)
-                          destvar->print = destvar->write = f8_2; 
+                            && dict_get_weight (dict) != NULL)
+                          f = fmt_for_output (FMT_F, 8, 2);
                         else
-                          destvar->print = destvar->write = function->format;
+                          f = function->format;
+                        var_set_both_formats (destvar, &f);
                       }
                   }
              } else {
+                struct fmt_spec f;
                v->src = NULL;
                destvar = dict_create_var (agr->dict, dest[i], 0);
-                if (func_index == N_NO_VARS
-                    && dict_get_weight (dataset_dict (current_dataset)) != NULL)
-                  destvar->print = destvar->write = f8_2; 
+                if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
+                  f = fmt_for_output (FMT_F, 8, 2);
                 else
-                  destvar->print = destvar->write = function->format;
+                  f = function->format;
+                var_set_both_formats (destvar, &f);
              }
-         
+
            if (!destvar)
              {
                msg (SE, _("Variable name %s is not unique within the "
@@ -608,21 +621,18 @@ parse_aggregate_functions (struct agr_proc *agr)
 
            free (dest[i]);
            if (dest_label[i])
-             {
-               destvar->label = dest_label[i];
-               dest_label[i] = NULL;
-             }
+              var_set_label (destvar, dest_label[i]);
 
            v->dest = destvar;
          }
-         
-         v->include_missing = include_missing;
+
+         v->exclude = exclude;
 
          if (v->src != NULL)
            {
              int j;
 
-             if (v->src->type == NUMERIC)
+             if (var_is_numeric (v->src))
                for (j = 0; j < function->n_args; j++)
                  v->arg[j].f = arg[j].f;
              else
@@ -630,8 +640,8 @@ parse_aggregate_functions (struct agr_proc *agr)
                  v->arg[j].c = xstrdup (arg[j].c);
            }
        }
-      
-      if (src != NULL && src[0]->type == ALPHA)
+
+      if (src != NULL && var_is_alpha (src[0]))
        for (i = 0; i < function->n_args; i++)
          {
            free (arg[i].c);
@@ -642,17 +652,18 @@ parse_aggregate_functions (struct agr_proc *agr)
       free (dest);
       free (dest_label);
 
-      if (!lex_match ('/'))
+      if (!lex_match (lexer, '/'))
        {
-         if (token == '.')
+         if (lex_token (lexer) == '.')
            return true;
 
-         lex_error ("expecting end of command");
+         lex_error (lexer, "expecting end of command");
          return false;
        }
       continue;
-      
+
     error:
+      ds_destroy (&function_name);
       for (i = 0; i < n_dest; i++)
        {
          free (dest[i]);
@@ -662,14 +673,14 @@ parse_aggregate_functions (struct agr_proc *agr)
       free (dest_label);
       free (arg[0].c);
       free (arg[1].c);
-      if (src && n_src && src[0]->type == ALPHA)
+      if (src && n_src && var_is_alpha (src[0]))
        for (i = 0; i < function->n_args; i++)
          {
            free (arg[i].c);
            arg[i].c = NULL;
          }
       free (src);
-       
+
       return false;
     }
 }
@@ -680,11 +691,9 @@ agr_destroy (struct agr_proc *agr)
 {
   struct agr_var *iter, *next;
 
-  any_writer_close (agr->writer);
-  if (agr->sort != NULL)
-    sort_destroy_criteria (agr->sort);
+  subcase_destroy (&agr->sort);
   free (agr->break_vars);
-  case_destroy (&agr->break_case);
+  case_unref (agr->break_case);
   for (iter = agr->agr_vars; iter; iter = next)
     {
       next = iter->next;
@@ -701,64 +710,35 @@ agr_destroy (struct agr_proc *agr)
        }
       else if (iter->function == SD)
         moments1_destroy (iter->moments);
+
+      var_destroy (iter->subject);
+      var_destroy (iter->weight);
+
       free (iter);
     }
   if (agr->dict != NULL)
     dict_destroy (agr->dict);
-
-  case_destroy (&agr->agr_case);
 }
 \f
 /* Execution. */
 
-static void accumulate_aggregate_info (struct agr_proc *,
-                                       const struct ccase *);
-static void dump_aggregate_info (struct agr_proc *, struct ccase *);
-
-/* Processes a single case INPUT for aggregation.  If output is
-   warranted, writes it to OUTPUT and returns true.
-   Otherwise, returns false and OUTPUT is unmodified. */
-static bool
-aggregate_single_case (struct agr_proc *agr,
-                       const struct ccase *input, struct ccase *output)
-{
-  bool finished_group = false;
-  
-  if (agr->case_cnt++ == 0)
-    initialize_aggregate_info (agr, input);
-  else if (case_compare (&agr->break_case, input,
-                         agr->break_vars, agr->break_var_cnt))
-    {
-      dump_aggregate_info (agr, output);
-      finished_group = true;
-
-      initialize_aggregate_info (agr, input);
-    }
-
-  accumulate_aggregate_info (agr, input);
-  return finished_group;
-}
-
 /* Accumulates aggregation data from the case INPUT. */
-static void 
-accumulate_aggregate_info (struct agr_proc *agr,
-                           const struct ccase *input)
+static void
+accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
 {
   struct agr_var *iter;
   double weight;
   bool bad_warn = true;
 
-  weight = dict_get_case_weight (dataset_dict (current_dataset), input, &bad_warn);
+  weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
 
   for (iter = agr->agr_vars; iter; iter = iter->next)
     if (iter->src)
       {
-       const union value *v = case_data (input, iter->src->fv);
+       const union value *v = case_data (input, iter->src);
+        int src_width = var_get_width (iter->src);
 
-       if ((!iter->include_missing
-             && mv_is_value_missing (&iter->src->miss, v))
-           || (iter->include_missing && iter->src->type == NUMERIC
-               && v->f == SYSMIS))
+        if (var_is_value_missing (iter->src, v, iter->exclude))
          {
            switch (iter->function)
              {
@@ -771,10 +751,10 @@ accumulate_aggregate_info (struct agr_proc *agr,
                iter->int1++;
                break;
              }
-           iter->missing = 1;
+           iter->saw_missing = true;
            continue;
          }
-       
+
        /* This is horrible.  There are too many possibilities. */
        switch (iter->function)
          {
@@ -786,25 +766,44 @@ accumulate_aggregate_info (struct agr_proc *agr,
             iter->dbl[0] += v->f * weight;
             iter->dbl[1] += weight;
             break;
+         case MEDIAN:
+           {
+             double wv ;
+             struct ccase *cout;
+
+              cout = case_create (casewriter_get_proto (iter->writer));
+
+             case_data_rw (cout, iter->subject)->f
+                = case_data (input, iter->src)->f;
+
+             wv = dict_get_case_weight (agr->src_dict, input, NULL);
+
+             case_data_rw (cout, iter->weight)->f = wv;
+
+             iter->cc += wv;
+
+             casewriter_write (iter->writer, cout);
+           }
+           break;
          case SD:
             moments1_add (iter->moments, v->f, weight);
             break;
          case MAX:
-           iter->dbl[0] = max (iter->dbl[0], v->f);
+           iter->dbl[0] = MAX (iter->dbl[0], v->f);
            iter->int1 = 1;
            break;
          case MAX | FSTRING:
-           if (memcmp (iter->string, v->s, iter->src->width) < 0)
-             memcpy (iter->string, v->s, iter->src->width);
+           if (memcmp (iter->string, value_str (v, src_width), src_width) < 0)
+             memcpy (iter->string, value_str (v, src_width), src_width);
            iter->int1 = 1;
            break;
          case MIN:
-           iter->dbl[0] = min (iter->dbl[0], v->f);
+           iter->dbl[0] = MIN (iter->dbl[0], v->f);
            iter->int1 = 1;
            break;
          case MIN | FSTRING:
-           if (memcmp (iter->string, v->s, iter->src->width) > 0)
-             memcpy (iter->string, v->s, iter->src->width);
+           if (memcmp (iter->string, value_str (v, src_width), src_width) > 0)
+             memcpy (iter->string, value_str (v, src_width), src_width);
            iter->int1 = 1;
            break;
          case FGT:
@@ -815,7 +814,8 @@ accumulate_aggregate_info (struct agr_proc *agr,
             break;
          case FGT | FSTRING:
          case PGT | FSTRING:
-            if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
+            if (memcmp (iter->arg[0].c,
+                        value_str (v, src_width), src_width) < 0)
               iter->dbl[0] += weight;
             iter->dbl[1] += weight;
             break;
@@ -827,7 +827,8 @@ accumulate_aggregate_info (struct agr_proc *agr,
             break;
          case FLT | FSTRING:
          case PLT | FSTRING:
-            if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
+            if (memcmp (iter->arg[0].c,
+                        value_str (v, src_width), src_width) > 0)
               iter->dbl[0] += weight;
             iter->dbl[1] += weight;
             break;
@@ -839,8 +840,10 @@ accumulate_aggregate_info (struct agr_proc *agr,
             break;
          case FIN | FSTRING:
          case PIN | FSTRING:
-            if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
-                && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
+            if (memcmp (iter->arg[0].c,
+                        value_str (v, src_width), src_width) <= 0
+                && memcmp (iter->arg[1].c,
+                           value_str (v, src_width), src_width) >= 0)
               iter->dbl[0] += weight;
             iter->dbl[1] += weight;
             break;
@@ -852,8 +855,10 @@ accumulate_aggregate_info (struct agr_proc *agr,
             break;
          case FOUT | FSTRING:
          case POUT | FSTRING:
-            if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
-                || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
+            if (memcmp (iter->arg[0].c,
+                        value_str (v, src_width), src_width) > 0
+                || memcmp (iter->arg[1].c,
+                           value_str (v, src_width), src_width) < 0)
               iter->dbl[0] += weight;
             iter->dbl[1] += weight;
             break;
@@ -875,7 +880,7 @@ accumulate_aggregate_info (struct agr_proc *agr,
          case FIRST | FSTRING:
            if (iter->int1 == 0)
              {
-               memcpy (iter->string, v->s, iter->src->width);
+               memcpy (iter->string, value_str (v, src_width), src_width);
                iter->int1 = 1;
              }
            break;
@@ -884,7 +889,7 @@ accumulate_aggregate_info (struct agr_proc *agr,
            iter->int1 = 1;
            break;
          case LAST | FSTRING:
-           memcpy (iter->string, v->s, iter->src->width);
+           memcpy (iter->string, value_str (v, src_width), src_width);
            iter->int1 = 1;
            break;
           case NMISS:
@@ -912,44 +917,43 @@ accumulate_aggregate_info (struct agr_proc *agr,
     }
 }
 
-/* We've come to a record that differs from the previous in one or
-   more of the break variables.  Make an output record from the
-   accumulated statistics in the OUTPUT case. */
-static void 
-dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
+/* Writes an aggregated record to OUTPUT. */
+static void
+dump_aggregate_info (struct agr_proc *agr, struct casewriter *output)
 {
+  struct ccase *c = case_create (dict_get_proto (agr->dict));
+
   {
     int value_idx = 0;
     int i;
 
-    for (i = 0; i < agr->break_var_cnt; i++) 
+    for (i = 0; i < agr->break_var_cnt; i++)
       {
-        struct variable *v = agr->break_vars[i];
-        memcpy (case_data_rw (output, value_idx),
-                case_data (&agr->break_case, v->fv),
-                sizeof (union value) * v->nv);
-        value_idx += v->nv; 
+        const struct variable *v = agr->break_vars[i];
+        value_copy (case_data_rw_idx (c, value_idx),
+                    case_data (agr->break_case, v),
+                    var_get_width (v));
+        value_idx++;
       }
   }
-  
+
   {
     struct agr_var *i;
-  
+
     for (i = agr->agr_vars; i; i = i->next)
       {
-       union value *v = case_data_rw (output, i->dest->fv);
+       union value *v = case_data_rw (c, i->dest);
+        int width = var_get_width (i->dest);
 
-       if (agr->missing == COLUMNWISE && i->missing != 0
+       if (agr->missing == COLUMNWISE && i->saw_missing
            && (i->function & FUNC) != N && (i->function & FUNC) != NU
            && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
          {
-           if (i->dest->type == ALPHA)
-             memset (v->s, ' ', i->dest->width);
-           else
-             v->f = SYSMIS;
+            value_set_missing (v, width);
+           casewriter_destroy (i->writer);
            continue;
          }
-       
+
        switch (i->function)
          {
          case SUM:
@@ -958,6 +962,25 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
          case MEAN:
            v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
            break;
+         case MEDIAN:
+           {
+             struct casereader *sorted_reader;
+             struct order_stats *median = percentile_create (0.5, i->cc);
+
+             sorted_reader = casewriter_make_reader (i->writer);
+
+             order_stats_accumulate (&median, 1,
+                                     sorted_reader,
+                                     i->weight,
+                                     i->subject,
+                                     i->exclude);
+
+             v->f = percentile_calculate ((struct percentile *) median,
+                                          PC_HAVERAGE);
+
+             statistic_destroy ((struct statistic *) median);
+           }
+           break;
          case SD:
             {
               double variance;
@@ -968,7 +991,7 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
               if (variance != SYSMIS)
                 v->f = sqrt (variance);
               else
-                v->f = SYSMIS; 
+                v->f = SYSMIS;
             }
            break;
          case MAX:
@@ -978,9 +1001,9 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
          case MAX | FSTRING:
          case MIN | FSTRING:
            if (i->int1)
-             memcpy (v->s, i->string, i->dest->width);
+             memcpy (value_str_rw (v, width), i->string, width);
            else
-             memset (v->s, ' ', i->dest->width);
+              value_set_missing (v, width);
            break;
          case FGT:
          case FGT | FSTRING:
@@ -1017,9 +1040,9 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
          case FIRST | FSTRING:
          case LAST | FSTRING:
            if (i->int1)
-             memcpy (v->s, i->string, i->dest->width);
+             memcpy (value_str_rw (v, width), i->string, width);
            else
-             memset (v->s, ' ', i->dest->width);
+              value_set_missing (v, width);
            break;
          case N_NO_VARS:
            v->f = i->dbl[0];
@@ -1040,6 +1063,8 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
          }
       }
   }
+
+  casewriter_write (output, c);
 }
 
 /* Resets the state for all the aggregate functions. */
@@ -1048,12 +1073,12 @@ initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
 {
   struct agr_var *iter;
 
-  case_destroy (&agr->break_case);
-  case_clone (&agr->break_case, input);
+  case_unref (agr->break_case);
+  agr->break_case = case_ref (input);
 
   for (iter = agr->agr_vars; iter; iter = iter->next)
     {
-      iter->missing = 0;
+      iter->saw_missing = false;
       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
       iter->int1 = iter->int2 = 0;
       switch (iter->function)
@@ -1062,13 +1087,36 @@ initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
          iter->dbl[0] = DBL_MAX;
          break;
        case MIN | FSTRING:
-         memset (iter->string, 255, iter->src->width);
+         memset (iter->string, 255, var_get_width (iter->src));
          break;
        case MAX:
          iter->dbl[0] = -DBL_MAX;
          break;
        case MAX | FSTRING:
-         memset (iter->string, 0, iter->src->width);
+         memset (iter->string, 0, var_get_width (iter->src));
+         break;
+       case MEDIAN:
+         {
+            struct caseproto *proto;
+            struct subcase ordering;
+
+            proto = caseproto_create ();
+            proto = caseproto_add_width (proto, 0);
+            proto = caseproto_add_width (proto, 0);
+
+           if ( ! iter->subject)
+             iter->subject = var_create_internal (0);
+
+           if ( ! iter->weight)
+             iter->weight = var_create_internal (1);
+
+            subcase_init_var (&ordering, iter->subject, SC_ASCEND);
+           iter->writer = sort_create_writer (&ordering, proto);
+            subcase_destroy (&ordering);
+            caseproto_unref (proto);
+
+           iter->cc = 0;
+         }
          break;
         case SD:
           if (iter->moments == NULL)
@@ -1081,30 +1129,3 @@ initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
        }
     }
 }
-\f
-/* Aggregate each case as it comes through.  Cases which aren't needed
-   are dropped.
-   Returns true if successful, false if an I/O error occurred. */
-static bool
-agr_to_active_file (const struct ccase *c, void *agr_)
-{
-  struct agr_proc *agr = agr_;
-
-  if (aggregate_single_case (agr, c, &agr->agr_case)) 
-    return agr->sink->class->write (agr->sink, &agr->agr_case);
-
-  return true;
-}
-
-/* Aggregate the current case and output it if we passed a
-   breakpoint. */
-static bool
-presorted_agr_to_sysfile (const struct ccase *c, void *agr_) 
-{
-  struct agr_proc *agr = agr_;
-
-  if (aggregate_single_case (agr, c, &agr->agr_case)) 
-    return any_writer_write (agr->writer, &agr->agr_case);
-  
-  return true;
-}