struct agr_var *next; /* Next in list. */
/* Collected during parsing. */
- struct variable *src; /* Source variable. */
+ const struct variable *src; /* Source variable. */
struct variable *dest; /* Target variable. */
int function; /* Function. */
- int include_missing; /* 1=Include user-missing values. */
+ enum mv_class exclude; /* Classes of missing values to exclude. */
union agr_argument arg[2]; /* Arguments. */
/* Accumulated during AGGREGATE execution. */
double dbl[3];
int int1, int2;
char *string;
- int missing;
+ bool saw_missing;
struct moments1 *moments;
};
/* Break variables. */
struct sort_criteria *sort; /* Sort criteria. */
- struct variable **break_vars; /* Break variables. */
+ const struct variable **break_vars; /* Break variables. */
size_t break_var_cnt; /* Number of break variables. */
struct ccase break_case; /* Last values of break variables. */
const struct ccase *input,
struct ccase *output);
static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
-
-/* Aggregating to the active file. */
-static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
-
-/* Aggregating to a system file. */
-static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
\f
/* Parsing. */
/* Delete documents. */
if (!copy_documents)
- dict_set_documents (agr.dict, NULL);
+ dict_clear_documents (agr.dict);
/* Cancel SPLIT FILE. */
dict_set_split_vars (agr.dict, NULL, 0);
/* Output to active file or external file? */
if (out_file == NULL)
{
+ struct ccase *c;
+
/* The active file will be replaced by the aggregated data,
so TEMPORARY is moot. */
proc_cancel_temporary_transformations (ds);
goto error;
}
- agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
+ agr.sink = create_case_sink (&storage_sink_class, agr.dict,
+ dataset_get_casefile_factory (ds),
+ NULL);
if (agr.sink->class->open != NULL)
agr.sink->class->open (agr.sink);
proc_set_sink (ds,
- create_case_sink (&null_sink_class,
- dict, NULL));
- if (!procedure (ds, agr_to_active_file, &agr))
+ create_case_sink (&null_sink_class, dict,
+ dataset_get_casefile_factory (ds),
+ NULL));
+ proc_open (ds);
+ while (proc_read (ds, &c))
+ if (aggregate_single_case (&agr, c, &agr.agr_case))
+ if (!agr.sink->class->write (agr.sink, &agr.agr_case))
+ {
+ proc_close (ds);
+ goto error;
+ }
+ if (!proc_close (ds))
goto error;
+
if (agr.case_cnt > 0)
{
dump_aggregate_info (&agr, &agr.agr_case);
goto error;
}
discard_variables (ds);
- dict_destroy (dict);
dataset_set_dict (ds, agr.dict);
agr.dict = NULL;
- proc_set_source (ds,
- agr.sink->class->make_source (agr.sink));
+ proc_set_source (ds, agr.sink->class->make_source (agr.sink));
free_case_sink (agr.sink);
}
else
else
{
/* Active file is already sorted. */
- if (!procedure (ds, presorted_agr_to_sysfile, &agr))
+ struct ccase *c;
+
+ proc_open (ds);
+ while (proc_read (ds, &c))
+ if (aggregate_single_case (&agr, c, &agr.agr_case))
+ if (!any_writer_write (agr.writer, &agr.agr_case))
+ {
+ proc_close (ds);
+ goto error;
+ }
+ if (!proc_close (ds))
goto error;
}
size_t n_dest;
struct string function_name;
- int include_missing;
+ enum mv_class exclude;
const struct agr_func *function;
int func_index;
union agr_argument arg[2];
- struct variable **src;
+ const struct variable **src;
size_t n_src;
size_t i;
n_src = 0;
arg[0].c = NULL;
arg[1].c = NULL;
+ ds_init_empty (&function_name);
/* Parse the list of target variables. */
while (!lex_match (lexer, '='))
goto error;
}
- include_missing = 0;
+ exclude = MV_ANY;
- ds_init_string (&function_name, lex_tokstr (lexer));
+ ds_assign_string (&function_name, lex_tokstr (lexer));
ds_chomp (&function_name, '.');
if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
- include_missing = 1;
+ exclude = MV_SYSTEM;
for (function = agr_func_tab; function->name; function++)
if (!strcasecmp (function->name, ds_cstr (&function_name)))
else if (function->n_args)
pv_opts |= PV_SAME_TYPE;
- if (!parse_variables (lexer, dict, &src, &n_src, pv_opts))
+ if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
goto error;
}
}
else
{
- msg (SE, _("Missing argument %d to %s."), i + 1,
- function->name);
+ msg (SE, _("Missing argument %d to %s."),
+ (int) i + 1, function->name);
goto error;
}
v->dest = destvar;
}
- v->include_missing = include_missing;
+ v->exclude = exclude;
if (v->src != NULL)
{
const union value *v = case_data (input, iter->src);
int src_width = var_get_width (iter->src);
- if (iter->include_missing
- ? var_is_numeric (iter->src) && v->f == SYSMIS
- : var_is_value_missing (iter->src, v))
+ if (var_is_value_missing (iter->src, v, iter->exclude))
{
switch (iter->function)
{
iter->int1++;
break;
}
- iter->missing = 1;
+ iter->saw_missing = true;
continue;
}
for (i = 0; i < agr->break_var_cnt; i++)
{
- struct variable *v = agr->break_vars[i];
+ const struct variable *v = agr->break_vars[i];
size_t value_cnt = var_get_value_cnt (v);
memcpy (case_data_rw_idx (output, value_idx),
case_data (&agr->break_case, v),
{
union value *v = case_data_rw (output, i->dest);
- if (agr->missing == COLUMNWISE && i->missing != 0
+ if (agr->missing == COLUMNWISE && i->saw_missing
&& (i->function & FUNC) != N && (i->function & FUNC) != NU
&& (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
{
for (iter = agr->agr_vars; iter; iter = iter->next)
{
- iter->missing = 0;
+ iter->saw_missing = false;
iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
iter->int1 = iter->int2 = 0;
switch (iter->function)
}
}
}
-\f
-/* Aggregate each case as it comes through. Cases which aren't needed
- are dropped.
- Returns true if successful, false if an I/O error occurred. */
-static bool
-agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
-{
- struct agr_proc *agr = agr_;
-
- if (aggregate_single_case (agr, c, &agr->agr_case))
- return agr->sink->class->write (agr->sink, &agr->agr_case);
-
- return true;
-}
-
-/* Aggregate the current case and output it if we passed a
- breakpoint. */
-static bool
-presorted_agr_to_sysfile (const struct ccase *c, void *agr_,
- const struct dataset *ds UNUSED)
-{
- struct agr_proc *agr = agr_;
-
- if (aggregate_single_case (agr, c, &agr->agr_case))
- return any_writer_write (agr->writer, &agr->agr_case);
-
- return true;
-}