Changed a lot of non-const pointers to const.
[pspp-builds.git] / src / language / stats / aggregate.c
index 648e8896b64bd2b6c6f8ca8ed9e7bfb49e12ff4b..044297462c982a2d395a6ef66c9d26780015163d 100644 (file)
@@ -1,6 +1,5 @@
 /* PSPP - computes sample statistics.
    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
-   Written by Ben Pfaff <blp@gnu.org>.
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
@@ -27,6 +26,7 @@
 #include <data/casefile.h>
 #include <data/dictionary.h>
 #include <data/file-handle-def.h>
+#include <data/format.h>
 #include <data/procedure.h>
 #include <data/settings.h>
 #include <data/storage-stream.h>
@@ -64,17 +64,17 @@ struct agr_var
     struct agr_var *next;              /* Next in list. */
 
     /* Collected during parsing. */
-    struct variable *src;      /* Source variable. */
+    const struct variable *src;        /* Source variable. */
     struct variable *dest;     /* Target variable. */
     int function;              /* Function. */
-    int include_missing;       /* 1=Include user-missing values. */
+    enum mv_class exclude;      /* Classes of missing values to exclude. */
     union agr_argument arg[2]; /* Arguments. */
 
     /* Accumulated during AGGREGATE execution. */
     double dbl[3];
     int int1, int2;
     char *string;
-    int missing;
+    bool saw_missing;
     struct moments1 *moments;
   };
 
@@ -93,36 +93,36 @@ struct agr_func
   {
     const char *name;          /* Aggregation function name. */
     size_t n_args;              /* Number of arguments. */
-    int alpha_type;            /* When given ALPHA arguments, output type. */
+    enum var_type alpha_type;   /* When given ALPHA arguments, output type. */
     struct fmt_spec format;    /* Format spec if alpha_type != ALPHA. */
   };
 
 /* Attributes of aggregation functions. */
 static const struct agr_func agr_func_tab[] = 
   {
-    {"<NONE>",  0, -1,      {0, 0, 0}},
-    {"SUM",     0, -1,      {FMT_F, 8, 2}},
-    {"MEAN",   0, -1,      {FMT_F, 8, 2}},
-    {"SD",      0, -1,      {FMT_F, 8, 2}},
-    {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
-    {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
-    {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
-    {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
-    {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
-    {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
-    {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
-    {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
-    {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
-    {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
-    {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
-    {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
-    {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
-    {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
-    {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
-    {"LAST",    0, ALPHA,   {-1, -1, -1}},
-    {NULL,      0, -1,      {-1, -1, -1}},
-    {"N",       0, NUMERIC, {FMT_F, 7, 0}},
-    {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
+    {"<NONE>",  0, -1,          {0, 0, 0}},
+    {"SUM",     0, -1,          {FMT_F, 8, 2}},
+    {"MEAN",   0, -1,          {FMT_F, 8, 2}},
+    {"SD",      0, -1,          {FMT_F, 8, 2}},
+    {"MAX",     0, VAR_STRING,  {-1, -1, -1}}, 
+    {"MIN",     0, VAR_STRING,  {-1, -1, -1}}, 
+    {"PGT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},      
+    {"PLT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},       
+    {"PIN",     2, VAR_NUMERIC, {FMT_F, 5, 1}},       
+    {"POUT",    2, VAR_NUMERIC, {FMT_F, 5, 1}},       
+    {"FGT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
+    {"FLT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
+    {"FIN",     2, VAR_NUMERIC, {FMT_F, 5, 3}},       
+    {"FOUT",    2, VAR_NUMERIC, {FMT_F, 5, 3}},       
+    {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},       
+    {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},       
+    {"NMISS",   0, VAR_NUMERIC, {FMT_F, 7, 0}},       
+    {"NUMISS",  0, VAR_NUMERIC, {FMT_F, 7, 0}},       
+    {"FIRST",   0, VAR_STRING,  {-1, -1, -1}}, 
+    {"LAST",    0, VAR_STRING,  {-1, -1, -1}},
+    {NULL,      0, -1,          {-1, -1, -1}},
+    {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},
+    {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},
   };
 
 /* Missing value types. */
@@ -141,7 +141,7 @@ struct agr_proc
 
     /* Break variables. */
     struct sort_criteria *sort;         /* Sort criteria. */
-    struct variable **break_vars;       /* Break variables. */
+    const struct variable **break_vars;       /* Break variables. */
     size_t break_var_cnt;               /* Number of break variables. */
     struct ccase break_case;            /* Last values of break variables. */
 
@@ -164,12 +164,6 @@ static bool aggregate_single_case (struct agr_proc *agr,
                                   const struct ccase *input,
                                   struct ccase *output);
 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
-
-/* Aggregating to the active file. */
-static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
-
-/* Aggregating to a system file. */
-static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
 \f
 /* Parsing. */
 
@@ -188,7 +182,7 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
   memset(&agr, 0 , sizeof (agr));
   agr.missing = ITEMWISE;
   case_nullify (&agr.break_case);
-  
+
   agr.dict = dict_create ();
   agr.src_dict = dict;
   dict_set_label (agr.dict, dict_get_label (dict));
@@ -237,7 +231,7 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
          
           for (i = 0; i < agr.break_var_cnt; i++)
             dict_clone_var_assert (agr.dict, agr.break_vars[i],
-                                   agr.break_vars[i]->name);
+                                   var_get_name (agr.break_vars[i]));
 
           /* BREAK must follow the options. */
           break;
@@ -272,6 +266,8 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
   /* Output to active file or external file? */
   if (out_file == NULL) 
     {
+      struct ccase *c;
+      
       /* The active file will be replaced by the aggregated data,
          so TEMPORARY is moot. */
       proc_cancel_temporary_transformations (ds);
@@ -282,14 +278,26 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
             goto error;
         }
 
-      agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
+      agr.sink = create_case_sink (&storage_sink_class, agr.dict,
+                                  dataset_get_casefile_factory (ds),
+                                  NULL);
       if (agr.sink->class->open != NULL)
         agr.sink->class->open (agr.sink);
       proc_set_sink (ds, 
-                    create_case_sink (&null_sink_class, 
-                                      dict, NULL));
-      if (!procedure (ds, agr_to_active_file, &agr))
+                    create_case_sink (&null_sink_class, dict,
+                                      dataset_get_casefile_factory (ds),
+                                      NULL));
+      proc_open (ds);
+      while (proc_read (ds, &c))
+        if (aggregate_single_case (&agr, c, &agr.agr_case)) 
+          if (!agr.sink->class->write (agr.sink, &agr.agr_case)) 
+            {
+              proc_close (ds);
+              goto error; 
+            }
+      if (!proc_close (ds))
         goto error;
+
       if (agr.case_cnt > 0) 
         {
           dump_aggregate_info (&agr, &agr.agr_case);
@@ -297,11 +305,9 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
             goto error;
         }
       discard_variables (ds);
-      dict_destroy (dict);
       dataset_set_dict (ds, agr.dict);
       agr.dict = NULL;
-      proc_set_source (ds, 
-                      agr.sink->class->make_source (agr.sink));
+      proc_set_source (ds, agr.sink->class->make_source (agr.sink));
       free_case_sink (agr.sink);
     }
   else
@@ -338,7 +344,17 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
       else 
         {
           /* Active file is already sorted. */
-          if (!procedure (ds, presorted_agr_to_sysfile, &agr))
+          struct ccase *c;
+          
+          proc_open (ds);
+          while (proc_read (ds, &c))
+            if (aggregate_single_case (&agr, c, &agr.agr_case)) 
+              if (!any_writer_write (agr.writer, &agr.agr_case)) 
+                {
+                  proc_close (ds);
+                  goto error;
+                }
+          if (!proc_close (ds))
             goto error;
         }
       
@@ -374,13 +390,13 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
       size_t n_dest;
       struct string function_name;
 
-      int include_missing;
+      enum mv_class exclude;
       const struct agr_func *function;
       int func_index;
 
       union agr_argument arg[2];
 
-      struct variable **src;
+      const struct variable **src;
       size_t n_src;
 
       size_t i;
@@ -393,6 +409,7 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
       n_src = 0;
       arg[0].c = NULL;
       arg[1].c = NULL;
+      ds_init_empty (&function_name);
 
       /* Parse the list of target variables. */
       while (!lex_match (lexer, '='))
@@ -433,14 +450,14 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
          goto error;
        }
 
-      include_missing = 0;
+      exclude = MV_ANY;
 
-      ds_init_string (&function_name, lex_tokstr (lexer));
+      ds_assign_string (&function_name, lex_tokstr (lexer));
 
       ds_chomp (&function_name, '.');
 
       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
-         include_missing = 1;
+        exclude = MV_SYSTEM;
 
       for (function = agr_func_tab; function->name; function++)
        if (!strcasecmp (function->name, ds_cstr (&function_name)))
@@ -479,7 +496,7 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
            else if (function->n_args)
              pv_opts |= PV_SAME_TYPE;
 
-           if (!parse_variables (lexer, dict, &src, &n_src, pv_opts))
+           if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
              goto error;
          }
 
@@ -494,21 +511,23 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
                if (lex_token (lexer) == T_STRING)
                  {
                    arg[i].c = ds_xstrdup (lex_tokstr (lexer));
-                   type = ALPHA;
+                   type = VAR_STRING;
                  }
                else if (lex_is_number (lexer))
                  {
                    arg[i].f = lex_tokval (lexer);
-                   type = NUMERIC;
-                 } else {
-                   msg (SE, _("Missing argument %d to %s."), i + 1,
-                         function->name);
+                   type = VAR_NUMERIC;
+                 }
+                else
+                  {
+                   msg (SE, _("Missing argument %d to %s."),
+                         (int) i + 1, function->name);
                    goto error;
                  }
            
                lex_get (lexer);
 
-               if (type != src[0]->type)
+               if (type != var_get_type (src[0]))
                  {
                    msg (SE, _("Arguments to %s must be of same type as "
                               "source variables."),
@@ -540,9 +559,9 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
 
           if ((func_index == PIN || func_index == POUT
               || func_index == FIN || func_index == FOUT) 
-              && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
-                  || (src[0]->type == ALPHA
-                      && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
+              && (var_is_numeric (src[0])
+                  ? arg[0].f > arg[1].f
+                  : str_compare_rpad (arg[0].c, arg[1].c) > 0))
             {
               union agr_argument t = arg[0];
               arg[0] = arg[1];
@@ -581,18 +600,18 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
              {
                v->src = src[i];
                
-               if (src[i]->type == ALPHA)
+               if (var_is_alpha (src[i]))
                  {
                    v->function |= FSTRING;
-                   v->string = xmalloc (src[i]->width);
+                   v->string = xmalloc (var_get_width (src[i]));
                  }
 
-               if (function->alpha_type == ALPHA)
+               if (function->alpha_type == VAR_STRING)
                  destvar = dict_clone_var (agr->dict, v->src, dest[i]);
                else
                   {
-                    assert (v->src->type == NUMERIC
-                            || function->alpha_type == NUMERIC);
+                    assert (var_is_numeric (v->src)
+                            || function->alpha_type == VAR_NUMERIC);
                     destvar = dict_create_var (agr->dict, dest[i], 0);
                     if (destvar != NULL) 
                       {
@@ -602,7 +621,7 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
                           f = fmt_for_output (FMT_F, 8, 2); 
                         else
                           f = function->format;
-                        destvar->print = destvar->write = f;
+                        var_set_both_formats (destvar, &f);
                       }
                   }
              } else {
@@ -613,7 +632,7 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
                   f = fmt_for_output (FMT_F, 8, 2); 
                 else
                   f = function->format;
-                destvar->print = destvar->write = f;
+                var_set_both_formats (destvar, &f);
              }
          
            if (!destvar)
@@ -628,21 +647,18 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
 
            free (dest[i]);
            if (dest_label[i])
-             {
-               destvar->label = dest_label[i];
-               dest_label[i] = NULL;
-             }
+              var_set_label (destvar, dest_label[i]);
 
            v->dest = destvar;
          }
          
-         v->include_missing = include_missing;
+         v->exclude = exclude;
 
          if (v->src != NULL)
            {
              int j;
 
-             if (v->src->type == NUMERIC)
+             if (var_is_numeric (v->src))
                for (j = 0; j < function->n_args; j++)
                  v->arg[j].f = arg[j].f;
              else
@@ -651,7 +667,7 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
            }
        }
       
-      if (src != NULL && src[0]->type == ALPHA)
+      if (src != NULL && var_is_alpha (src[0]))
        for (i = 0; i < function->n_args; i++)
          {
            free (arg[i].c);
@@ -683,7 +699,7 @@ parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, s
       free (dest_label);
       free (arg[0].c);
       free (arg[1].c);
-      if (src && n_src && src[0]->type == ALPHA)
+      if (src && n_src && var_is_alpha (src[0]))
        for (i = 0; i < function->n_args; i++)
          {
            free (arg[i].c);
@@ -774,12 +790,10 @@ accumulate_aggregate_info (struct agr_proc *agr,
   for (iter = agr->agr_vars; iter; iter = iter->next)
     if (iter->src)
       {
-       const union value *v = case_data (input, iter->src->fv);
+       const union value *v = case_data (input, iter->src);
+        int src_width = var_get_width (iter->src);
 
-       if ((!iter->include_missing
-             && mv_is_value_missing (&iter->src->miss, v))
-           || (iter->include_missing && iter->src->type == NUMERIC
-               && v->f == SYSMIS))
+        if (var_is_value_missing (iter->src, v, iter->exclude))
          {
            switch (iter->function)
              {
@@ -792,7 +806,7 @@ accumulate_aggregate_info (struct agr_proc *agr,
                iter->int1++;
                break;
              }
-           iter->missing = 1;
+           iter->saw_missing = true;
            continue;
          }
        
@@ -815,8 +829,8 @@ accumulate_aggregate_info (struct agr_proc *agr,
            iter->int1 = 1;
            break;
          case MAX | FSTRING:
-           if (memcmp (iter->string, v->s, iter->src->width) < 0)
-             memcpy (iter->string, v->s, iter->src->width);
+           if (memcmp (iter->string, v->s, src_width) < 0)
+             memcpy (iter->string, v->s, src_width);
            iter->int1 = 1;
            break;
          case MIN:
@@ -824,8 +838,8 @@ accumulate_aggregate_info (struct agr_proc *agr,
            iter->int1 = 1;
            break;
          case MIN | FSTRING:
-           if (memcmp (iter->string, v->s, iter->src->width) > 0)
-             memcpy (iter->string, v->s, iter->src->width);
+           if (memcmp (iter->string, v->s, src_width) > 0)
+             memcpy (iter->string, v->s, src_width);
            iter->int1 = 1;
            break;
          case FGT:
@@ -836,7 +850,7 @@ accumulate_aggregate_info (struct agr_proc *agr,
             break;
          case FGT | FSTRING:
          case PGT | FSTRING:
-            if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
+            if (memcmp (iter->arg[0].c, v->s, src_width) < 0)
               iter->dbl[0] += weight;
             iter->dbl[1] += weight;
             break;
@@ -848,7 +862,7 @@ accumulate_aggregate_info (struct agr_proc *agr,
             break;
          case FLT | FSTRING:
          case PLT | FSTRING:
-            if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
+            if (memcmp (iter->arg[0].c, v->s, src_width) > 0)
               iter->dbl[0] += weight;
             iter->dbl[1] += weight;
             break;
@@ -860,8 +874,8 @@ accumulate_aggregate_info (struct agr_proc *agr,
             break;
          case FIN | FSTRING:
          case PIN | FSTRING:
-            if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
-                && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
+            if (memcmp (iter->arg[0].c, v->s, src_width) <= 0
+                && memcmp (iter->arg[1].c, v->s, src_width) >= 0)
               iter->dbl[0] += weight;
             iter->dbl[1] += weight;
             break;
@@ -873,8 +887,8 @@ accumulate_aggregate_info (struct agr_proc *agr,
             break;
          case FOUT | FSTRING:
          case POUT | FSTRING:
-            if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
-                || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
+            if (memcmp (iter->arg[0].c, v->s, src_width) > 0
+                || memcmp (iter->arg[1].c, v->s, src_width) < 0)
               iter->dbl[0] += weight;
             iter->dbl[1] += weight;
             break;
@@ -896,7 +910,7 @@ accumulate_aggregate_info (struct agr_proc *agr,
          case FIRST | FSTRING:
            if (iter->int1 == 0)
              {
-               memcpy (iter->string, v->s, iter->src->width);
+               memcpy (iter->string, v->s, src_width);
                iter->int1 = 1;
              }
            break;
@@ -905,7 +919,7 @@ accumulate_aggregate_info (struct agr_proc *agr,
            iter->int1 = 1;
            break;
          case LAST | FSTRING:
-           memcpy (iter->string, v->s, iter->src->width);
+           memcpy (iter->string, v->s, src_width);
            iter->int1 = 1;
            break;
           case NMISS:
@@ -945,11 +959,12 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
 
     for (i = 0; i < agr->break_var_cnt; i++) 
       {
-        struct variable *v = agr->break_vars[i];
-        memcpy (case_data_rw (output, value_idx),
-                case_data (&agr->break_case, v->fv),
-                sizeof (union value) * v->nv);
-        value_idx += v->nv; 
+        const struct variable *v = agr->break_vars[i];
+        size_t value_cnt = var_get_value_cnt (v);
+        memcpy (case_data_rw_idx (output, value_idx),
+                case_data (&agr->break_case, v),
+                sizeof (union value) * value_cnt);
+        value_idx += value_cnt; 
       }
   }
   
@@ -958,14 +973,14 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
   
     for (i = agr->agr_vars; i; i = i->next)
       {
-       union value *v = case_data_rw (output, i->dest->fv);
+       union value *v = case_data_rw (output, i->dest);
 
-       if (agr->missing == COLUMNWISE && i->missing != 0
+       if (agr->missing == COLUMNWISE && i->saw_missing
            && (i->function & FUNC) != N && (i->function & FUNC) != NU
            && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
          {
-           if (i->dest->type == ALPHA)
-             memset (v->s, ' ', i->dest->width);
+           if (var_is_alpha (i->dest))
+             memset (v->s, ' ', var_get_width (i->dest));
            else
              v->f = SYSMIS;
            continue;
@@ -999,9 +1014,9 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
          case MAX | FSTRING:
          case MIN | FSTRING:
            if (i->int1)
-             memcpy (v->s, i->string, i->dest->width);
+             memcpy (v->s, i->string, var_get_width (i->dest));
            else
-             memset (v->s, ' ', i->dest->width);
+             memset (v->s, ' ', var_get_width (i->dest));
            break;
          case FGT:
          case FGT | FSTRING:
@@ -1038,9 +1053,9 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
          case FIRST | FSTRING:
          case LAST | FSTRING:
            if (i->int1)
-             memcpy (v->s, i->string, i->dest->width);
+             memcpy (v->s, i->string, var_get_width (i->dest));
            else
-             memset (v->s, ' ', i->dest->width);
+             memset (v->s, ' ', var_get_width (i->dest));
            break;
          case N_NO_VARS:
            v->f = i->dbl[0];
@@ -1074,7 +1089,7 @@ initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
 
   for (iter = agr->agr_vars; iter; iter = iter->next)
     {
-      iter->missing = 0;
+      iter->saw_missing = false;
       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
       iter->int1 = iter->int2 = 0;
       switch (iter->function)
@@ -1083,13 +1098,13 @@ initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
          iter->dbl[0] = DBL_MAX;
          break;
        case MIN | FSTRING:
-         memset (iter->string, 255, iter->src->width);
+         memset (iter->string, 255, var_get_width (iter->src));
          break;
        case MAX:
          iter->dbl[0] = -DBL_MAX;
          break;
        case MAX | FSTRING:
-         memset (iter->string, 0, iter->src->width);
+         memset (iter->string, 0, var_get_width (iter->src));
          break;
         case SD:
           if (iter->moments == NULL)
@@ -1102,31 +1117,3 @@ initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
        }
     }
 }
-\f
-/* Aggregate each case as it comes through.  Cases which aren't needed
-   are dropped.
-   Returns true if successful, false if an I/O error occurred. */
-static bool
-agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
-{
-  struct agr_proc *agr = agr_;
-
-  if (aggregate_single_case (agr, c, &agr->agr_case)) 
-    return agr->sink->class->write (agr->sink, &agr->agr_case);
-
-  return true;
-}
-
-/* Aggregate the current case and output it if we passed a
-   breakpoint. */
-static bool
-presorted_agr_to_sysfile (const struct ccase *c, void *agr_, 
-                         const struct dataset *ds UNUSED) 
-{
-  struct agr_proc *agr = agr_;
-
-  if (aggregate_single_case (agr, c, &agr->agr_case)) 
-    return any_writer_write (agr->writer, &agr->agr_case);
-  
-  return true;
-}