Redo VFM interface. Get rid of compaction_necessary, compaction_nval,
[pspp-builds.git] / src / aggregate.c
index d49ebf9e520a51c86c13eee2fb73d95fb7b79c2f..cb7cff064d2371e87ef9ea08ec75152d1eaf98ec 100644 (file)
 #include <assert.h>
 #include <stdlib.h>
 #include "alloc.h"
-#include "approx.h"
 #include "command.h"
 #include "error.h"
 #include "file-handle.h"
 #include "lexer.h"
 #include "misc.h"
+#include "pool.h"
 #include "settings.h"
 #include "sfm.h"
 #include "sort.h"
 #include "vfm.h"
 #include "vfmP.h"
 
-#undef DEBUGGING
-/*#define DEBUGGING 1*/
-#include "debug-print.h"
-
 /* Specifies how to make an aggregate variable. */
 struct agr_var
   {
@@ -67,8 +63,6 @@ enum
     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
     FUNC = 0x1f, /* Function mask. */
     FSTRING = 1<<5, /* String function bit. */
-    FWEIGHT = 1<<6, /* Weighted function bit. */
-    FOPTIONS = FSTRING | FWEIGHT /* Function options mask. */
   };
 
 /* Attributes of an aggregation function. */
@@ -81,7 +75,7 @@ struct agr_func
   };
 
 /* Attributes of aggregation functions. */
-static struct agr_func agr_func_tab[] = 
+static const struct agr_func agr_func_tab[] = 
   {
     {"<NONE>",  0, -1,      {0, 0, 0}},
     {"SUM",     0, -1,      {FMT_F, 8, 2}},
@@ -108,53 +102,48 @@ static struct agr_func agr_func_tab[] =
     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
   };
 
-/* Output file, or NULL for the active file. */
-static struct file_handle *outfile;
-
 /* Missing value types. */
-enum
+enum missing_treatment
   {
     ITEMWISE,          /* Missing values item by item. */
     COLUMNWISE         /* Missing values column by column. */
   };
 
-/* ITEMWISE or COLUMNWISE. */
-static int missing;
-
-/* Aggregate variables. */
-static struct agr_var *agr_first, *agr_next;
-
-/* Aggregate dictionary. */
-static struct dictionary *agr_dict;
-
-/* Number of cases passed through aggregation. */
-static int case_count;
-
-/* Last values of the break variables. */
-static union value *prev_case;
-
-/* Buffers for use by the 10x transformation. */
-static flt64 *buf64_1xx;
-static struct ccase *buf_1xx;
+/* An entire AGGREGATE procedure. */
+struct agr_proc 
+  {
+    /* We have either an output file or a sink. */
+    struct file_handle *out_file;       /* Output file, or null if none. */
+    struct case_sink *sink;             /* Sink, or null if none. */
+
+    enum missing_treatment missing;     /* How to treat missing values. */
+    struct sort_cases_pgm *sort;        /* Sort program. */
+    struct agr_var *vars;               /* First aggregate variable. */
+    struct dictionary *dict;            /* Aggregate dictionary. */
+    int case_cnt;                       /* Counts aggregated cases. */
+    union value *prev_break;            /* Last values of break variables. */
+    struct ccase *agr_case;             /* Aggregate case for output. */
+    flt64 *sfm_agr_case;                /* Aggregate case in SFM format. */
+  };
 
-static void initialize_aggregate_info (void);
+static void initialize_aggregate_info (struct agr_proc *);
 
 /* Prototypes. */
-static int parse_aggregate_functions (void);
-static void free_aggregate_functions (void);
-static int aggregate_single_case (struct ccase *input, struct ccase *output);
-static int create_sysfile (void);
-
-static int agr_00x_trns_proc (struct trns_header *, struct ccase *);
-static void agr_00x_end_func (void);
-static int agr_10x_trns_proc (struct trns_header *, struct ccase *);
-static void agr_10x_trns_free (struct trns_header *);
-static void agr_10x_end_func (void);
-static int agr_11x_func (void);
-
-#if DEBUGGING
-static void debug_print (int flags);
-#endif
+static int parse_aggregate_functions (struct agr_proc *);
+static void agr_destroy (struct agr_proc *);
+static int aggregate_single_case (struct agr_proc *agr,
+                                  const struct ccase *input,
+                                  struct ccase *output);
+static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
+static int create_sysfile (struct agr_proc *);
+
+/* Aggregating to the active file. */
+static int agr_to_active_file (struct ccase *, void *aux);
+
+/* Aggregating to a system file. */
+static void write_case_to_sfm (struct agr_proc *agr);
+static int presorted_agr_to_sysfile (struct ccase *, void *aux);
+static int sort_agr_to_sysfile (const struct ccase *, void *aux);
 \f
 /* Parsing. */
 
@@ -162,49 +151,48 @@ static void debug_print (int flags);
 int
 cmd_aggregate (void)
 {
-  /* From sort.c. */
-  int parse_sort_variables (void);
-  
+  struct agr_proc agr;
+
   /* Have we seen these subcommands? */
   unsigned seen = 0;
 
-  outfile = NULL;
-  missing = ITEMWISE;
-  v_sort = NULL;
-  prev_case = NULL;
+  agr.out_file = NULL;
+  agr.sink = NULL;
+  agr.missing = ITEMWISE;
+  agr.sort = NULL;
+  agr.vars = NULL;
+  agr.dict = NULL;
+  agr.case_cnt = 0;
+  agr.prev_break = NULL;
   
-  agr_dict = new_dictionary (1);
+  agr.dict = dict_create ();
+  dict_set_label (agr.dict, dict_get_label (default_dict));
+  dict_set_documents (agr.dict, dict_get_documents (default_dict));
   
   lex_match_id ("AGGREGATE");
 
   /* Read most of the subcommands. */
   for (;;)
     {
-      lex_match('/');
+      lex_match ('/');
       
       if (lex_match_id ("OUTFILE"))
        {
          if (seen & 1)
            {
-             free (v_sort);
-             free_dictionary (agr_dict);
-             msg (SE, _("OUTFILE specified multiple times."));
-             return CMD_FAILURE;
+             msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
+              goto lossage;
            }
          seen |= 1;
              
          lex_match ('=');
          if (lex_match ('*'))
-           outfile = NULL;
+           agr.out_file = NULL;
          else 
            {
-             outfile = fh_parse_file_handle ();
-             if (outfile == NULL)
-               {
-                 free (v_sort);
-                 free_dictionary (agr_dict);
-                 return CMD_FAILURE;
-               }
+             agr.out_file = fh_parse_file_handle ();
+             if (agr.out_file == NULL)
+                goto lossage;
            }
        }
       else if (lex_match_id ("MISSING"))
@@ -212,12 +200,10 @@ cmd_aggregate (void)
          lex_match ('=');
          if (!lex_match_id ("COLUMNWISE"))
            {
-             free (v_sort);
-             free_dictionary (agr_dict);
              lex_error (_("while expecting COLUMNWISE"));
-             return CMD_FAILURE;
+              goto lossage;
            }
-         missing = COLUMNWISE;
+         agr.missing = COLUMNWISE;
        }
       else if (lex_match_id ("DOCUMENT"))
        seen |= 2;
@@ -227,28 +213,25 @@ cmd_aggregate (void)
        {
          if (seen & 8)
            {
-             free (v_sort);
-             free_dictionary (agr_dict);
-             msg (SE, _("BREAK specified multiple times."));
-             return CMD_FAILURE;
+             msg (SE, _("%s subcommand given multiple times."),"BREAK");
+              goto lossage;
            }
          seen |= 8;
 
          lex_match ('=');
-         if (!parse_sort_variables ())
-           {
-             free_dictionary (agr_dict);
-             return CMD_FAILURE;
-           }
+          agr.sort = parse_sort ();
+          if (agr.sort == NULL)
+            goto lossage;
          
          {
            int i;
            
-           for (i = 0; i < nv_sort; i++)
+           for (i = 0; i < agr.sort->var_cnt; i++)
              {
                struct variable *v;
              
-               v = dup_variable (agr_dict, v_sort[i], v_sort[i]->name);
+               v = dict_clone_var (agr.dict, agr.sort->vars[i],
+                                    agr.sort->vars[i]->name);
                assert (v != NULL);
              }
          }
@@ -261,193 +244,105 @@ cmd_aggregate (void)
     msg (SW, _("BREAK subcommand not specified."));
       
   /* Read in the aggregate functions. */
-  if (!parse_aggregate_functions ())
-    {
-      free_aggregate_functions ();
-      free (v_sort);
-      return CMD_FAILURE;
-    }
+  if (!parse_aggregate_functions (&agr))
+    goto lossage;
 
   /* Delete documents. */
   if (!(seen & 2))
-    {
-      free (agr_dict->documents);
-      agr_dict->documents = NULL;
-      agr_dict->n_documents = 0;
-    }
+    dict_set_documents (agr.dict, NULL);
 
   /* Cancel SPLIT FILE. */
-  default_dict.n_splits = 0;
-  free (default_dict.splits);
-  default_dict.splits = NULL;
+  dict_set_split_vars (agr.dict, NULL, 0);
   
-#if DEBUGGING
-  debug_print (seen);
-#endif
-
   /* Initialize. */
-  case_count = 0;
-  initialize_aggregate_info ();
-
-  /* How to implement all this... There are three important variables:
-     whether output is going to the active file (0) or a separate file
-     (1); whether the input data is presorted (0) or needs sorting
-     (1); whether there is a temporary transformation (1) or not (0).
-     The eight cases are as follows:
-
-     000 (0): Pass it through an aggregate transformation that
-     modifies the data.
-
-     001 (1): Cancel the temporary transformation and handle as 000.
-
-     010 (2): Set up a SORT CASES and aggregate the output, writing
-     the results to the active file.
-     
-     011 (3): Cancel the temporary transformation and handle as 010.
-
-     100 (4): Pass it through an aggregate transformation that doesn't
-     modify the data but merely writes it to the output file.
-
-     101 (5): Handled as 100.
-
-     110 (6): Set up a SORT CASES and capture the output, aggregate
-     it, write it to the output file without modifying the active
-     file.
-
-     111 (7): Handled as 110. */
-  
-  {
-    unsigned type = 0;
-
-    if (outfile != NULL)
-      type |= 4;
-    if (nv_sort != 0 && (seen & 4) == 0)
-      type |= 2;
-    if (temporary)
-      type |= 1;
-
-    switch (type)
-      {
-      case 3:
-       cancel_temporary ();
-       /* fall through */
-      case 2:
-       sort_cases (0);
-       goto case0;
-         
-      case 1:
-       cancel_temporary ();
-       /* fall through */
-      case 0:
-      case0:
-       {
-         struct trns_header *t = xmalloc (sizeof *t);
-         t->proc = agr_00x_trns_proc;
-         t->free = NULL;
-         add_transformation (t);
-         
-         temporary = 2;
-         temp_dict = agr_dict;
-         temp_trns = n_trns;
-         
-         agr_dict = NULL;
-
-         procedure (NULL, NULL, agr_00x_end_func);
-         break;
-       }
-
-      case 4:
-      case 5:
-       {
-         if (!create_sysfile ())
-           goto lossage;
-         
-         {
-           struct trns_header *t = xmalloc (sizeof *t);
-           t->proc = agr_10x_trns_proc;
-           t->free = agr_10x_trns_free;
-           add_transformation (t);
-
-           procedure (NULL, NULL, agr_10x_end_func);
-         }
-         
-         break;
-       }
-         
-      case 6:
-      case 7:
-       sort_cases (1);
-       
-       if (!create_sysfile ())
-         goto lossage;
-       read_sort_output (agr_11x_func);
-       
-       {
-         struct ccase *save_temp_case = temp_case;
-         temp_case = NULL;
-         agr_11x_func ();
-         temp_case = save_temp_case;
-       }
-       
-       break;
+  agr.case_cnt = 0;
+  agr.agr_case = xmalloc (dict_get_case_size (agr.dict));
+  initialize_aggregate_info (&agr);
 
-      default:
-       assert (0);
-      }
-  }
-  
-  free (buf64_1xx);
-  free (buf_1xx);
-  
-  /* Clean up. */
-  free (v_sort);
-  free_aggregate_functions ();
-  free (prev_case);
+  /* Output to active file or external file? */
+  if (agr.out_file == NULL) 
+    {
+      /* The active file will be replaced by the aggregated data,
+         so TEMPORARY is moot. */
+      cancel_temporary ();
+
+      if (agr.sort != NULL && (seen & 4) == 0)
+       sort_cases (agr.sort, 0);
+
+      agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
+      if (agr.sink->class->open != NULL)
+        agr.sink->class->open (agr.sink);
+      vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
+      procedure (agr_to_active_file, &agr);
+      if (agr.case_cnt > 0) 
+        {
+          dump_aggregate_info (&agr, agr.agr_case);
+          agr.sink->class->write (agr.sink, agr.agr_case);
+        }
+      dict_destroy (default_dict);
+      default_dict = agr.dict;
+      agr.dict = NULL;
+      vfm_source = agr.sink->class->make_source (agr.sink);
+      free_case_sink (agr.sink);
+    }
+  else
+    {
+      if (!create_sysfile (&agr))
+        goto lossage;
+
+      if (agr.sort != NULL && (seen & 4) == 0) 
+        {
+          /* Sorting is needed. */
+          sort_cases (agr.sort, 1);
+          read_sort_output (agr.sort, sort_agr_to_sysfile, NULL);
+        }
+      else 
+        {
+          /* Active file is already sorted. */
+          procedure (presorted_agr_to_sysfile, &agr);
+        }
+      
+      if (agr.case_cnt > 0) 
+        {
+          dump_aggregate_info (&agr, agr.agr_case);
+          write_case_to_sfm (&agr);
+        }
+      fh_close_handle (agr.out_file);
+    }
   
+  agr_destroy (&agr);
   return CMD_SUCCESS;
 
 lossage:
-  /* Clean up. */
-  free (v_sort);
-  free_aggregate_functions ();
-  free (prev_case);
-
+  agr_destroy (&agr);
   return CMD_FAILURE;
 }
 
-/* Create a system file for use in aggregation to an external file,
-   and allocate temporary buffers for writing out cases. */
+/* Create a system file for use in aggregation to an external
+   file. */
 static int
-create_sysfile (void)
+create_sysfile (struct agr_proc *agr)
 {
   struct sfm_write_info w;
-  w.h = outfile;
-  w.dict = agr_dict;
+  w.h = agr->out_file;
+  w.dict = agr->dict;
   w.compress = set_scompression;
   if (!sfm_write_dictionary (&w))
-    {
-      free_aggregate_functions ();
-      free (v_sort);
-      free_dictionary (agr_dict);
-      return 0;
-    }
-    
-  buf64_1xx = xmalloc (sizeof *buf64_1xx * w.case_size);
-  buf_1xx = xmalloc (sizeof (struct ccase) + sizeof (union value) * (agr_dict->nval - 1));
+    return 0;
 
+  agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
+    
   return 1;
 }
 
 /* Parse all the aggregate functions. */
 static int
-parse_aggregate_functions (void)
+parse_aggregate_functions (struct agr_proc *agr)
 {
-  agr_first = agr_next = NULL;
+  struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
 
-  /* Anticipate weighting for optimization later. */
-  update_weighting (&default_dict);
-  
   /* Parse everything. */
+  tail = NULL;
   for (;;)
     {
       char **dest;
@@ -455,7 +350,7 @@ parse_aggregate_functions (void)
       int n_dest;
 
       int include_missing;
-      struct agr_func *function;
+      const struct agr_func *function;
       int func_index;
 
       union value arg[2];
@@ -469,6 +364,7 @@ parse_aggregate_functions (void)
       dest_label = NULL;
       n_dest = 0;
       src = NULL;
+      function = NULL;
       n_src = 0;
       arg[0].c = NULL;
       arg[1].c = NULL;
@@ -545,7 +441,7 @@ parse_aggregate_functions (void)
            else if (function->n_args)
              pv_opts |= PV_SAME_TYPE;
 
-           if (!parse_variables (&default_dict, &src, &n_src, pv_opts))
+           if (!parse_variables (default_dict, &src, &n_src, pv_opts))
              goto lossage;
          }
 
@@ -611,48 +507,45 @@ parse_aggregate_functions (void)
          struct agr_var *v = xmalloc (sizeof *v);
 
          /* Add variable to chain. */
-         if (agr_first)
-           agr_next = agr_next->next = v;
+         if (agr->vars != NULL)
+           tail->next = v;
          else
-           agr_first = agr_next = v;
-         agr_next->next = NULL;
+           agr->vars = v;
+          tail = v;
+         tail->next = NULL;
          
          /* Create the target variable in the aggregate
              dictionary. */
          {
            struct variable *destvar;
            
-           agr_next->function = func_index;
+           v->function = func_index;
 
            if (src)
              {
-               int output_type;
+               int output_width;
 
-               agr_next->src = src[i];
+               v->src = src[i];
                
                if (src[i]->type == ALPHA)
                  {
-                   agr_next->function |= FSTRING;
-                   agr_next->string = xmalloc (src[i]->width);
+                   v->function |= FSTRING;
+                   v->string = xmalloc (src[i]->width);
                  }
                
-               if (default_dict.weight_index != -1)
-                 agr_next->function |= FWEIGHT;
-
-               if (agr_next->src->type == NUMERIC)
-                 output_type = NUMERIC;
+               if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
+                 output_width = 0;
                else
-                 output_type = function->alpha_type;
+                 output_width = v->src->width;
 
                if (function->alpha_type == ALPHA)
-                 destvar = dup_variable (agr_dict, agr_next->src, dest[i]);
+                 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
                else
                  {
-                   destvar = create_variable (agr_dict, dest[i], output_type,
-                                              agr_next->src->width);
-                   if (output_type == NUMERIC)
+                   destvar = dict_create_var (agr->dict, dest[i], output_width);
+                   if (output_width == 0)
                      destvar->print = destvar->write = function->format;
-                   if (output_type == NUMERIC && default_dict.weight_index != -1
+                   if (output_width == 0 && dict_get_weight (default_dict) != NULL
                        && (func_index == N || func_index == N_NO_VARS
                            || func_index == NU || func_index == NU_NO_VARS))
                      {
@@ -662,8 +555,8 @@ parse_aggregate_functions (void)
                      }
                  }
              } else {
-               agr_next->src = NULL;
-               destvar = create_variable (agr_dict, dest[i], NUMERIC, 0);
+               v->src = NULL;
+               destvar = dict_create_var (agr->dict, dest[i], 0);
              }
          
            if (!destvar)
@@ -678,6 +571,7 @@ parse_aggregate_functions (void)
              }
 
            free (dest[i]);
+            destvar->init = 0;
            if (dest_label[i])
              {
                destvar->label = dest_label[i];
@@ -686,21 +580,21 @@ parse_aggregate_functions (void)
            else if (function->alpha_type == ALPHA)
              destvar->print = destvar->write = function->format;
 
-           agr_next->dest = destvar;
+           v->dest = destvar;
          }
          
-         agr_next->include_missing = include_missing;
+         v->include_missing = include_missing;
 
-         if (agr_next->src != NULL)
+         if (v->src != NULL)
            {
              int j;
 
-             if (agr_next->src->type == NUMERIC)
+             if (v->src->type == NUMERIC)
                for (j = 0; j < function->n_args; j++)
-                 agr_next->arg[j].f = arg[j].f;
+                 v->arg[j].f = arg[j].f;
              else
                for (j = 0; j < function->n_args; j++)
-                 agr_next->arg[j].c = xstrdup (arg[j].c);
+                 v->arg[j].c = xstrdup (arg[j].c);
            }
        }
       
@@ -738,7 +632,7 @@ parse_aggregate_functions (void)
       if (src && n_src && src[0]->type == ALPHA)
        for (i = 0; i < function->n_args; i++)
          {
-           free(arg[i].c);
+           free (arg[i].c);
            arg[i].c = NULL;
          }
       free (src);
@@ -747,15 +641,17 @@ parse_aggregate_functions (void)
     }
 }
 
-/* Frees all the state for the AGGREGATE procedure. */
+/* Destroys AGR. */
 static void
-free_aggregate_functions (void)
+agr_destroy (struct agr_proc *agr)
 {
   struct agr_var *iter, *next;
 
-  if (agr_dict)
-    free_dictionary (agr_dict);
-  for (iter = agr_first; iter; iter = next)
+  if (agr->dict != NULL)
+    dict_destroy (agr->dict);
+  if (agr->sort != NULL)
+    destroy_sort_cases_pgm (agr->sort);
+  for (iter = agr->vars; iter; iter = next)
     {
       next = iter->next;
 
@@ -771,45 +667,46 @@ free_aggregate_functions (void)
        }
       free (iter);
     }
+  free (agr->prev_break);
+  free (agr->agr_case);
 }
 \f
 /* Execution. */
 
-static void accumulate_aggregate_info (struct ccase *input);
-static void dump_aggregate_info (struct ccase *output);
+static void accumulate_aggregate_info (struct agr_proc *,
+                                       const struct ccase *);
+static void dump_aggregate_info (struct agr_proc *, struct ccase *);
 
 /* Processes a single case INPUT for aggregation.  If output is
-   warranted, it is written to case OUTPUT, which may be (but need not
-   be) an alias to INPUT.  Returns -1 when output is performed, -2
-   otherwise. */
-/* The code in this function has an eerie similarity to
-   vfm.c:SPLIT_FILE_procfunc()... */
+   warranted, writes it to OUTPUT and returns nonzero.
+   Otherwise, returns zero and OUTPUT is unmodified. */
 static int
-aggregate_single_case (struct ccase *input, struct ccase *output)
+aggregate_single_case (struct agr_proc *agr,
+                       const struct ccase *input, struct ccase *output)
 {
   /* The first case always begins a new break group.  We also need to
      preserve the values of the case for later comparison. */
-  if (case_count++ == 0)
+  if (agr->case_cnt++ == 0)
     {
       int n_elem = 0;
       
       {
        int i;
 
-       for (i = 0; i < nv_sort; i++)
-         n_elem += v_sort[i]->nv;
+       for (i = 0; i < agr->sort->var_cnt; i++)
+         n_elem += agr->sort->vars[i]->nv;
       }
       
-      prev_case = xmalloc (sizeof *prev_case * n_elem);
+      agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
 
-      /* Copy INPUT into prev_case. */
+      /* Copy INPUT into prev_break. */
       {
-       union value *iter = prev_case;
+       union value *iter = agr->prev_break;
        int i;
 
-       for (i = 0; i < nv_sort; i++)
+       for (i = 0; i < agr->sort->var_cnt; i++)
          {
-           struct variable *v = v_sort[i];
+           struct variable *v = agr->sort->vars[i];
            
            if (v->type == NUMERIC)
              (iter++)->f = input->data[v->fv].f;
@@ -821,25 +718,25 @@ aggregate_single_case (struct ccase *input, struct ccase *output)
          }
       }
            
-      accumulate_aggregate_info (input);
+      accumulate_aggregate_info (agr, input);
        
-      return -2;
+      return 0;
     }
       
   /* Compare the value of each break variable to the values on the
      previous case. */
   {
-    union value *iter = prev_case;
+    union value *iter = agr->prev_break;
     int i;
     
-    for (i = 0; i < nv_sort; i++)
+    for (i = 0; i < agr->sort->var_cnt; i++)
       {
-       struct variable *v = v_sort[i];
+       struct variable *v = agr->sort->vars[i];
       
        switch (v->type)
          {
          case NUMERIC:
-           if (approx_ne (input->data[v->fv].f, iter->f))
+           if (input->data[v->fv].f != iter->f)
              goto not_equal;
            iter++;
            break;
@@ -854,26 +751,26 @@ aggregate_single_case (struct ccase *input, struct ccase *output)
       }
   }
 
-  accumulate_aggregate_info (input);
+  accumulate_aggregate_info (agr, input);
 
-  return -2;
+  return 0;
   
 not_equal:
   /* The values of the break variable are different from the values on
      the previous case.  That means that it's time to dump aggregate
      info. */
-  dump_aggregate_info (output);
-  initialize_aggregate_info ();
-  accumulate_aggregate_info (input);
+  dump_aggregate_info (agr, output);
+  initialize_aggregate_info (agr);
+  accumulate_aggregate_info (agr, input);
 
-  /* Copy INPUT into prev_case. */
+  /* Copy INPUT into prev_break. */
   {
-    union value *iter = prev_case;
+    union value *iter = agr->prev_break;
     int i;
 
-    for (i = 0; i < nv_sort; i++)
+    for (i = 0; i < agr->sort->var_cnt; i++)
       {
-       struct variable *v = v_sort[i];
+       struct variable *v = agr->sort->vars[i];
            
        if (v->type == NUMERIC)
          (iter++)->f = input->data[v->fv].f;
@@ -885,21 +782,23 @@ not_equal:
       }
   }
   
-  return -1;
+  return 1;
 }
 
 /* Accumulates aggregation data from the case INPUT. */
 static void 
-accumulate_aggregate_info (struct ccase *input)
+accumulate_aggregate_info (struct agr_proc *agr,
+                           const struct ccase *input)
 {
   struct agr_var *iter;
+  double weight;
 
-#define WEIGHT (input->data[default_dict.weight_index].f)
+  weight = dict_get_case_weight (default_dict, input);
 
-  for (iter = agr_first; iter; iter = iter->next)
+  for (iter = agr->vars; iter; iter = iter->next)
     if (iter->src)
       {
-       union value *v = &input->data[iter->src->fv];
+       const union value *v = &input->data[iter->src->fv];
 
        if ((!iter->include_missing && is_missing (v, iter->src))
            || (iter->include_missing && iter->src->type == NUMERIC
@@ -907,12 +806,10 @@ accumulate_aggregate_info (struct ccase *input)
          {
            switch (iter->function)
              {
-             case NMISS | FWEIGHT:
-               iter->dbl[0] += WEIGHT;
-               break;
              case NMISS:
+               iter->dbl[0] += weight;
+                break;
              case NUMISS:
-             case NUMISS | FWEIGHT:
                iter->int1++;
                break;
              }
@@ -924,190 +821,95 @@ accumulate_aggregate_info (struct ccase *input)
        switch (iter->function)
          {
          case SUM:
-         case SUM | FWEIGHT:
            iter->dbl[0] += v->f;
            break;
          case MEAN:
-           iter->dbl[0] += v->f;
-           iter->int1++;
-           break;
-         case MEAN | FWEIGHT:
-           {
-             double w = WEIGHT;
-             iter->dbl[0] += v->f * w;
-             iter->dbl[1] += w;
-             break;
-           }
-         case SD:
-           iter->dbl[0] += v->f;
-           iter->dbl[1] += v->f * v->f;
-           iter->int1++;
-           break;
-         case SD | FWEIGHT:
-           {
-             double w = WEIGHT;
-             double product = v->f * w;
-             iter->dbl[0] += product;
-             iter->dbl[1] += product * v->f;
-             iter->dbl[2] += w;
-             break;
-           }
+            iter->dbl[0] += v->f * weight;
+            iter->dbl[1] += weight;
+            break;
+         case SD: 
+            {
+              double product = v->f * weight;
+              iter->dbl[0] += product;
+              iter->dbl[1] += product * v->f;
+              iter->dbl[2] += weight;
+              break; 
+            }
          case MAX:
-         case MAX | FWEIGHT:
            iter->dbl[0] = max (iter->dbl[0], v->f);
            iter->int1 = 1;
            break;
          case MAX | FSTRING:
-         case MAX | FSTRING | FWEIGHT:
            if (memcmp (iter->string, v->s, iter->src->width) < 0)
              memcpy (iter->string, v->s, iter->src->width);
            iter->int1 = 1;
            break;
          case MIN:
-         case MIN | FWEIGHT:
            iter->dbl[0] = min (iter->dbl[0], v->f);
            iter->int1 = 1;
            break;
          case MIN | FSTRING:
-         case MIN | FSTRING | FWEIGHT:
            if (memcmp (iter->string, v->s, iter->src->width) > 0)
              memcpy (iter->string, v->s, iter->src->width);
            iter->int1 = 1;
            break;
          case FGT:
          case PGT:
-           if (approx_gt (v->f, iter->arg[0].f))
-             iter->int1++;
-           iter->int2++;
-           break;
-         case FGT | FWEIGHT:
-         case PGT | FWEIGHT:
-           {
-             double w = WEIGHT;
-             if (approx_gt (v->f, iter->arg[0].f))
-               iter->dbl[0] += w;
-             iter->dbl[1] += w;
-             break;
-           }
+            if (v->f > iter->arg[0].f)
+              iter->dbl[0] += weight;
+            iter->dbl[1] += weight;
+            break;
          case FGT | FSTRING:
          case PGT | FSTRING:
-           if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
-             iter->int1++;
-           iter->int2++;
-           break;
-         case FGT | FSTRING | FWEIGHT:
-         case PGT | FSTRING | FWEIGHT:
-           {
-             double w = WEIGHT;
-             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
-               iter->dbl[0] += w;
-             iter->dbl[1] += w;
-             break;
-           }
+            if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
+              iter->dbl[0] += weight;
+            iter->dbl[1] += weight;
+            break;
          case FLT:
          case PLT:
-           if (approx_lt (v->f, iter->arg[0].f))
-             iter->int1++;
-           iter->int2++;
-           break;
-         case FLT | FWEIGHT:
-         case PLT | FWEIGHT:
-           {
-             double w = WEIGHT;
-             if (approx_lt (v->f, iter->arg[0].f))
-               iter->dbl[0] += w;
-             iter->dbl[1] += w;
-             break;
-           }
+            if (v->f < iter->arg[0].f)
+              iter->dbl[0] += weight;
+            iter->dbl[1] += weight;
+            break;
          case FLT | FSTRING:
          case PLT | FSTRING:
-           if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
-             iter->int1++;
-           iter->int2++;
-           break;
-         case FLT | FSTRING | FWEIGHT:
-         case PLT | FSTRING | FWEIGHT:
-           {
-             double w = WEIGHT;
-             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
-               iter->dbl[0] += w;
-             iter->dbl[1] += w;
-             break;
-           }
+            if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
+              iter->dbl[0] += weight;
+            iter->dbl[1] += weight;
+            break;
          case FIN:
          case PIN:
-           if (approx_in_range (v->f, iter->arg[0].f, iter->arg[1].f))
-             iter->int1++;
-           iter->int2++;
-           break;
-         case FIN | FWEIGHT:
-         case PIN | FWEIGHT:
-           {
-             double w = WEIGHT;
-             if (approx_in_range (v->f, iter->arg[0].f, iter->arg[1].f))
-               iter->dbl[0] += w;
-             iter->dbl[1] += w;
-             break;
-           }
+            if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
+              iter->dbl[0] += weight;
+            iter->dbl[1] += weight;
+            break;
          case FIN | FSTRING:
          case PIN | FSTRING:
-           if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
-               && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
-             iter->int1++;
-           iter->int2++;
-           break;
-         case FIN | FSTRING | FWEIGHT:
-         case PIN | FSTRING | FWEIGHT:
-           {
-             double w = WEIGHT;
-             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
-                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
-               iter->dbl[0] += w;
-             iter->dbl[1] += w;
-             break;
-           }
+            if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
+                && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
+              iter->dbl[0] += weight;
+            iter->dbl[1] += weight;
+            break;
          case FOUT:
          case POUT:
-           if (!approx_in_range (v->f, iter->arg[0].f, iter->arg[1].f))
-             iter->int1++;
-           iter->int2++;
-           break;
-         case FOUT | FWEIGHT:
-         case POUT | FWEIGHT:
-           {
-             double w = WEIGHT;
-             if (!approx_in_range (v->f, iter->arg[0].f, iter->arg[1].f))
-               iter->dbl[0] += w;
-             iter->dbl[1] += w;
-             break;
-           }
+            if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
+              iter->dbl[0] += weight;
+            iter->dbl[1] += weight;
+            break;
          case FOUT | FSTRING:
          case POUT | FSTRING:
-           if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
-               && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
-             iter->int1++;
-           iter->int2++;
-           break;
-         case FOUT | FSTRING | FWEIGHT:
-         case POUT | FSTRING | FWEIGHT:
-           {
-             double w = WEIGHT;
-             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
-                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
-               iter->dbl[0] += w;
-             iter->dbl[1] += w;
-             break;
-           }
-         case N | FWEIGHT:
-           iter->dbl[0] += WEIGHT;
-           break;
+            if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
+                && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
+              iter->dbl[0] += weight;
+            iter->dbl[1] += weight;
+            break;
          case N:
+           iter->dbl[0] += weight;
+           break;
          case NU:
-         case NU | FWEIGHT:
            iter->int1++;
            break;
          case FIRST:
-         case FIRST | FWEIGHT:
            if (iter->int1 == 0)
              {
                iter->dbl[0] = v->f;
@@ -1115,7 +917,6 @@ accumulate_aggregate_info (struct ccase *input)
              }
            break;
          case FIRST | FSTRING:
-         case FIRST | FSTRING | FWEIGHT:
            if (iter->int1 == 0)
              {
                memcpy (iter->string, v->s, iter->src->width);
@@ -1123,12 +924,10 @@ accumulate_aggregate_info (struct ccase *input)
              }
            break;
          case LAST:
-         case LAST | FWEIGHT:
            iter->dbl[0] = v->f;
            iter->int1 = 1;
            break;
          case LAST | FSTRING:
-         case LAST | FSTRING | FWEIGHT:
            memcpy (iter->string, v->s, iter->src->width);
            iter->int1 = 1;
            break;
@@ -1138,12 +937,10 @@ accumulate_aggregate_info (struct ccase *input)
     } else {
       switch (iter->function)
        {
-       case N_NO_VARS | FWEIGHT:
-         iter->dbl[0] += WEIGHT;
-         break;
        case N_NO_VARS:
+         iter->dbl[0] += weight;
+         break;
        case NU_NO_VARS:
-       case NU_NO_VARS | FWEIGHT:
          iter->int1++;
          break;
        default:
@@ -1156,33 +953,28 @@ accumulate_aggregate_info (struct ccase *input)
    more of the break variables.  Make an output record from the
    accumulated statistics in the OUTPUT case. */
 static void 
-dump_aggregate_info (struct ccase *output)
+dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
 {
-  debug_printf (("(dumping "));
-  
   {
     int n_elem = 0;
     
     {
       int i;
 
-      for (i = 0; i < nv_sort; i++)
-       n_elem += v_sort[i]->nv;
+      for (i = 0; i < agr->sort->var_cnt; i++)
+       n_elem += agr->sort->vars[i]->nv;
     }
-    debug_printf (("n_elem=%d:", n_elem));
-    memcpy (output->data, prev_case, sizeof (union value) * n_elem);
+    memcpy (output->data, agr->prev_break, sizeof (union value) * n_elem);
   }
   
   {
     struct agr_var *i;
   
-    for (i = agr_first; i; i = i->next)
+    for (i = agr->vars; i; i = i->next)
       {
        union value *v = &output->data[i->dest->fv];
 
-       debug_printf ((" %d,%d", i->dest->fv, i->dest->nv));
-
-       if (missing == COLUMNWISE && i->missing != 0
+       if (agr->missing == COLUMNWISE && i->missing != 0
            && (i->function & FUNC) != N && (i->function & FUNC) != NU
            && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
          {
@@ -1196,58 +988,37 @@ dump_aggregate_info (struct ccase *output)
        switch (i->function)
          {
          case SUM:
-         case SUM | FWEIGHT:
            v->f = i->dbl[0];
            break;
          case MEAN:
-           v->f = i->int1 ? i->dbl[0] / i->int1 : SYSMIS;
-           break;
-         case MEAN | FWEIGHT:
            v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
            break;
          case SD:
-           v->f = ((i->int1 > 1)
-                   ? calc_stddev (calc_variance (i->dbl, i->int1))
-                   : SYSMIS);
-           break;
-         case SD | FWEIGHT:
            v->f = ((i->dbl[2] > 1.0)
                    ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
                    : SYSMIS);
            break;
          case MAX:
-         case MAX | FWEIGHT:
          case MIN:
-         case MIN | FWEIGHT:
            v->f = i->int1 ? i->dbl[0] : SYSMIS;
            break;
          case MAX | FSTRING:
-         case MAX | FSTRING | FWEIGHT:
          case MIN | FSTRING:
-         case MIN | FSTRING | FWEIGHT:
            if (i->int1)
              memcpy (v->s, i->string, i->dest->width);
            else
              memset (v->s, ' ', i->dest->width);
            break;
-         case FGT:
          case FGT | FSTRING:
-         case FLT:
          case FLT | FSTRING:
-         case FIN:
          case FIN | FSTRING:
-         case FOUT:
          case FOUT | FSTRING:
            v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
            break;
-         case FGT | FWEIGHT:
-         case FGT | FSTRING | FWEIGHT:
-         case FLT | FWEIGHT:
-         case FLT | FSTRING | FWEIGHT:
-         case FIN | FWEIGHT:
-         case FIN | FSTRING | FWEIGHT:
-         case FOUT | FWEIGHT:
-         case FOUT | FSTRING | FWEIGHT:
+         case FGT:
+         case FLT:
+         case FIN:
+         case FOUT:
            v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
            break;
          case PGT:
@@ -1258,56 +1029,35 @@ dump_aggregate_info (struct ccase *output)
          case PIN | FSTRING:
          case POUT:
          case POUT | FSTRING:
-           v->f = (i->int2
-                   ? (double) i->int1 / (double) i->int2 * 100.0
-                   : SYSMIS);
-           break;
-         case PGT | FWEIGHT:
-         case PGT | FSTRING | FWEIGHT:
-         case PLT | FWEIGHT:
-         case PLT | FSTRING | FWEIGHT:
-         case PIN | FWEIGHT:
-         case PIN | FSTRING | FWEIGHT:
-         case POUT | FWEIGHT:
-         case POUT | FSTRING | FWEIGHT:
            v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
            break;
-         case N | FWEIGHT:
-           v->f = i->dbl[0];
          case N:
+           v->f = i->dbl[0];
+            break;
          case NU:
-         case NU | FWEIGHT:
            v->f = i->int1;
            break;
          case FIRST:
-         case FIRST | FWEIGHT:
          case LAST:
-         case LAST | FWEIGHT:
            v->f = i->int1 ? i->dbl[0] : SYSMIS;
            break;
          case FIRST | FSTRING:
-         case FIRST | FSTRING | FWEIGHT:
          case LAST | FSTRING:
-         case LAST | FSTRING | FWEIGHT:
            if (i->int1)
              memcpy (v->s, i->string, i->dest->width);
            else
              memset (v->s, ' ', i->dest->width);
            break;
-         case N_NO_VARS | FWEIGHT:
+         case N_NO_VARS:
            v->f = i->dbl[0];
            break;
-         case N_NO_VARS:
          case NU_NO_VARS:
-         case NU_NO_VARS | FWEIGHT:
            v->f = i->int1;
            break;
-         case NMISS | FWEIGHT:
+         case NMISS:
            v->f = i->dbl[0];
            break;
-         case NMISS:
          case NUMISS:
-         case NUMISS | FWEIGHT:
            v->f = i->int1;
            break;
          default:
@@ -1315,21 +1065,18 @@ dump_aggregate_info (struct ccase *output)
          }
       }
   }
-  debug_printf ((") "));
 }
 
 /* Resets the state for all the aggregate functions. */
 static void
-initialize_aggregate_info (void)
+initialize_aggregate_info (struct agr_proc *agr)
 {
   struct agr_var *iter;
 
-  for (iter = agr_first; iter; iter = iter->next)
+  for (iter = agr->vars; iter; iter = iter->next)
     {
-      int plain_function = iter->function & ~FWEIGHT;
-
       iter->missing = 0;
-      switch (plain_function)
+      switch (iter->function)
        {
        case MIN:
          iter->dbl[0] = DBL_MAX;
@@ -1354,43 +1101,31 @@ initialize_aggregate_info (void)
 /* Aggregate each case as it comes through.  Cases which aren't needed
    are dropped. */
 static int
-agr_00x_trns_proc (struct trns_header *h unused, struct ccase *c)
+agr_to_active_file (struct ccase *c, void *agr_)
 {
-  int code = aggregate_single_case (c, compaction_case);
-  debug_printf (("%d ", code));
-  return code;
-}
+  struct agr_proc *agr = agr_;
 
-/* Output the last aggregate case.  It's okay to call the vfm_sink's
-   write() method here because end_func is called so soon after all
-   the cases have been output; very little has been cleaned up at this
-   point. */
-static void
-agr_00x_end_func (void)
-{
-  /* Ensure that info for the last break group gets written to the
-     active file. */
-  dump_aggregate_info (compaction_case);
-  vfm_sink_info.ncases++;
-  vfm_sink->write ();
+  if (aggregate_single_case (agr, c, agr->agr_case)) 
+    agr->sink->class->write (agr->sink, agr->agr_case);
+
+  return 1;
 }
 
-/* Transform the aggregate case buf_1xx, in internal format, to system
-   file format, in buf64_1xx, and write the resultant case to the
-   system file. */
+/* Writes AGR->agr_case to AGR->out_file. */
 static void
-write_case_to_sfm (void)
+write_case_to_sfm (struct agr_proc *agr)
 {
-  flt64 *p = buf64_1xx;
+  flt64 *p;
   int i;
 
-  for (i = 0; i < agr_dict->nvar; i++)
+  p = agr->sfm_agr_case;
+  for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
     {
-      struct variable *v = agr_dict->var[i];
+      struct variable *v = dict_get_var (agr->dict, i);
       
       if (v->type == NUMERIC)
        {
-         double src = buf_1xx->data[v->fv].f;
+         double src = agr->agr_case->data[v->fv].f;
          if (src == SYSMIS)
            *p++ = -FLT64_MAX;
          else
@@ -1398,126 +1133,34 @@ write_case_to_sfm (void)
        }
       else
        {
-         memcpy (p, buf_1xx->data[v->fv].s, v->width);
+         memcpy (p, agr->agr_case->data[v->fv].s, v->width);
          memset (&((char *) p)[v->width], ' ',
                  REM_RND_UP (v->width, sizeof (flt64)));
          p += DIV_RND_UP (v->width, sizeof (flt64));
        }
     }
 
-  sfm_write_case (outfile, buf64_1xx, p - buf64_1xx);
+  sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
 }
 
 /* Aggregate the current case and output it if we passed a
    breakpoint. */
 static int
-agr_10x_trns_proc (struct trns_header *h unused, struct ccase *c)
-{
-  int code = aggregate_single_case (c, buf_1xx);
-
-  assert (code == -2 || code == -1);
-  if (code == -1)
-    write_case_to_sfm ();
-  return -1;
-}
-
-/* Close the system file now that we're done with it.  */
-static void
-agr_10x_trns_free (struct trns_header *h unused)
+presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
 {
-  fh_close_handle (outfile);
-}
-
-/* Ensure that info for the last break group gets written to the
-   system file. */
-static void
-agr_10x_end_func (void)
-{
-  dump_aggregate_info (buf_1xx);
-  write_case_to_sfm ();
+  sort_agr_to_sysfile (c, agr_);
+  return 1;
 }
 
-/* When called with temp_case non-NULL (the normal case), runs the
-   case through the aggregater and outputs it to the system file if
-   appropriate.  If temp_case is NULL, finishes up writing the last
-   case if necessary. */
+/* Aggregate the current case and output it if we passed a
+   breakpoint. */
 static int
-agr_11x_func (void)
+sort_agr_to_sysfile (const struct ccase *c, void *agr_) 
 {
-  if (temp_case != NULL)
-    {
-      int code = aggregate_single_case (temp_case, buf_1xx);
-      
-      assert (code == -2 || code == -1);
-      if (code == -1)
-       write_case_to_sfm ();
-    }
-  else
-    {
-      if (case_count)
-       {
-         dump_aggregate_info (buf_1xx);
-         write_case_to_sfm ();
-       }
-      fh_close_handle (outfile);
-    }
-  return 1;
-}
-\f
-/* Debugging. */
-#if DEBUGGING
-/* Print out useful debugging information. */
-static void
-debug_print (int flags)
-{
-  printf ("AGGREGATE\n /OUTFILE=%s\n",
-         outfile ? fh_handle_filename (outfile) : "*");
-
-  if (missing == COLUMNWISE)
-    puts (" /MISSING=COLUMNWISE");
+  struct agr_proc *agr = agr_;
 
-  if (flags & 2)
-    puts (" /DOCUMENT");
-  if (flags & 4)
-    puts (" /PRESORTED");
-  
-  {
-    int i;
+  if (aggregate_single_case (agr, c, agr->agr_case)) 
+    write_case_to_sfm (agr);
 
-    printf (" /BREAK=");
-    for (i = 0; i < nv_sort; i++)
-      printf ("%s(%c) ", v_sort[i]->name,
-             v_sort[i]->p.srt.order == SRT_ASCEND ? 'A' : 'D');
-    putc ('\n', stdout);
-  }
-  
-  {
-    struct agr_var *iter;
-    
-    for (iter = agr_first; iter; iter = iter->next)
-      {
-       struct agr_func *f = &agr_func_tab[iter->function & FUNC];
-       
-       printf (" /%s", iter->dest->name);
-       if (iter->dest->label)
-         printf ("'%s'", iter->dest->label);
-       printf ("=%s(%s", f->name, iter->src->name);
-       if (f->n_args)
-         {
-           int i;
-           
-           for (i = 0; i < f->n_args; i++)
-             {
-               putc (',', stdout);
-               if (iter->src->type == NUMERIC)
-                 printf ("%g", iter->arg[i].f);
-               else
-                 printf ("%.*s", iter->src->width, iter->arg[i].c);
-             }
-         }
-       printf (")\n");
-      }
-  }
+  return 1;
 }
-
-#endif /* DEBUGGING */