Implemented long variable names a la spss V12.
[pspp-builds.git] / src / aggregate.c
index 193125087cce21539a372becbd14fd6580b3bf05..6da6242c670a0be3fad5dfaf321f1db73cfd2495 100644 (file)
    02111-1307, USA. */
 
 #include <config.h>
-#include <assert.h>
+#include "error.h"
 #include <stdlib.h>
 #include "alloc.h"
+#include "case.h"
+#include "casefile.h"
 #include "command.h"
+#include "dictionary.h"
 #include "error.h"
 #include "file-handle.h"
 #include "lexer.h"
 #include "misc.h"
+#include "moments.h"
 #include "pool.h"
 #include "settings.h"
-#include "sfm.h"
+#include "sfm-write.h"
 #include "sort.h"
-#include "stats.h"
 #include "str.h"
 #include "var.h"
 #include "vfm.h"
@@ -53,6 +56,7 @@ struct agr_var
     int int1, int2;
     char *string;
     int missing;
+    struct moments1 *moments;
   };
 
 /* Aggregation functions. */
@@ -113,17 +117,20 @@ enum missing_treatment
 struct agr_proc 
   {
     /* We have either an output file or a sink. */
-    struct file_handle *out_file;       /* Output file, or null if none. */
+    struct sfm_writer *writer;          /* Output file, or null if none. */
     struct case_sink *sink;             /* Sink, or null if none. */
 
+    /* Break variables. */
+    struct sort_criteria *sort;         /* Sort criteria. */
+    struct variable **break_vars;       /* Break variables. */
+    size_t break_var_cnt;               /* Number of break variables. */
+    union value *prev_break;            /* Last values of break variables. */
+
     enum missing_treatment missing;     /* How to treat missing values. */
-    struct sort_cases_pgm *sort;        /* Sort program. */
-    struct agr_var *vars;               /* First aggregate variable. */
+    struct agr_var *agr_vars;           /* First aggregate variable. */
     struct dictionary *dict;            /* Aggregate dictionary. */
     int case_cnt;                       /* Counts aggregated cases. */
-    union value *prev_break;            /* Last values of break variables. */
-    struct ccase *agr_case;             /* Aggregate case for output. */
-    flt64 *sfm_agr_case;                /* Aggregate case in SFM format. */
+    struct ccase agr_case;              /* Aggregate case for output. */
   };
 
 static void initialize_aggregate_info (struct agr_proc *);
@@ -135,15 +142,12 @@ static int aggregate_single_case (struct agr_proc *agr,
                                   const struct ccase *input,
                                   struct ccase *output);
 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
-static int create_sysfile (struct agr_proc *);
 
 /* Aggregating to the active file. */
 static int agr_to_active_file (struct ccase *, void *aux);
 
 /* Aggregating to a system file. */
-static void write_case_to_sfm (struct agr_proc *agr);
 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
-static int sort_agr_to_sysfile (const struct ccase *, void *aux);
 \f
 /* Parsing. */
 
@@ -152,101 +156,90 @@ int
 cmd_aggregate (void)
 {
   struct agr_proc agr;
+  struct file_handle *out_file = NULL;
 
-  /* Have we seen these subcommands? */
-  unsigned seen = 0;
+  bool copy_documents = false;
+  bool presorted = false;
+  bool saw_direction;
 
-  agr.out_file = NULL;
-  agr.sink = NULL;
+  memset(&agr, 0 , sizeof (agr));
   agr.missing = ITEMWISE;
-  agr.sort = NULL;
-  agr.vars = NULL;
-  agr.dict = NULL;
-  agr.case_cnt = 0;
-  agr.prev_break = NULL;
   
   agr.dict = dict_create ();
   dict_set_label (agr.dict, dict_get_label (default_dict));
   dict_set_documents (agr.dict, dict_get_documents (default_dict));
+
+  /* OUTFILE subcommand must be first. */
+  if (!lex_force_match_id ("OUTFILE"))
+    goto error;
+  lex_match ('=');
+  if (!lex_match ('*'))
+    {
+      out_file = fh_parse ();
+      if (out_file == NULL)
+        goto error;
+    }
   
   /* Read most of the subcommands. */
   for (;;)
     {
       lex_match ('/');
       
-      if (lex_match_id ("OUTFILE"))
-       {
-         if (seen & 1)
-           {
-             msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
-              goto lossage;
-           }
-         seen |= 1;
-             
-         lex_match ('=');
-         if (lex_match ('*'))
-           agr.out_file = NULL;
-         else 
-           {
-             agr.out_file = fh_parse_file_handle ();
-             if (agr.out_file == NULL)
-                goto lossage;
-           }
-       }
-      else if (lex_match_id ("MISSING"))
+      if (lex_match_id ("MISSING"))
        {
          lex_match ('=');
          if (!lex_match_id ("COLUMNWISE"))
            {
              lex_error (_("while expecting COLUMNWISE"));
-              goto lossage;
+              goto error;
            }
          agr.missing = COLUMNWISE;
        }
       else if (lex_match_id ("DOCUMENT"))
-       seen |= 2;
+        copy_documents = true;
       else if (lex_match_id ("PRESORTED"))
-       seen |= 4;
+        presorted = true;
       else if (lex_match_id ("BREAK"))
        {
-         if (seen & 8)
-           {
-             msg (SE, _("%s subcommand given multiple times."),"BREAK");
-              goto lossage;
-           }
-         seen |= 8;
+          int i;
 
          lex_match ('=');
-          agr.sort = parse_sort ();
+          agr.sort = sort_parse_criteria (default_dict,
+                                          &agr.break_vars, &agr.break_var_cnt,
+                                          &saw_direction);
           if (agr.sort == NULL)
-            goto lossage;
+            goto error;
          
-         {
-           int i;
-           
-           for (i = 0; i < agr.sort->var_cnt; i++)
-             {
-               struct variable *v;
-             
-               v = dict_clone_var (agr.dict, agr.sort->vars[i],
-                                    agr.sort->vars[i]->name);
-               assert (v != NULL);
-             }
-         }
+          for (i = 0; i < agr.break_var_cnt; i++)
+            {
+              struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
+                                                   agr.break_vars[i]->name, 
+                                                  agr.break_vars[i]->longname 
+                                                  );
+              assert (v != NULL);
+            }
+
+          /* BREAK must follow the options. */
+          break;
        }
-      else break;
+      else
+        {
+          lex_error (_("expecting BREAK"));
+          goto error;
+        }
     }
-
-  /* Check for proper syntax. */
-  if (!(seen & 8))
-    msg (SW, _("BREAK subcommand not specified."));
+  if (presorted && saw_direction)
+    msg (SW, _("When PRESORTED is specified, specifying sorting directions "
+               "with (A) or (D) has no effect.  Output data will be sorted "
+               "the same way as the input data."));
       
   /* Read in the aggregate functions. */
+  lex_match ('/');
   if (!parse_aggregate_functions (&agr))
-    goto lossage;
+    goto error;
 
   /* Delete documents. */
-  if (!(seen & 2))
+  if (!copy_documents)
     dict_set_documents (agr.dict, NULL);
 
   /* Cancel SPLIT FILE. */
@@ -254,18 +247,18 @@ cmd_aggregate (void)
   
   /* Initialize. */
   agr.case_cnt = 0;
-  agr.agr_case = xmalloc (dict_get_case_size (agr.dict));
+  case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
   initialize_aggregate_info (&agr);
 
   /* Output to active file or external file? */
-  if (agr.out_file == NULL) 
+  if (out_file == NULL) 
     {
       /* The active file will be replaced by the aggregated data,
          so TEMPORARY is moot. */
       cancel_temporary ();
 
-      if (agr.sort != NULL && (seen & 4) == 0)
-       sort_cases (agr.sort, 0);
+      if (agr.sort != NULL && !presorted)
+        sort_active_file_in_place (agr.sort);
 
       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
       if (agr.sink->class->open != NULL)
@@ -274,8 +267,8 @@ cmd_aggregate (void)
       procedure (agr_to_active_file, &agr);
       if (agr.case_cnt > 0) 
         {
-          dump_aggregate_info (&agr, agr.agr_case);
-          agr.sink->class->write (agr.sink, agr.agr_case);
+          dump_aggregate_info (&agr, &agr.agr_case);
+          agr.sink->class->write (agr.sink, &agr.agr_case);
         }
       dict_destroy (default_dict);
       default_dict = agr.dict;
@@ -285,14 +278,29 @@ cmd_aggregate (void)
     }
   else
     {
-      if (!create_sysfile (&agr))
-        goto lossage;
-
-      if (agr.sort != NULL && (seen & 4) == 0) 
+      agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0);
+      if (agr.writer == NULL)
+        goto error;
+      
+      if (agr.sort != NULL && !presorted) 
         {
           /* Sorting is needed. */
-          sort_cases (agr.sort, 1);
-          read_sort_output (agr.sort, sort_agr_to_sysfile, NULL);
+          struct casefile *dst;
+          struct casereader *reader;
+          struct ccase c;
+          
+          dst = sort_active_file_to_casefile (agr.sort);
+          if (dst == NULL)
+            goto error;
+          reader = casefile_get_destructive_reader (dst);
+          while (casereader_read_xfer (reader, &c)) 
+            {
+              if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
+                sfm_write_case (agr.writer, &agr.agr_case);
+              case_destroy (&c);
+            }
+          casereader_destroy (reader);
+          casefile_destroy (dst);
         }
       else 
         {
@@ -302,37 +310,19 @@ cmd_aggregate (void)
       
       if (agr.case_cnt > 0) 
         {
-          dump_aggregate_info (&agr, agr.agr_case);
-          write_case_to_sfm (&agr);
+          dump_aggregate_info (&agr, &agr.agr_case);
+          sfm_write_case (agr.writer, &agr.agr_case);
         }
-      fh_close_handle (agr.out_file);
     }
   
   agr_destroy (&agr);
   return CMD_SUCCESS;
 
-lossage:
+error:
   agr_destroy (&agr);
   return CMD_FAILURE;
 }
 
-/* Create a system file for use in aggregation to an external
-   file. */
-static int
-create_sysfile (struct agr_proc *agr)
-{
-  struct sfm_write_info w;
-  w.h = agr->out_file;
-  w.dict = agr->dict;
-  w.compress = get_scompression();
-  if (!sfm_write_dictionary (&w))
-    return 0;
-
-  agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
-    
-  return 1;
-}
-
 /* Parse all the aggregate functions. */
 static int
 parse_aggregate_functions (struct agr_proc *agr)
@@ -372,8 +362,9 @@ parse_aggregate_functions (struct agr_proc *agr)
        {
          int n_dest_prev = n_dest;
          
-         if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
-           goto lossage;
+         if (!parse_DATA_LIST_vars (&dest, &n_dest,
+                                     PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
+           goto error;
 
          /* Assign empty labels. */
          {
@@ -386,8 +377,8 @@ parse_aggregate_functions (struct agr_proc *agr)
          
          if (token == T_STRING)
            {
-             ds_truncate (&tokstr, 120);
-             dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
+             ds_truncate (&tokstr, 255);
+             dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
              lex_get ();
            }
        }
@@ -396,7 +387,7 @@ parse_aggregate_functions (struct agr_proc *agr)
       if (token != T_ID)
        {
          lex_error (_("expecting aggregation function"));
-         goto lossage;
+         goto error;
        }
 
       include_missing = 0;
@@ -407,12 +398,12 @@ parse_aggregate_functions (struct agr_proc *agr)
        }
       
       for (function = agr_func_tab; function->name; function++)
-       if (!strcmp (function->name, tokid))
+       if (!strcasecmp (function->name, tokid))
          break;
       if (NULL == function->name)
        {
          msg (SE, _("Unknown aggregation function %s."), tokid);
-         goto lossage;
+         goto error;
        }
       func_index = function - agr_func_tab;
       lex_get ();
@@ -427,9 +418,11 @@ parse_aggregate_functions (struct agr_proc *agr)
          else
            {
              lex_error (_("expecting `('"));
-             goto lossage;
+             goto error;
            }
-       } else {
+       }
+      else
+        {
          /* Parse list of source variables. */
          {
            int pv_opts = PV_NO_SCRATCH;
@@ -440,7 +433,7 @@ parse_aggregate_functions (struct agr_proc *agr)
              pv_opts |= PV_SAME_TYPE;
 
            if (!parse_variables (default_dict, &src, &n_src, pv_opts))
-             goto lossage;
+             goto error;
          }
 
          /* Parse function arguments, for those functions that
@@ -453,16 +446,16 @@ parse_aggregate_functions (struct agr_proc *agr)
                lex_match (',');
                if (token == T_STRING)
                  {
-                   arg[i].c = xstrdup (ds_value (&tokstr));
+                   arg[i].c = xstrdup (ds_c_str (&tokstr));
                    type = ALPHA;
                  }
-               else if (token == T_NUM)
+               else if (lex_is_number ())
                  {
                    arg[i].f = tokval;
                    type = NUMERIC;
                  } else {
                    msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
-                   goto lossage;
+                   goto error;
                  }
            
                lex_get ();
@@ -472,7 +465,7 @@ parse_aggregate_functions (struct agr_proc *agr)
                    msg (SE, _("Arguments to %s must be of same type as "
                               "source variables."),
                         function->name);
-                   goto lossage;
+                   goto error;
                  }
              }
 
@@ -480,22 +473,39 @@ parse_aggregate_functions (struct agr_proc *agr)
          if (!lex_match(')'))
            {
              lex_error (_("expecting `)'"));
-             goto lossage;
+             goto error;
            }
          
-         /* Now check that the number of source variables match the
-            number of target variables.  Do this here because if we
-            do it earlier then the user can get very misleading error
-            messages; i.e., `AGGREGATE x=SUM(y t).' will get this
-            error message when a proper message would be more like
-            `unknown variable t'. */
+         /* Now check that the number of source variables match
+            the number of target variables.  If we check earlier
+            than this, the user can get very misleading error
+            message, i.e. `AGGREGATE x=SUM(y t).' will get this
+            error message when a proper message would be more
+            like `unknown variable t'. */
          if (n_src != n_dest)
            {
              msg (SE, _("Number of source variables (%d) does not match "
                         "number of target variables (%d)."),
                   n_src, n_dest);
-             goto lossage;
+             goto error;
            }
+
+          if ((func_index == PIN || func_index == POUT
+              || func_index == FIN || func_index == FOUT) 
+              && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
+                  || (src[0]->type == ALPHA
+                      && st_compare_pad (arg[0].c, strlen (arg[0].c),
+                                         arg[1].c, strlen (arg[1].c)) > 0)))
+            {
+              union value t = arg[0];
+              arg[0] = arg[1];
+              arg[1] = t;
+                  
+              msg (SW, _("The value arguments passed to the %s function "
+                         "are out-of-order.  They will be treated as if "
+                         "they had been specified in the correct order."),
+                   function->name);
+            }
        }
        
       /* Finally add these to the linked list of aggregation
@@ -505,24 +515,24 @@ parse_aggregate_functions (struct agr_proc *agr)
          struct agr_var *v = xmalloc (sizeof *v);
 
          /* Add variable to chain. */
-         if (agr->vars != NULL)
+         if (agr->agr_vars != NULL)
            tail->next = v;
          else
-           agr->vars = v;
+           agr->agr_vars = v;
           tail = v;
          tail->next = NULL;
+          v->moments = NULL;
          
          /* Create the target variable in the aggregate
              dictionary. */
          {
+            static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
            struct variable *destvar;
            
            v->function = func_index;
 
            if (src)
              {
-               int output_width;
-
                v->src = src[i];
                
                if (src[i]->type == ALPHA)
@@ -530,31 +540,30 @@ parse_aggregate_functions (struct agr_proc *agr)
                    v->function |= FSTRING;
                    v->string = xmalloc (src[i]->width);
                  }
-               
-               if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
-                 output_width = 0;
-               else
-                 output_width = v->src->width;
 
                if (function->alpha_type == ALPHA)
-                 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
-               else
-                 {
-                   destvar = dict_create_var (agr->dict, dest[i], output_width);
-                   if (output_width == 0)
-                     destvar->print = destvar->write = function->format;
-                   if (output_width == 0 && dict_get_weight (default_dict) != NULL
-                       && (func_index == N || func_index == N_NO_VARS
-                           || func_index == NU || func_index == NU_NO_VARS))
-                     {
-                       struct fmt_spec f = {FMT_F, 8, 2};
-                     
-                       destvar->print = destvar->write = f;
-                     }
-                 }
+                 destvar = dict_clone_var (agr->dict, v->src, 0, dest[i] );
+               else if (v->src->type == NUMERIC
+                         || function->alpha_type == NUMERIC)
+                  {
+                    destvar = dict_create_var (agr->dict, dest[i], 0);
+                    if (destvar != NULL) 
+                      {
+                        if ((func_index == N || func_index == NMISS)
+                            && dict_get_weight (default_dict) != NULL)
+                          destvar->print = destvar->write = f8_2; 
+                        else
+                          destvar->print = destvar->write = function->format;
+                      }
+                  }
              } else {
                v->src = NULL;
                destvar = dict_create_var (agr->dict, dest[i], 0);
+                if (func_index == N_NO_VARS
+                    && dict_get_weight (default_dict) != NULL)
+                  destvar->print = destvar->write = f8_2; 
+                else
+                  destvar->print = destvar->write = function->format;
              }
          
            if (!destvar)
@@ -564,8 +573,7 @@ parse_aggregate_functions (struct agr_proc *agr)
                           "the aggregate variables and the break "
                           "variables."),
                     dest[i]);
-               free (dest[i]);
-               goto lossage;
+               goto error;
              }
 
            free (dest[i]);
@@ -575,8 +583,6 @@ parse_aggregate_functions (struct agr_proc *agr)
                destvar->label = dest_label[i];
                dest_label[i] = NULL;
              }
-           else if (function->alpha_type == ALPHA)
-             destvar->print = destvar->write = function->format;
 
            v->dest = destvar;
          }
@@ -617,7 +623,7 @@ parse_aggregate_functions (struct agr_proc *agr)
        }
       continue;
       
-    lossage:
+    error:
       for (i = 0; i < n_dest; i++)
        {
          free (dest[i]);
@@ -645,11 +651,12 @@ agr_destroy (struct agr_proc *agr)
 {
   struct agr_var *iter, *next;
 
-  if (agr->dict != NULL)
-    dict_destroy (agr->dict);
+  sfm_close_writer (agr->writer);
   if (agr->sort != NULL)
-    destroy_sort_cases_pgm (agr->sort);
-  for (iter = agr->vars; iter; iter = next)
+    sort_destroy_criteria (agr->sort);
+  free (agr->break_vars);
+  free (agr->prev_break);
+  for (iter = agr->agr_vars; iter; iter = next)
     {
       next = iter->next;
 
@@ -663,10 +670,14 @@ agr_destroy (struct agr_proc *agr)
            free (iter->arg[i].c);
          free (iter->string);
        }
+      else if (iter->function == SD)
+        moments1_destroy (iter->moments);
       free (iter);
     }
-  free (agr->prev_break);
-  free (agr->agr_case);
+  if (agr->dict != NULL)
+    dict_destroy (agr->dict);
+
+  case_destroy (&agr->agr_case);
 }
 \f
 /* Execution. */
@@ -691,8 +702,8 @@ aggregate_single_case (struct agr_proc *agr,
       {
        int i;
 
-       for (i = 0; i < agr->sort->var_cnt; i++)
-         n_elem += agr->sort->vars[i]->nv;
+       for (i = 0; i < agr->break_var_cnt; i++)
+         n_elem += agr->break_vars[i]->nv;
       }
       
       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
@@ -702,15 +713,15 @@ aggregate_single_case (struct agr_proc *agr,
        union value *iter = agr->prev_break;
        int i;
 
-       for (i = 0; i < agr->sort->var_cnt; i++)
+       for (i = 0; i < agr->break_var_cnt; i++)
          {
-           struct variable *v = agr->sort->vars[i];
+           struct variable *v = agr->break_vars[i];
            
            if (v->type == NUMERIC)
-             (iter++)->f = input->data[v->fv].f;
+             (iter++)->f = case_num (input, v->fv);
            else
              {
-               memcpy (iter->s, input->data[v->fv].s, v->width);
+               memcpy (iter->s, case_str (input, v->fv), v->width);
                iter += v->nv;
              }
          }
@@ -727,19 +738,19 @@ aggregate_single_case (struct agr_proc *agr,
     union value *iter = agr->prev_break;
     int i;
     
-    for (i = 0; i < agr->sort->var_cnt; i++)
+    for (i = 0; i < agr->break_var_cnt; i++)
       {
-       struct variable *v = agr->sort->vars[i];
+       struct variable *v = agr->break_vars[i];
       
        switch (v->type)
          {
          case NUMERIC:
-           if (input->data[v->fv].f != iter->f)
+           if (case_num (input, v->fv) != iter->f)
              goto not_equal;
            iter++;
            break;
          case ALPHA:
-           if (memcmp (input->data[v->fv].s, iter->s, v->width))
+           if (memcmp (case_str (input, v->fv), iter->s, v->width))
              goto not_equal;
            iter += v->nv;
            break;
@@ -766,15 +777,15 @@ not_equal:
     union value *iter = agr->prev_break;
     int i;
 
-    for (i = 0; i < agr->sort->var_cnt; i++)
+    for (i = 0; i < agr->break_var_cnt; i++)
       {
-       struct variable *v = agr->sort->vars[i];
+       struct variable *v = agr->break_vars[i];
            
        if (v->type == NUMERIC)
-         (iter++)->f = input->data[v->fv].f;
+         (iter++)->f = case_num (input, v->fv);
        else
          {
-           memcpy (iter->s, input->data[v->fv].s, v->width);
+           memcpy (iter->s, case_str (input, v->fv), v->width);
            iter += v->nv;
          }
       }
@@ -790,13 +801,14 @@ accumulate_aggregate_info (struct agr_proc *agr,
 {
   struct agr_var *iter;
   double weight;
+  int bad_warn = 1;
 
-  weight = dict_get_case_weight (default_dict, input);
+  weight = dict_get_case_weight (default_dict, input, &bad_warn);
 
-  for (iter = agr->vars; iter; iter = iter->next)
+  for (iter = agr->agr_vars; iter; iter = iter->next)
     if (iter->src)
       {
-       const union value *v = &input->data[iter->src->fv];
+       const union value *v = case_data (input, iter->src->fv);
 
        if ((!iter->include_missing && is_missing (v, iter->src))
            || (iter->include_missing && iter->src->type == NUMERIC
@@ -805,9 +817,11 @@ accumulate_aggregate_info (struct agr_proc *agr,
            switch (iter->function)
              {
              case NMISS:
+             case NMISS | FSTRING:
                iter->dbl[0] += weight;
                 break;
              case NUMISS:
+             case NUMISS | FSTRING:
                iter->int1++;
                break;
              }
@@ -819,20 +833,16 @@ accumulate_aggregate_info (struct agr_proc *agr,
        switch (iter->function)
          {
          case SUM:
-           iter->dbl[0] += v->f;
+           iter->dbl[0] += v->f * weight;
+            iter->int1 = 1;
            break;
          case MEAN:
             iter->dbl[0] += v->f * weight;
             iter->dbl[1] += weight;
             break;
-         case SD: 
-            {
-              double product = v->f * weight;
-              iter->dbl[0] += product;
-              iter->dbl[1] += product * v->f;
-              iter->dbl[2] += weight;
-              break; 
-            }
+         case SD:
+            moments1_add (iter->moments, v->f, weight);
+            break;
          case MAX:
            iter->dbl[0] = max (iter->dbl[0], v->f);
            iter->int1 = 1;
@@ -897,14 +907,16 @@ accumulate_aggregate_info (struct agr_proc *agr,
          case FOUT | FSTRING:
          case POUT | FSTRING:
             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
-                && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
+                || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
               iter->dbl[0] += weight;
             iter->dbl[1] += weight;
             break;
          case N:
+         case N | FSTRING:
            iter->dbl[0] += weight;
            break;
          case NU:
+         case NU | FSTRING:
            iter->int1++;
            break;
          case FIRST:
@@ -929,6 +941,13 @@ accumulate_aggregate_info (struct agr_proc *agr,
            memcpy (iter->string, v->s, iter->src->width);
            iter->int1 = 1;
            break;
+          case NMISS:
+          case NMISS | FSTRING:
+          case NUMISS:
+          case NUMISS | FSTRING:
+            /* Our value is not missing or it would have been
+               caught earlier.  Nothing to do. */
+            break;
          default:
            assert (0);
          }
@@ -954,29 +973,31 @@ static void
 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
 {
   {
-    int n_elem = 0;
-    
-    {
-      int i;
+    int value_idx = 0;
+    int i;
 
-      for (i = 0; i < agr->sort->var_cnt; i++)
-       n_elem += agr->sort->vars[i]->nv;
-    }
-    memcpy (output->data, agr->prev_break, sizeof (union value) * n_elem);
+    for (i = 0; i < agr->break_var_cnt; i++) 
+      {
+        int nv = agr->break_vars[i]->nv;
+        memcpy (case_data_rw (output, value_idx),
+                &agr->prev_break[value_idx],
+                sizeof (union value) * nv);
+        value_idx += nv; 
+      }
   }
   
   {
     struct agr_var *i;
   
-    for (i = agr->vars; i; i = i->next)
+    for (i = agr->agr_vars; i; i = i->next)
       {
-       union value *v = &output->data[i->dest->fv];
+       union value *v = case_data_rw (output, i->dest->fv);
 
        if (agr->missing == COLUMNWISE && i->missing != 0
            && (i->function & FUNC) != N && (i->function & FUNC) != NU
            && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
          {
-           if (i->function & FSTRING)
+           if (i->dest->type == ALPHA)
              memset (v->s, ' ', i->dest->width);
            else
              v->f = SYSMIS;
@@ -986,15 +1007,23 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
        switch (i->function)
          {
          case SUM:
-           v->f = i->dbl[0];
+           v->f = i->int1 ? i->dbl[0] : SYSMIS;
            break;
          case MEAN:
            v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
            break;
          case SD:
-           v->f = ((i->dbl[2] > 1.0)
-                   ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
-                   : SYSMIS);
+            {
+              double variance;
+
+              /* FIXME: we should use two passes. */
+              moments1_calculate (i->moments, NULL, NULL, &variance,
+                                 NULL, NULL);
+              if (variance != SYSMIS)
+                v->f = sqrt (variance);
+              else
+                v->f = SYSMIS; 
+            }
            break;
          case MAX:
          case MIN:
@@ -1007,16 +1036,14 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
            else
              memset (v->s, ' ', i->dest->width);
            break;
-         case FGT | FSTRING:
-         case FLT | FSTRING:
-         case FIN | FSTRING:
-         case FOUT | FSTRING:
-           v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
-           break;
          case FGT:
+         case FGT | FSTRING:
          case FLT:
+         case FLT | FSTRING:
          case FIN:
+         case FIN | FSTRING:
          case FOUT:
+         case FOUT | FSTRING:
            v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
            break;
          case PGT:
@@ -1030,9 +1057,11 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
            v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
            break;
          case N:
+         case N | FSTRING:
            v->f = i->dbl[0];
             break;
          case NU:
+         case NU | FSTRING:
            v->f = i->int1;
            break;
          case FIRST:
@@ -1053,9 +1082,11 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
            v->f = i->int1;
            break;
          case NMISS:
+         case NMISS | FSTRING:
            v->f = i->dbl[0];
            break;
          case NUMISS:
+         case NUMISS | FSTRING:
            v->f = i->int1;
            break;
          default:
@@ -1071,9 +1102,11 @@ initialize_aggregate_info (struct agr_proc *agr)
 {
   struct agr_var *iter;
 
-  for (iter = agr->vars; iter; iter = iter->next)
+  for (iter = agr->agr_vars; iter; iter = iter->next)
     {
       iter->missing = 0;
+      iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
+      iter->int1 = iter->int2 = 0;
       switch (iter->function)
        {
        case MIN:
@@ -1088,10 +1121,14 @@ initialize_aggregate_info (struct agr_proc *agr)
        case MAX | FSTRING:
          memset (iter->string, 0, iter->src->width);
          break;
-       default:
-         iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
-         iter->int1 = iter->int2 = 0;
-         break;
+        case SD:
+          if (iter->moments == NULL)
+            iter->moments = moments1_create (MOMENT_VARIANCE);
+          else
+            moments1_clear (iter->moments);
+          break;
+        default:
+          break;
        }
     }
 }
@@ -1103,62 +1140,21 @@ agr_to_active_file (struct ccase *c, void *agr_)
 {
   struct agr_proc *agr = agr_;
 
-  if (aggregate_single_case (agr, c, agr->agr_case)) 
-    agr->sink->class->write (agr->sink, agr->agr_case);
+  if (aggregate_single_case (agr, c, &agr->agr_case)) 
+    agr->sink->class->write (agr->sink, &agr->agr_case);
 
   return 1;
 }
 
-/* Writes AGR->agr_case to AGR->out_file. */
-static void
-write_case_to_sfm (struct agr_proc *agr)
-{
-  flt64 *p;
-  int i;
-
-  p = agr->sfm_agr_case;
-  for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
-    {
-      struct variable *v = dict_get_var (agr->dict, i);
-      
-      if (v->type == NUMERIC)
-       {
-         double src = agr->agr_case->data[v->fv].f;
-         if (src == SYSMIS)
-           *p++ = -FLT64_MAX;
-         else
-           *p++ = src;
-       }
-      else
-       {
-         memcpy (p, agr->agr_case->data[v->fv].s, v->width);
-         memset (&((char *) p)[v->width], ' ',
-                 REM_RND_UP (v->width, sizeof (flt64)));
-         p += DIV_RND_UP (v->width, sizeof (flt64));
-       }
-    }
-
-  sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
-}
-
 /* Aggregate the current case and output it if we passed a
    breakpoint. */
 static int
 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
-{
-  sort_agr_to_sysfile (c, agr_);
-  return 1;
-}
-
-/* Aggregate the current case and output it if we passed a
-   breakpoint. */
-static int
-sort_agr_to_sysfile (const struct ccase *c, void *agr_) 
 {
   struct agr_proc *agr = agr_;
 
-  if (aggregate_single_case (agr, c, agr->agr_case)) 
-    write_case_to_sfm (agr);
+  if (aggregate_single_case (agr, c, &agr->agr_case)) 
+    sfm_write_case (agr->writer, &agr->agr_case);
 
   return 1;
 }