/* PSPP - computes sample statistics.
Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
- Written by Ben Pfaff <blp@gnu.org>.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
#include <stdlib.h>
#include <data/any-writer.h>
-#include <data/case-sink.h>
+#include <data/case-ordering.h>
#include <data/case.h>
-#include <data/casefile.h>
+#include <data/casegrouper.h>
+#include <data/casereader.h>
+#include <data/casewriter.h>
#include <data/dictionary.h>
#include <data/file-handle-def.h>
+#include <data/format.h>
#include <data/procedure.h>
#include <data/settings.h>
-#include <data/storage-stream.h>
#include <data/sys-file-writer.h>
#include <data/variable.h>
#include <language/command.h>
struct agr_var *next; /* Next in list. */
/* Collected during parsing. */
- struct variable *src; /* Source variable. */
+ const struct variable *src; /* Source variable. */
struct variable *dest; /* Target variable. */
int function; /* Function. */
- int include_missing; /* 1=Include user-missing values. */
+ enum mv_class exclude; /* Classes of missing values to exclude. */
union agr_argument arg[2]; /* Arguments. */
/* Accumulated during AGGREGATE execution. */
double dbl[3];
int int1, int2;
char *string;
- int missing;
+ bool saw_missing;
struct moments1 *moments;
};
{
const char *name; /* Aggregation function name. */
size_t n_args; /* Number of arguments. */
- int alpha_type; /* When given ALPHA arguments, output type. */
+ enum var_type alpha_type; /* When given ALPHA arguments, output type. */
struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
};
/* Attributes of aggregation functions. */
static const struct agr_func agr_func_tab[] =
{
- {"<NONE>", 0, -1, {0, 0, 0}},
- {"SUM", 0, -1, {FMT_F, 8, 2}},
- {"MEAN", 0, -1, {FMT_F, 8, 2}},
- {"SD", 0, -1, {FMT_F, 8, 2}},
- {"MAX", 0, ALPHA, {-1, -1, -1}},
- {"MIN", 0, ALPHA, {-1, -1, -1}},
- {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
- {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
- {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
- {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
- {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
- {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
- {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
- {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
- {"N", 0, NUMERIC, {FMT_F, 7, 0}},
- {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
- {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
- {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
- {"FIRST", 0, ALPHA, {-1, -1, -1}},
- {"LAST", 0, ALPHA, {-1, -1, -1}},
- {NULL, 0, -1, {-1, -1, -1}},
- {"N", 0, NUMERIC, {FMT_F, 7, 0}},
- {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
+ {"<NONE>", 0, -1, {0, 0, 0}},
+ {"SUM", 0, -1, {FMT_F, 8, 2}},
+ {"MEAN", 0, -1, {FMT_F, 8, 2}},
+ {"SD", 0, -1, {FMT_F, 8, 2}},
+ {"MAX", 0, VAR_STRING, {-1, -1, -1}},
+ {"MIN", 0, VAR_STRING, {-1, -1, -1}},
+ {"PGT", 1, VAR_NUMERIC, {FMT_F, 5, 1}},
+ {"PLT", 1, VAR_NUMERIC, {FMT_F, 5, 1}},
+ {"PIN", 2, VAR_NUMERIC, {FMT_F, 5, 1}},
+ {"POUT", 2, VAR_NUMERIC, {FMT_F, 5, 1}},
+ {"FGT", 1, VAR_NUMERIC, {FMT_F, 5, 3}},
+ {"FLT", 1, VAR_NUMERIC, {FMT_F, 5, 3}},
+ {"FIN", 2, VAR_NUMERIC, {FMT_F, 5, 3}},
+ {"FOUT", 2, VAR_NUMERIC, {FMT_F, 5, 3}},
+ {"N", 0, VAR_NUMERIC, {FMT_F, 7, 0}},
+ {"NU", 0, VAR_NUMERIC, {FMT_F, 7, 0}},
+ {"NMISS", 0, VAR_NUMERIC, {FMT_F, 7, 0}},
+ {"NUMISS", 0, VAR_NUMERIC, {FMT_F, 7, 0}},
+ {"FIRST", 0, VAR_STRING, {-1, -1, -1}},
+ {"LAST", 0, VAR_STRING, {-1, -1, -1}},
+ {NULL, 0, -1, {-1, -1, -1}},
+ {"N", 0, VAR_NUMERIC, {FMT_F, 7, 0}},
+ {"NU", 0, VAR_NUMERIC, {FMT_F, 7, 0}},
};
/* Missing value types. */
/* An entire AGGREGATE procedure. */
struct agr_proc
{
- /* We have either an output file or a sink. */
- struct any_writer *writer; /* Output file, or null if none. */
- struct case_sink *sink; /* Sink, or null if none. */
-
/* Break variables. */
- struct sort_criteria *sort; /* Sort criteria. */
- struct variable **break_vars; /* Break variables. */
+ struct case_ordering *sort; /* Sort criteria. */
+ const struct variable **break_vars; /* Break variables. */
size_t break_var_cnt; /* Number of break variables. */
struct ccase break_case; /* Last values of break variables. */
struct dictionary *dict; /* Aggregate dictionary. */
const struct dictionary *src_dict; /* Dict of the source */
int case_cnt; /* Counts aggregated cases. */
- struct ccase agr_case; /* Aggregate case for output. */
};
static void initialize_aggregate_info (struct agr_proc *,
const struct ccase *);
-
+static void accumulate_aggregate_info (struct agr_proc *,
+ const struct ccase *);
/* Prototypes. */
static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
struct agr_proc *);
static void agr_destroy (struct agr_proc *);
-static bool aggregate_single_case (struct agr_proc *agr,
- const struct ccase *input,
- struct ccase *output);
-static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
-
-/* Aggregating to the active file. */
-static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
-
-/* Aggregating to a system file. */
-static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
+static void dump_aggregate_info (struct agr_proc *agr,
+ struct casewriter *output);
\f
/* Parsing. */
struct dictionary *dict = dataset_dict (ds);
struct agr_proc agr;
struct file_handle *out_file = NULL;
+ struct casereader *input = NULL, *group;
+ struct casegrouper *grouper;
+ struct casewriter *output = NULL;
bool copy_documents = false;
bool presorted = false;
bool saw_direction;
+ bool ok;
memset(&agr, 0 , sizeof (agr));
agr.missing = ITEMWISE;
case_nullify (&agr.break_case);
-
+
agr.dict = dict_create ();
agr.src_dict = dict;
dict_set_label (agr.dict, dict_get_label (dict));
int i;
lex_match (lexer, '=');
- agr.sort = sort_parse_criteria (lexer, dict,
- &agr.break_vars, &agr.break_var_cnt,
- &saw_direction, NULL);
+ agr.sort = parse_case_ordering (lexer, dict,
+
+ &saw_direction);
if (agr.sort == NULL)
goto error;
+ case_ordering_get_vars (agr.sort,
+ &agr.break_vars, &agr.break_var_cnt);
for (i = 0; i < agr.break_var_cnt; i++)
dict_clone_var_assert (agr.dict, agr.break_vars[i],
/* Delete documents. */
if (!copy_documents)
- dict_set_documents (agr.dict, NULL);
+ dict_clear_documents (agr.dict);
/* Cancel SPLIT FILE. */
dict_set_split_vars (agr.dict, NULL, 0);
/* Initialize. */
agr.case_cnt = 0;
- case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
- /* Output to active file or external file? */
if (out_file == NULL)
{
/* The active file will be replaced by the aggregated data,
so TEMPORARY is moot. */
proc_cancel_temporary_transformations (ds);
+ proc_discard_output (ds);
+ output = autopaging_writer_create (dict_get_next_value_idx (agr.dict));
+ }
+ else
+ {
+ output = any_writer_open (out_file, agr.dict);
+ if (output == NULL)
+ goto error;
+ }
- if (agr.sort != NULL && !presorted)
- {
- if (!sort_active_file_in_place (ds, agr.sort))
- goto error;
- }
+ input = proc_open (ds);
+ if (agr.sort != NULL && !presorted)
+ {
+ input = sort_execute (input, agr.sort);
+ agr.sort = NULL;
+ }
- agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
- if (agr.sink->class->open != NULL)
- agr.sink->class->open (agr.sink);
- proc_set_sink (ds,
- create_case_sink (&null_sink_class,
- dict, NULL));
- if (!procedure (ds, agr_to_active_file, &agr))
- goto error;
- if (agr.case_cnt > 0)
- {
- dump_aggregate_info (&agr, &agr.agr_case);
- if (!agr.sink->class->write (agr.sink, &agr.agr_case))
- goto error;
- }
- discard_variables (ds);
- dict_destroy (dict);
- dataset_set_dict (ds, agr.dict);
- agr.dict = NULL;
- proc_set_source (ds,
- agr.sink->class->make_source (agr.sink));
- free_case_sink (agr.sink);
+ for (grouper = casegrouper_create_vars (input, agr.break_vars,
+ agr.break_var_cnt);
+ casegrouper_get_next_group (grouper, &group);
+ casereader_destroy (group))
+ {
+ struct ccase c;
+
+ if (!casereader_peek (group, 0, &c))
+ continue;
+ initialize_aggregate_info (&agr, &c);
+ case_destroy (&c);
+
+ for (; casereader_read (group, &c); case_destroy (&c))
+ accumulate_aggregate_info (&agr, &c);
+ dump_aggregate_info (&agr, output);
}
- else
+ if (!casegrouper_destroy (grouper))
+ goto error;
+
+ if (!proc_commit (ds))
{
- agr.writer = any_writer_open (out_file, agr.dict);
- if (agr.writer == NULL)
+ input = NULL;
+ goto error;
+ }
+ input = NULL;
+
+ if (out_file == NULL)
+ {
+ struct casereader *next_input = casewriter_make_reader (output);
+ if (next_input == NULL)
goto error;
- if (agr.sort != NULL && !presorted)
- {
- /* Sorting is needed. */
- struct casefile *dst;
- struct casereader *reader;
- struct ccase c;
- bool ok = true;
-
- dst = sort_active_file_to_casefile (ds, agr.sort);
- if (dst == NULL)
- goto error;
- reader = casefile_get_destructive_reader (dst);
- while (ok && casereader_read_xfer (reader, &c))
- {
- if (aggregate_single_case (&agr, &c, &agr.agr_case))
- ok = any_writer_write (agr.writer, &agr.agr_case);
- case_destroy (&c);
- }
- casereader_destroy (reader);
- if (ok)
- ok = !casefile_error (dst);
- casefile_destroy (dst);
- if (!ok)
- goto error;
- }
- else
- {
- /* Active file is already sorted. */
- if (!procedure (ds, presorted_agr_to_sysfile, &agr))
- goto error;
- }
-
- if (agr.case_cnt > 0)
- {
- dump_aggregate_info (&agr, &agr.agr_case);
- any_writer_write (agr.writer, &agr.agr_case);
- }
- if (any_writer_error (agr.writer))
+ proc_set_active_file (ds, next_input, agr.dict);
+ agr.dict = NULL;
+ }
+ else
+ {
+ ok = casewriter_destroy (output);
+ output = NULL;
+ if (!ok)
goto error;
}
return CMD_SUCCESS;
error:
+ if (input != NULL)
+ proc_commit (ds);
+ casewriter_destroy (output);
agr_destroy (&agr);
return CMD_CASCADING_FAILURE;
}
size_t n_dest;
struct string function_name;
- int include_missing;
+ enum mv_class exclude;
const struct agr_func *function;
int func_index;
union agr_argument arg[2];
- struct variable **src;
+ const struct variable **src;
size_t n_src;
size_t i;
n_src = 0;
arg[0].c = NULL;
arg[1].c = NULL;
+ ds_init_empty (&function_name);
/* Parse the list of target variables. */
while (!lex_match (lexer, '='))
goto error;
}
- include_missing = 0;
+ exclude = MV_ANY;
- ds_init_string (&function_name, lex_tokstr (lexer));
+ ds_assign_string (&function_name, lex_tokstr (lexer));
ds_chomp (&function_name, '.');
if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
- include_missing = 1;
+ exclude = MV_SYSTEM;
for (function = agr_func_tab; function->name; function++)
if (!strcasecmp (function->name, ds_cstr (&function_name)))
else if (function->n_args)
pv_opts |= PV_SAME_TYPE;
- if (!parse_variables (lexer, dict, &src, &n_src, pv_opts))
+ if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
goto error;
}
if (lex_token (lexer) == T_STRING)
{
arg[i].c = ds_xstrdup (lex_tokstr (lexer));
- type = ALPHA;
+ type = VAR_STRING;
}
else if (lex_is_number (lexer))
{
arg[i].f = lex_tokval (lexer);
- type = NUMERIC;
- } else {
- msg (SE, _("Missing argument %d to %s."), i + 1,
- function->name);
+ type = VAR_NUMERIC;
+ }
+ else
+ {
+ msg (SE, _("Missing argument %d to %s."),
+ (int) i + 1, function->name);
goto error;
}
v->string = xmalloc (var_get_width (src[i]));
}
- if (function->alpha_type == ALPHA)
+ if (function->alpha_type == VAR_STRING)
destvar = dict_clone_var (agr->dict, v->src, dest[i]);
else
{
assert (var_is_numeric (v->src)
- || function->alpha_type == NUMERIC);
+ || function->alpha_type == VAR_NUMERIC);
destvar = dict_create_var (agr->dict, dest[i], 0);
if (destvar != NULL)
{
v->dest = destvar;
}
- v->include_missing = include_missing;
+ v->exclude = exclude;
if (v->src != NULL)
{
{
struct agr_var *iter, *next;
- any_writer_close (agr->writer);
- if (agr->sort != NULL)
- sort_destroy_criteria (agr->sort);
+ case_ordering_destroy (agr->sort);
free (agr->break_vars);
case_destroy (&agr->break_case);
for (iter = agr->agr_vars; iter; iter = next)
}
if (agr->dict != NULL)
dict_destroy (agr->dict);
-
- case_destroy (&agr->agr_case);
}
\f
/* Execution. */
-static void accumulate_aggregate_info (struct agr_proc *,
- const struct ccase *);
-static void dump_aggregate_info (struct agr_proc *, struct ccase *);
-
-/* Processes a single case INPUT for aggregation. If output is
- warranted, writes it to OUTPUT and returns true.
- Otherwise, returns false and OUTPUT is unmodified. */
-static bool
-aggregate_single_case (struct agr_proc *agr,
- const struct ccase *input, struct ccase *output)
-{
- bool finished_group = false;
-
- if (agr->case_cnt++ == 0)
- initialize_aggregate_info (agr, input);
- else if (case_compare (&agr->break_case, input,
- agr->break_vars, agr->break_var_cnt))
- {
- dump_aggregate_info (agr, output);
- finished_group = true;
-
- initialize_aggregate_info (agr, input);
- }
-
- accumulate_aggregate_info (agr, input);
- return finished_group;
-}
-
/* Accumulates aggregation data from the case INPUT. */
static void
-accumulate_aggregate_info (struct agr_proc *agr,
- const struct ccase *input)
+accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
{
struct agr_var *iter;
double weight;
for (iter = agr->agr_vars; iter; iter = iter->next)
if (iter->src)
{
- const union value *v = case_data (input, iter->src->fv);
+ const union value *v = case_data (input, iter->src);
int src_width = var_get_width (iter->src);
- if (iter->include_missing
- ? var_is_numeric (iter->src) && v->f == SYSMIS
- : var_is_value_missing (iter->src, v))
+ if (var_is_value_missing (iter->src, v, iter->exclude))
{
switch (iter->function)
{
iter->int1++;
break;
}
- iter->missing = 1;
+ iter->saw_missing = true;
continue;
}
}
}
-/* We've come to a record that differs from the previous in one or
- more of the break variables. Make an output record from the
- accumulated statistics in the OUTPUT case. */
+/* Writes an aggregated record to OUTPUT. */
static void
-dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
+dump_aggregate_info (struct agr_proc *agr, struct casewriter *output)
{
+ struct ccase c;
+
+ case_create (&c, dict_get_next_value_idx (agr->dict));
+
{
int value_idx = 0;
int i;
for (i = 0; i < agr->break_var_cnt; i++)
{
- struct variable *v = agr->break_vars[i];
+ const struct variable *v = agr->break_vars[i];
size_t value_cnt = var_get_value_cnt (v);
- memcpy (case_data_rw (output, value_idx),
- case_data (&agr->break_case, v->fv),
+ memcpy (case_data_rw_idx (&c, value_idx),
+ case_data (&agr->break_case, v),
sizeof (union value) * value_cnt);
value_idx += value_cnt;
}
for (i = agr->agr_vars; i; i = i->next)
{
- union value *v = case_data_rw (output, i->dest->fv);
+ union value *v = case_data_rw (&c, i->dest);
- if (agr->missing == COLUMNWISE && i->missing != 0
+ if (agr->missing == COLUMNWISE && i->saw_missing
&& (i->function & FUNC) != N && (i->function & FUNC) != NU
&& (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
{
}
}
}
+
+ casewriter_write (output, &c);
}
/* Resets the state for all the aggregate functions. */
for (iter = agr->agr_vars; iter; iter = iter->next)
{
- iter->missing = 0;
+ iter->saw_missing = false;
iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
iter->int1 = iter->int2 = 0;
switch (iter->function)
}
}
}
-\f
-/* Aggregate each case as it comes through. Cases which aren't needed
- are dropped.
- Returns true if successful, false if an I/O error occurred. */
-static bool
-agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
-{
- struct agr_proc *agr = agr_;
-
- if (aggregate_single_case (agr, c, &agr->agr_case))
- return agr->sink->class->write (agr->sink, &agr->agr_case);
-
- return true;
-}
-
-/* Aggregate the current case and output it if we passed a
- breakpoint. */
-static bool
-presorted_agr_to_sysfile (const struct ccase *c, void *agr_,
- const struct dataset *ds UNUSED)
-{
- struct agr_proc *agr = agr_;
-
- if (aggregate_single_case (agr, c, &agr->agr_case))
- return any_writer_write (agr->writer, &agr->agr_case);
-
- return true;
-}