You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- 02111-1307, USA. */
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA. */
#include <config.h>
-#include <assert.h>
+#include "error.h"
#include <stdlib.h>
#include "alloc.h"
+#include "case.h"
+#include "casefile.h"
#include "command.h"
+#include "dictionary.h"
#include "error.h"
#include "file-handle.h"
#include "lexer.h"
#include "misc.h"
+#include "moments.h"
+#include "pool.h"
#include "settings.h"
-#include "sfm.h"
+#include "sfm-write.h"
#include "sort.h"
-#include "stats.h"
#include "str.h"
#include "var.h"
#include "vfm.h"
#include "vfmP.h"
-#include "debug-print.h"
-
/* Specifies how to make an aggregate variable. */
struct agr_var
{
int int1, int2;
char *string;
int missing;
+ struct moments1 *moments;
};
/* Aggregation functions. */
};
/* Attributes of aggregation functions. */
-static struct agr_func agr_func_tab[] =
+static const struct agr_func agr_func_tab[] =
{
{"<NONE>", 0, -1, {0, 0, 0}},
{"SUM", 0, -1, {FMT_F, 8, 2}},
{"NU", 0, NUMERIC, {FMT_F, 7, 0}},
};
-/* Output file, or NULL for the active file. */
-static struct file_handle *outfile;
-
/* Missing value types. */
-enum
+enum missing_treatment
{
ITEMWISE, /* Missing values item by item. */
COLUMNWISE /* Missing values column by column. */
};
-/* ITEMWISE or COLUMNWISE. */
-static int missing;
-
-/* Aggregate variables. */
-static struct agr_var *agr_first, *agr_next;
-
-/* Aggregate dictionary. */
-static struct dictionary *agr_dict;
-
-/* Number of cases passed through aggregation. */
-static int case_count;
-
-/* Last values of the break variables. */
-static union value *prev_case;
-
-/* Buffers for use by the 10x transformation. */
-static flt64 *buf64_1xx;
-static struct ccase *buf_1xx;
+/* An entire AGGREGATE procedure.
+
+   Gathers everything one run of the command needs: where the
+   output goes, the break variables, the chain of aggregate
+   variables, and the case being assembled for output.
+   cmd_aggregate() keeps one of these on its stack, zeroes it
+   with memset() before use, and releases its contents with
+   agr_destroy(). */
+struct agr_proc
+ {
+ /* We have either an output file or a sink. */
+ struct sfm_writer *writer; /* Output file, or null if none.
+   Opened by sfm_open_writer() when OUTFILE names a file. */
+ struct case_sink *sink; /* Sink, or null if none.
+   Created from storage_sink_class when output goes to the active file. */
+
+ /* Break variables. */
+ struct sort_criteria *sort; /* Sort criteria, from sort_parse_criteria(). */
+ struct variable **break_vars; /* Break variables. */
+ size_t break_var_cnt; /* Number of break variables. */
+ struct ccase break_case; /* Last values of break variables.
+   Holds a clone of the case that started the current break group. */
+
+ enum missing_treatment missing; /* How to treat missing values.
+   ITEMWISE unless MISSING=COLUMNWISE was given. */
+ struct agr_var *agr_vars; /* First aggregate variable.
+   Head of a list linked through each agr_var's `next' member. */
+ struct dictionary *dict; /* Aggregate dictionary. */
+ int case_cnt; /* Counts aggregated cases.
+   Incremented once per input case by aggregate_single_case(). */
+ struct ccase agr_case; /* Aggregate case for output. */
+ };
-static void initialize_aggregate_info (void);
+static void initialize_aggregate_info (struct agr_proc *,
+ const struct ccase *);
/* Prototypes. */
-static int parse_aggregate_functions (void);
-static void free_aggregate_functions (void);
-static int aggregate_single_case (struct ccase *input, struct ccase *output);
-static int create_sysfile (void);
-
-static int agr_00x_trns_proc (struct trns_header *, struct ccase *);
-static void agr_00x_end_func (void);
-static int agr_10x_trns_proc (struct trns_header *, struct ccase *);
-static void agr_10x_trns_free (struct trns_header *);
-static void agr_10x_end_func (void);
-static int agr_11x_func (void);
-
-#if DEBUGGING
-static void debug_print (int flags);
-#endif
+static int parse_aggregate_functions (struct agr_proc *);
+static void agr_destroy (struct agr_proc *);
+static int aggregate_single_case (struct agr_proc *agr,
+ const struct ccase *input,
+ struct ccase *output);
+static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
+
+/* Aggregating to the active file. */
+static int agr_to_active_file (struct ccase *, void *aux);
+
+/* Aggregating to a system file. */
+static int presorted_agr_to_sysfile (struct ccase *, void *aux);
\f
/* Parsing. */
int
cmd_aggregate (void)
{
- /* From sort.c. */
- int parse_sort_variables (void);
-
- /* Have we seen these subcommands? */
- unsigned seen = 0;
+ struct agr_proc agr; /* All state for this run; contents released by agr_destroy(). */
+ struct file_handle *out_file = NULL; /* Stays null when OUTFILE is given as an asterisk. */
- outfile = NULL;
- missing = ITEMWISE;
- v_sort = NULL;
- prev_case = NULL;
+ bool copy_documents = false; /* Set by the DOCUMENT subcommand. */
+ bool presorted = false; /* Set by the PRESORTED subcommand. */
+ bool saw_direction; /* Passed by address to sort_parse_criteria() for BREAK. */
+
+ memset(&agr, 0 , sizeof (agr));
+ agr.missing = ITEMWISE;
+ case_nullify (&agr.break_case);
- agr_dict = dict_create ();
- dict_set_label (agr_dict, dict_get_label (default_dict));
- dict_set_documents (agr_dict, dict_get_documents (default_dict));
+ agr.dict = dict_create ();
+ dict_set_label (agr.dict, dict_get_label (default_dict));
+ dict_set_documents (agr.dict, dict_get_documents (default_dict));
+
+ /* OUTFILE subcommand must be first.
+   An asterisk leaves out_file null, which later selects output
+   to the active file; anything else is parsed as a file handle. */
+ if (!lex_force_match_id ("OUTFILE"))
+ goto error;
+ lex_match ('=');
+ if (!lex_match ('*'))
+ {
+ out_file = fh_parse ();
+ if (out_file == NULL)
+ goto error;
+ }
- lex_match_id ("AGGREGATE");
-
/* Read most of the subcommands. */
for (;;)
{
- lex_match('/');
+ lex_match ('/');
- if (lex_match_id ("OUTFILE"))
- {
- if (seen & 1)
- {
- free (v_sort);
- dict_destroy (agr_dict);
- msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
- return CMD_FAILURE;
- }
- seen |= 1;
-
- lex_match ('=');
- if (lex_match ('*'))
- outfile = NULL;
- else
- {
- outfile = fh_parse_file_handle ();
- if (outfile == NULL)
- {
- free (v_sort);
- dict_destroy (agr_dict);
- return CMD_FAILURE;
- }
- }
- }
- else if (lex_match_id ("MISSING"))
+ if (lex_match_id ("MISSING"))
{
lex_match ('=');
if (!lex_match_id ("COLUMNWISE"))
{
- free (v_sort);
- dict_destroy (agr_dict);
lex_error (_("while expecting COLUMNWISE"));
- return CMD_FAILURE;
+ goto error;
}
- missing = COLUMNWISE;
+ agr.missing = COLUMNWISE;
}
else if (lex_match_id ("DOCUMENT"))
- seen |= 2;
+ copy_documents = true;
else if (lex_match_id ("PRESORTED"))
- seen |= 4;
+ presorted = true;
else if (lex_match_id ("BREAK"))
{
- if (seen & 8)
- {
- free (v_sort);
- dict_destroy (agr_dict);
- msg (SE, _("%s subcommand given multiple times."),"BREAK");
- return CMD_FAILURE;
- }
- seen |= 8;
+ int i;
lex_match ('=');
- if (!parse_sort_variables ())
- {
- dict_destroy (agr_dict);
- return CMD_FAILURE;
- }
+ agr.sort = sort_parse_criteria (default_dict,
+ &agr.break_vars, &agr.break_var_cnt,
+ &saw_direction);
+ if (agr.sort == NULL)
+ goto error;
- {
- int i;
-
- for (i = 0; i < nv_sort; i++)
- {
- struct variable *v;
-
- v = dict_clone_var (agr_dict, v_sort[i], v_sort[i]->name);
- assert (v != NULL);
- }
- }
+ for (i = 0; i < agr.break_var_cnt; i++) /* Clone each break variable into agr.dict. */
+ {
+ struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
+ agr.break_vars[i]->name,
+ agr.break_vars[i]->longname
+ );
+ assert (v != NULL);
+ }
+
+ /* BREAK must follow the options.
+   Leaving the loop here ends the scan for MISSING, DOCUMENT
+   and PRESORTED; this is the loop's only non-error exit. */
+ break;
}
- else break;
+ else
+ {
+ lex_error (_("expecting BREAK"));
+ goto error;
+ }
}
-
- /* Check for proper syntax. */
- if (!(seen & 8))
- msg (SW, _("BREAK subcommand not specified."));
+ if (presorted && saw_direction)
+ msg (SW, _("When PRESORTED is specified, specifying sorting directions "
+ "with (A) or (D) has no effect. Output data will be sorted "
+ "the same way as the input data."));
/* Read in the aggregate functions. */
- if (!parse_aggregate_functions ())
- {
- free_aggregate_functions ();
- free (v_sort);
- return CMD_FAILURE;
- }
+ lex_match ('/'); /* Result ignored, so the slash is optional here. */
+ if (!parse_aggregate_functions (&agr))
+ goto error;
/* Delete documents. */
- if (!(seen & 2))
- dict_set_documents (agr_dict, NULL);
+ if (!copy_documents)
+ dict_set_documents (agr.dict, NULL);
/* Cancel SPLIT FILE. */
- dict_set_split_vars (agr_dict, NULL, 0);
+ dict_set_split_vars (agr.dict, NULL, 0);
-#if DEBUGGING
- debug_print (seen);
-#endif
-
/* Initialize. */
- case_count = 0;
- initialize_aggregate_info ();
-
- /* How to implement all this... There are three important variables:
- whether output is going to the active file (0) or a separate file
- (1); whether the input data is presorted (0) or needs sorting
- (1); whether there is a temporary transformation (1) or not (0).
- The eight cases are as follows:
-
- 000 (0): Pass it through an aggregate transformation that
- modifies the data.
-
- 001 (1): Cancel the temporary transformation and handle as 000.
-
- 010 (2): Set up a SORT CASES and aggregate the output, writing
- the results to the active file.
-
- 011 (3): Cancel the temporary transformation and handle as 010.
-
- 100 (4): Pass it through an aggregate transformation that doesn't
- modify the data but merely writes it to the output file.
-
- 101 (5): Handled as 100.
-
- 110 (6): Set up a SORT CASES and capture the output, aggregate
- it, write it to the output file without modifying the active
- file.
-
- 111 (7): Handled as 110. */
-
- {
- unsigned type = 0;
-
- if (outfile != NULL)
- type |= 4;
- if (nv_sort != 0 && (seen & 4) == 0)
- type |= 2;
- if (temporary)
- type |= 1;
-
- switch (type)
- {
- case 3:
- cancel_temporary ();
- /* fall through */
- case 2:
- sort_cases (0);
- goto case0;
-
- case 1:
- cancel_temporary ();
- /* fall through */
- case 0:
- case0:
- {
- struct trns_header *t = xmalloc (sizeof *t);
- t->proc = agr_00x_trns_proc;
- t->free = NULL;
- add_transformation (t);
-
- temporary = 2;
- temp_dict = agr_dict;
- temp_trns = n_trns;
-
- agr_dict = NULL;
-
- procedure (NULL, NULL, agr_00x_end_func);
- break;
- }
+ agr.case_cnt = 0;
+ case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict)); /* Output case sized from agr.dict. */
- case 4:
- case 5:
- {
- if (!create_sysfile ())
- goto lossage;
-
- {
- struct trns_header *t = xmalloc (sizeof *t);
- t->proc = agr_10x_trns_proc;
- t->free = agr_10x_trns_free;
- add_transformation (t);
-
- procedure (NULL, NULL, agr_10x_end_func);
- }
-
- break;
- }
-
- case 6:
- case 7:
- sort_cases (1);
-
- if (!create_sysfile ())
- goto lossage;
- read_sort_output (agr_11x_func);
-
- {
- struct ccase *save_temp_case = temp_case;
- temp_case = NULL;
- agr_11x_func ();
- temp_case = save_temp_case;
- }
-
- break;
-
- default:
- assert (0);
- }
- }
-
- free (buf64_1xx);
- free (buf_1xx);
-
- /* Clean up. */
- free (v_sort);
- free_aggregate_functions ();
- free (prev_case);
+ /* Output to active file or external file? */
+ if (out_file == NULL)
+ {
+ /* The active file will be replaced by the aggregated data,
+ so TEMPORARY is moot. */
+ cancel_temporary ();
+
+ if (agr.sort != NULL && !presorted)
+ sort_active_file_in_place (agr.sort);
+
+ agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
+ if (agr.sink->class->open != NULL)
+ agr.sink->class->open (agr.sink);
+ vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
+ procedure (agr_to_active_file, &agr);
+ if (agr.case_cnt > 0) /* The last break group is still pending; flush it. */
+ {
+ dump_aggregate_info (&agr, &agr.agr_case);
+ agr.sink->class->write (agr.sink, &agr.agr_case);
+ }
+ dict_destroy (default_dict);
+ default_dict = agr.dict; /* The aggregate dictionary becomes the active one. */
+ agr.dict = NULL; /* Handed over above; agr_destroy() skips a null dict. */
+ vfm_source = agr.sink->class->make_source (agr.sink);
+ free_case_sink (agr.sink);
+ }
+ else
+ {
+ agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0);
+ if (agr.writer == NULL)
+ goto error;
+
+ if (agr.sort != NULL && !presorted)
+ {
+ /* Sorting is needed. */
+ struct casefile *dst;
+ struct casereader *reader;
+ struct ccase c;
+
+ dst = sort_active_file_to_casefile (agr.sort);
+ if (dst == NULL)
+ goto error;
+ reader = casefile_get_destructive_reader (dst);
+ while (casereader_read_xfer (reader, &c))
+ {
+ if (aggregate_single_case (&agr, &c, &agr.agr_case))
+ sfm_write_case (agr.writer, &agr.agr_case);
+ case_destroy (&c);
+ }
+ casereader_destroy (reader);
+ casefile_destroy (dst);
+ }
+ else
+ {
+ /* Active file is already sorted. */
+ procedure (presorted_agr_to_sysfile, &agr);
+ }
+
+ if (agr.case_cnt > 0) /* The last break group is still pending; flush it. */
+ {
+ dump_aggregate_info (&agr, &agr.agr_case);
+ sfm_write_case (agr.writer, &agr.agr_case);
+ }
+ }
+ agr_destroy (&agr);
return CMD_SUCCESS;
-lossage:
- /* Clean up. */
- free (v_sort);
- free_aggregate_functions ();
- free (prev_case);
-
+error:
+ agr_destroy (&agr);
return CMD_FAILURE;
}
-/* Create a system file for use in aggregation to an external file,
- and allocate temporary buffers for writing out cases. */
-static int
-create_sysfile (void)
-{
- struct sfm_write_info w;
- w.h = outfile;
- w.dict = agr_dict;
- w.compress = set_scompression;
- if (!sfm_write_dictionary (&w))
- {
- free_aggregate_functions ();
- free (v_sort);
- dict_destroy (agr_dict);
- return 0;
- }
-
- buf64_1xx = xmalloc (sizeof *buf64_1xx * w.case_size);
- buf_1xx = xmalloc (sizeof (struct ccase)
- + (sizeof (union value)
- * (dict_get_value_cnt (agr_dict) - 1)));
-
- return 1;
-}
-
/* Parse all the aggregate functions. */
static int
-parse_aggregate_functions (void)
+parse_aggregate_functions (struct agr_proc *agr)
{
- agr_first = agr_next = NULL;
+ struct agr_var *tail; /* Tail of linked list starting at agr->agr_vars. */
/* Parse everything. */
+ tail = NULL;
for (;;)
{
char **dest;
int n_dest;
int include_missing;
- struct agr_func *function;
+ const struct agr_func *function;
int func_index;
union value arg[2];
dest_label = NULL;
n_dest = 0;
src = NULL;
+ function = NULL;
n_src = 0;
arg[0].c = NULL;
arg[1].c = NULL;
{
int n_dest_prev = n_dest;
- if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
- goto lossage;
+ if (!parse_DATA_LIST_vars (&dest, &n_dest,
+ PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
+ goto error;
/* Assign empty labels. */
{
if (token == T_STRING)
{
- ds_truncate (&tokstr, 120);
- dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
+ ds_truncate (&tokstr, 255);
+ dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
lex_get ();
}
}
if (token != T_ID)
{
lex_error (_("expecting aggregation function"));
- goto lossage;
+ goto error;
}
include_missing = 0;
}
for (function = agr_func_tab; function->name; function++)
- if (!strcmp (function->name, tokid))
+ if (!strcasecmp (function->name, tokid))
break;
if (NULL == function->name)
{
msg (SE, _("Unknown aggregation function %s."), tokid);
- goto lossage;
+ goto error;
}
func_index = function - agr_func_tab;
lex_get ();
else
{
lex_error (_("expecting `('"));
- goto lossage;
+ goto error;
}
- } else {
+ }
+ else
+ {
/* Parse list of source variables. */
{
int pv_opts = PV_NO_SCRATCH;
pv_opts |= PV_SAME_TYPE;
if (!parse_variables (default_dict, &src, &n_src, pv_opts))
- goto lossage;
+ goto error;
}
/* Parse function arguments, for those functions that
lex_match (',');
if (token == T_STRING)
{
- arg[i].c = xstrdup (ds_value (&tokstr));
+ arg[i].c = xstrdup (ds_c_str (&tokstr));
type = ALPHA;
}
- else if (token == T_NUM)
+ else if (lex_is_number ())
{
arg[i].f = tokval;
type = NUMERIC;
} else {
msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
- goto lossage;
+ goto error;
}
lex_get ();
msg (SE, _("Arguments to %s must be of same type as "
"source variables."),
function->name);
- goto lossage;
+ goto error;
}
}
if (!lex_match(')'))
{
lex_error (_("expecting `)'"));
- goto lossage;
+ goto error;
}
- /* Now check that the number of source variables match the
- number of target variables. Do this here because if we
- do it earlier then the user can get very misleading error
- messages; i.e., `AGGREGATE x=SUM(y t).' will get this
- error message when a proper message would be more like
- `unknown variable t'. */
+ /* Now check that the number of source variables matches
+ the number of target variables. If we checked earlier
+ than this, the user could get a very misleading error
+ message, e.g. `AGGREGATE x=SUM(y t).' would get this
+ error message when a proper message would be more
+ like `unknown variable t'. */
if (n_src != n_dest)
{
msg (SE, _("Number of source variables (%d) does not match "
"number of target variables (%d)."),
n_src, n_dest);
- goto lossage;
+ goto error;
}
+
+ if ((func_index == PIN || func_index == POUT
+ || func_index == FIN || func_index == FOUT)
+ && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
+ || (src[0]->type == ALPHA
+ && st_compare_pad (arg[0].c, strlen (arg[0].c),
+ arg[1].c, strlen (arg[1].c)) > 0)))
+ {
+ union value t = arg[0];
+ arg[0] = arg[1];
+ arg[1] = t;
+
+ msg (SW, _("The value arguments passed to the %s function "
+ "are out-of-order. They will be treated as if "
+ "they had been specified in the correct order."),
+ function->name);
+ }
}
/* Finally add these to the linked list of aggregation
struct agr_var *v = xmalloc (sizeof *v);
/* Add variable to chain. */
- if (agr_first)
- agr_next = agr_next->next = v;
+ if (agr->agr_vars != NULL)
+ tail->next = v;
else
- agr_first = agr_next = v;
- agr_next->next = NULL;
+ agr->agr_vars = v;
+ tail = v;
+ tail->next = NULL;
+ v->moments = NULL;
/* Create the target variable in the aggregate
dictionary. */
{
+ static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
struct variable *destvar;
- agr_next->function = func_index;
+ v->function = func_index;
if (src)
{
- int output_width;
-
- agr_next->src = src[i];
+ v->src = src[i];
if (src[i]->type == ALPHA)
{
- agr_next->function |= FSTRING;
- agr_next->string = xmalloc (src[i]->width);
+ v->function |= FSTRING;
+ v->string = xmalloc (src[i]->width);
}
-
- if (agr_next->src->type == NUMERIC || function->alpha_type == NUMERIC)
- output_width = 0;
- else
- output_width = agr_next->src->width;
if (function->alpha_type == ALPHA)
- destvar = dict_clone_var (agr_dict, agr_next->src, dest[i]);
- else
- {
- destvar = dict_create_var (agr_dict, dest[i], output_width);
- if (output_width == 0)
- destvar->print = destvar->write = function->format;
- if (output_width == 0 && dict_get_weight (default_dict) != NULL
- && (func_index == N || func_index == N_NO_VARS
- || func_index == NU || func_index == NU_NO_VARS))
- {
- struct fmt_spec f = {FMT_F, 8, 2};
-
- destvar->print = destvar->write = f;
- }
- }
+ destvar = dict_clone_var (agr->dict, v->src, 0, dest[i] );
+ else if (v->src->type == NUMERIC
+ || function->alpha_type == NUMERIC)
+ {
+ destvar = dict_create_var (agr->dict, dest[i], 0);
+ if (destvar != NULL)
+ {
+ if ((func_index == N || func_index == NMISS)
+ && dict_get_weight (default_dict) != NULL)
+ destvar->print = destvar->write = f8_2;
+ else
+ destvar->print = destvar->write = function->format;
+ }
+ }
} else {
- agr_next->src = NULL;
- destvar = dict_create_var (agr_dict, dest[i], 0);
+ v->src = NULL;
+ destvar = dict_create_var (agr->dict, dest[i], 0);
+ if (func_index == N_NO_VARS
+ && dict_get_weight (default_dict) != NULL)
+ destvar->print = destvar->write = f8_2;
+ else
+ destvar->print = destvar->write = function->format;
}
if (!destvar)
"the aggregate variables and the break "
"variables."),
dest[i]);
- free (dest[i]);
- goto lossage;
+ goto error;
}
free (dest[i]);
+ destvar->init = 0;
if (dest_label[i])
{
destvar->label = dest_label[i];
dest_label[i] = NULL;
}
- else if (function->alpha_type == ALPHA)
- destvar->print = destvar->write = function->format;
- agr_next->dest = destvar;
+ v->dest = destvar;
}
- agr_next->include_missing = include_missing;
+ v->include_missing = include_missing;
- if (agr_next->src != NULL)
+ if (v->src != NULL)
{
int j;
- if (agr_next->src->type == NUMERIC)
+ if (v->src->type == NUMERIC)
for (j = 0; j < function->n_args; j++)
- agr_next->arg[j].f = arg[j].f;
+ v->arg[j].f = arg[j].f;
else
for (j = 0; j < function->n_args; j++)
- agr_next->arg[j].c = xstrdup (arg[j].c);
+ v->arg[j].c = xstrdup (arg[j].c);
}
}
}
continue;
- lossage:
+ error:
for (i = 0; i < n_dest; i++)
{
free (dest[i]);
if (src && n_src && src[0]->type == ALPHA)
for (i = 0; i < function->n_args; i++)
{
- free(arg[i].c);
+ free (arg[i].c);
arg[i].c = NULL;
}
free (src);
}
}
-/* Frees all the state for the AGGREGATE procedure. */
+/* Destroys AGR. */
static void
-free_aggregate_functions (void)
+agr_destroy (struct agr_proc *agr)
{
struct agr_var *iter, *next;
- if (agr_dict)
- dict_destroy (agr_dict);
- for (iter = agr_first; iter; iter = next)
+ sfm_close_writer (agr->writer);
+ if (agr->sort != NULL)
+ sort_destroy_criteria (agr->sort);
+ free (agr->break_vars);
+ case_destroy (&agr->break_case);
+ for (iter = agr->agr_vars; iter; iter = next)
{
next = iter->next;
free (iter->arg[i].c);
free (iter->string);
}
+ else if (iter->function == SD)
+ moments1_destroy (iter->moments);
free (iter);
}
+ if (agr->dict != NULL)
+ dict_destroy (agr->dict);
+
+ case_destroy (&agr->agr_case);
}
\f
/* Execution. */
-static void accumulate_aggregate_info (struct ccase *input);
-static void dump_aggregate_info (struct ccase *output);
+static void accumulate_aggregate_info (struct agr_proc *,
+ const struct ccase *);
+static void dump_aggregate_info (struct agr_proc *, struct ccase *);
/* Processes a single case INPUT for aggregation. If output is
- warranted, it is written to case OUTPUT, which may be (but need not
- be) an alias to INPUT. Returns -1 when output is performed, -2
- otherwise. */
-/* The code in this function has an eerie similarity to
- vfm.c:SPLIT_FILE_procfunc()... */
+ warranted, writes it to OUTPUT and returns nonzero.
+ Otherwise, returns zero and OUTPUT is unmodified. */
static int
-aggregate_single_case (struct ccase *input, struct ccase *output)
+aggregate_single_case (struct agr_proc *agr,
+ const struct ccase *input, struct ccase *output)
{
- /* The first case always begins a new break group. We also need to
- preserve the values of the case for later comparison. */
- if (case_count++ == 0)
+ bool finished_group = false; /* Becomes true once OUTPUT has been filled in. */
+
+ if (agr->case_cnt++ == 0) /* Very first case: nothing to emit, just start a group. */
+ initialize_aggregate_info (agr, input);
+ else if (case_compare (&agr->break_case, input,
+ agr->break_vars, agr->break_var_cnt)) /* Nonzero is treated as "break values changed". */
{
- int n_elem = 0;
-
- {
- int i;
+ dump_aggregate_info (agr, output); /* Emit the group that just ended. */
+ finished_group = true;
- for (i = 0; i < nv_sort; i++)
- n_elem += v_sort[i]->nv;
- }
-
- prev_case = xmalloc (sizeof *prev_case * n_elem);
-
- /* Copy INPUT into prev_case. */
- {
- union value *iter = prev_case;
- int i;
-
- for (i = 0; i < nv_sort; i++)
- {
- struct variable *v = v_sort[i];
-
- if (v->type == NUMERIC)
- (iter++)->f = input->data[v->fv].f;
- else
- {
- memcpy (iter->s, input->data[v->fv].s, v->width);
- iter += v->nv;
- }
- }
- }
-
- accumulate_aggregate_info (input);
-
- return -2;
+ initialize_aggregate_info (agr, input); /* Start the next group from INPUT. */
}
-
- /* Compare the value of each break variable to the values on the
- previous case. */
- {
- union value *iter = prev_case;
- int i;
-
- for (i = 0; i < nv_sort; i++)
- {
- struct variable *v = v_sort[i];
-
- switch (v->type)
- {
- case NUMERIC:
- if (input->data[v->fv].f != iter->f)
- goto not_equal;
- iter++;
- break;
- case ALPHA:
- if (memcmp (input->data[v->fv].s, iter->s, v->width))
- goto not_equal;
- iter += v->nv;
- break;
- default:
- assert (0);
- }
- }
- }
- accumulate_aggregate_info (input);
-
- return -2;
-
-not_equal:
- /* The values of the break variable are different from the values on
- the previous case. That means that it's time to dump aggregate
- info. */
- dump_aggregate_info (output);
- initialize_aggregate_info ();
- accumulate_aggregate_info (input);
-
- /* Copy INPUT into prev_case. */
- {
- union value *iter = prev_case;
- int i;
-
- for (i = 0; i < nv_sort; i++)
- {
- struct variable *v = v_sort[i];
-
- if (v->type == NUMERIC)
- (iter++)->f = input->data[v->fv].f;
- else
- {
- memcpy (iter->s, input->data[v->fv].s, v->width);
- iter += v->nv;
- }
- }
- }
-
- return -1;
+ accumulate_aggregate_info (agr, input); /* Every case, including a group's first, is accumulated. */
+ return finished_group;
}
/* Accumulates aggregation data from the case INPUT. */
static void
-accumulate_aggregate_info (struct ccase *input)
+accumulate_aggregate_info (struct agr_proc *agr,
+ const struct ccase *input)
{
struct agr_var *iter;
double weight;
+ int bad_warn = 1;
- weight = dict_get_case_weight (default_dict, input);
+ weight = dict_get_case_weight (default_dict, input, &bad_warn);
- for (iter = agr_first; iter; iter = iter->next)
+ for (iter = agr->agr_vars; iter; iter = iter->next)
if (iter->src)
{
- union value *v = &input->data[iter->src->fv];
+ const union value *v = case_data (input, iter->src->fv);
if ((!iter->include_missing && is_missing (v, iter->src))
|| (iter->include_missing && iter->src->type == NUMERIC
switch (iter->function)
{
case NMISS:
+ case NMISS | FSTRING:
iter->dbl[0] += weight;
break;
case NUMISS:
+ case NUMISS | FSTRING:
iter->int1++;
break;
}
switch (iter->function)
{
case SUM:
- iter->dbl[0] += v->f;
+ iter->dbl[0] += v->f * weight;
+ iter->int1 = 1;
break;
case MEAN:
iter->dbl[0] += v->f * weight;
iter->dbl[1] += weight;
break;
- case SD:
- {
- double product = v->f * weight;
- iter->dbl[0] += product;
- iter->dbl[1] += product * v->f;
- iter->dbl[2] += weight;
- break;
- }
+ case SD:
+ moments1_add (iter->moments, v->f, weight);
+ break;
case MAX:
iter->dbl[0] = max (iter->dbl[0], v->f);
iter->int1 = 1;
case FOUT | FSTRING:
case POUT | FSTRING:
if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
- && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
+ || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
iter->dbl[0] += weight;
iter->dbl[1] += weight;
break;
case N:
+ case N | FSTRING:
iter->dbl[0] += weight;
break;
case NU:
+ case NU | FSTRING:
iter->int1++;
break;
case FIRST:
memcpy (iter->string, v->s, iter->src->width);
iter->int1 = 1;
break;
+ case NMISS:
+ case NMISS | FSTRING:
+ case NUMISS:
+ case NUMISS | FSTRING:
+ /* Our value is not missing or it would have been
+ caught earlier. Nothing to do. */
+ break;
default:
assert (0);
}
more of the break variables. Make an output record from the
accumulated statistics in the OUTPUT case. */
static void
-dump_aggregate_info (struct ccase *output)
+dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
{
- debug_printf (("(dumping "));
-
{
- int n_elem = 0;
-
- {
- int i;
+ int value_idx = 0;
+ int i;
- for (i = 0; i < nv_sort; i++)
- n_elem += v_sort[i]->nv;
- }
- debug_printf (("n_elem=%d:", n_elem));
- memcpy (output->data, prev_case, sizeof (union value) * n_elem);
+ for (i = 0; i < agr->break_var_cnt; i++)
+ {
+ struct variable *v = agr->break_vars[i];
+ memcpy (case_data_rw (output, value_idx),
+ case_data (&agr->break_case, v->fv),
+ sizeof (union value) * v->nv);
+ value_idx += v->nv;
+ }
}
{
struct agr_var *i;
- for (i = agr_first; i; i = i->next)
+ for (i = agr->agr_vars; i; i = i->next)
{
- union value *v = &output->data[i->dest->fv];
-
- debug_printf ((" %d,%d", i->dest->fv, i->dest->nv));
+ union value *v = case_data_rw (output, i->dest->fv);
- if (missing == COLUMNWISE && i->missing != 0
+ if (agr->missing == COLUMNWISE && i->missing != 0
&& (i->function & FUNC) != N && (i->function & FUNC) != NU
&& (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
{
- if (i->function & FSTRING)
+ if (i->dest->type == ALPHA)
memset (v->s, ' ', i->dest->width);
else
v->f = SYSMIS;
switch (i->function)
{
case SUM:
- v->f = i->dbl[0];
+ v->f = i->int1 ? i->dbl[0] : SYSMIS;
break;
case MEAN:
v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
break;
case SD:
- v->f = ((i->dbl[2] > 1.0)
- ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
- : SYSMIS);
+ {
+ double variance;
+
+ /* FIXME: we should use two passes. */
+ moments1_calculate (i->moments, NULL, NULL, &variance,
+ NULL, NULL);
+ if (variance != SYSMIS)
+ v->f = sqrt (variance);
+ else
+ v->f = SYSMIS;
+ }
break;
case MAX:
case MIN:
else
memset (v->s, ' ', i->dest->width);
break;
- case FGT | FSTRING:
- case FLT | FSTRING:
- case FIN | FSTRING:
- case FOUT | FSTRING:
- v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
- break;
case FGT:
+ case FGT | FSTRING:
case FLT:
+ case FLT | FSTRING:
case FIN:
+ case FIN | FSTRING:
case FOUT:
+ case FOUT | FSTRING:
v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
break;
case PGT:
v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
break;
case N:
+ case N | FSTRING:
v->f = i->dbl[0];
break;
case NU:
+ case NU | FSTRING:
v->f = i->int1;
break;
case FIRST:
v->f = i->int1;
break;
case NMISS:
+ case NMISS | FSTRING:
v->f = i->dbl[0];
break;
case NUMISS:
+ case NUMISS | FSTRING:
v->f = i->int1;
break;
default:
}
}
}
- debug_printf ((") "));
}
/* Resets the state for all the aggregate functions. */
static void
-initialize_aggregate_info (void)
+initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
{
struct agr_var *iter;
- for (iter = agr_first; iter; iter = iter->next)
+ case_destroy (&agr->break_case);
+ case_clone (&agr->break_case, input);
+
+ for (iter = agr->agr_vars; iter; iter = iter->next)
{
iter->missing = 0;
+ iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
+ iter->int1 = iter->int2 = 0;
switch (iter->function)
{
case MIN:
case MAX | FSTRING:
memset (iter->string, 0, iter->src->width);
break;
- default:
- iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
- iter->int1 = iter->int2 = 0;
- break;
+ case SD:
+ if (iter->moments == NULL)
+ iter->moments = moments1_create (MOMENT_VARIANCE);
+ else
+ moments1_clear (iter->moments);
+ break;
+ default:
+ break;
}
}
}
/* Aggregate each case as it comes through. Cases which aren't needed
are dropped. */
static int
-agr_00x_trns_proc (struct trns_header *h unused, struct ccase *c)
+agr_to_active_file (struct ccase *c, void *agr_)
{
- int code = aggregate_single_case (c, compaction_case);
- debug_printf (("%d ", code));
- return code;
-}
-
-/* Output the last aggregate case. It's okay to call the vfm_sink's
- write() method here because end_func is called so soon after all
- the cases have been output; very little has been cleaned up at this
- point. */
-static void
-agr_00x_end_func (void)
-{
- /* Ensure that info for the last break group gets written to the
- active file. */
- dump_aggregate_info (compaction_case);
- vfm_sink_info.ncases++;
- vfm_sink->write ();
-}
-
-/* Transform the aggregate case buf_1xx, in internal format, to system
- file format, in buf64_1xx, and write the resultant case to the
- system file. */
-static void
-write_case_to_sfm (void)
-{
- flt64 *p = buf64_1xx;
- int i;
+ struct agr_proc *agr = agr_; /* The aux pointer cmd_aggregate() hands to procedure(). */
- for (i = 0; i < dict_get_var_cnt (agr_dict); i++)
- {
- struct variable *v = dict_get_var (agr_dict, i);
-
- if (v->type == NUMERIC)
- {
- double src = buf_1xx->data[v->fv].f;
- if (src == SYSMIS)
- *p++ = -FLT64_MAX;
- else
- *p++ = src;
- }
- else
- {
- memcpy (p, buf_1xx->data[v->fv].s, v->width);
- memset (&((char *) p)[v->width], ' ',
- REM_RND_UP (v->width, sizeof (flt64)));
- p += DIV_RND_UP (v->width, sizeof (flt64));
- }
- }
+ if (aggregate_single_case (agr, c, &agr->agr_case)) /* Nonzero: a break group just ended. */
+ agr->sink->class->write (agr->sink, &agr->agr_case); /* Store that group's aggregate case in the sink. */
- sfm_write_case (outfile, buf64_1xx, p - buf64_1xx);
+ return 1; /* Same result whether or not a case was written. */
}
/* Aggregate the current case and output it if we passed a
breakpoint. */
static int
-agr_10x_trns_proc (struct trns_header *h unused, struct ccase *c)
+presorted_agr_to_sysfile (struct ccase *c, void *agr_)
{
- int code = aggregate_single_case (c, buf_1xx);
-
- assert (code == -2 || code == -1);
- if (code == -1)
- write_case_to_sfm ();
- return -1;
-}
+ struct agr_proc *agr = agr_; /* The aux pointer cmd_aggregate() hands to procedure(). */
-/* Close the system file now that we're done with it. */
-static void
-agr_10x_trns_free (struct trns_header *h unused)
-{
- fh_close_handle (outfile);
-}
+ if (aggregate_single_case (agr, c, &agr->agr_case)) /* Nonzero: a break group just ended. */
+ sfm_write_case (agr->writer, &agr->agr_case); /* Write that group's aggregate case to the system file. */
-/* Ensure that info for the last break group gets written to the
- system file. */
-static void
-agr_10x_end_func (void)
-{
- dump_aggregate_info (buf_1xx);
- write_case_to_sfm ();
-}
-
-/* When called with temp_case non-NULL (the normal case), runs the
- case through the aggregater and outputs it to the system file if
- appropriate. If temp_case is NULL, finishes up writing the last
- case if necessary. */
-static int
-agr_11x_func (void)
-{
- if (temp_case != NULL)
- {
- int code = aggregate_single_case (temp_case, buf_1xx);
-
- assert (code == -2 || code == -1);
- if (code == -1)
- write_case_to_sfm ();
- }
- else
- {
- if (case_count)
- {
- dump_aggregate_info (buf_1xx);
- write_case_to_sfm ();
- }
- fh_close_handle (outfile);
- }
return 1;
}
-\f
-/* Debugging. */
-#if DEBUGGING
-/* Print out useful debugging information. */
-static void
-debug_print (int flags)
-{
- printf ("AGGREGATE\n /OUTFILE=%s\n",
- outfile ? fh_handle_filename (outfile) : "*");
-
- if (missing == COLUMNWISE)
- puts (" /MISSING=COLUMNWISE");
-
- if (flags & 2)
- puts (" /DOCUMENT");
- if (flags & 4)
- puts (" /PRESORTED");
-
- {
- int i;
-
- printf (" /BREAK=");
- for (i = 0; i < nv_sort; i++)
- printf ("%s(%c) ", v_sort[i]->name,
- v_sort[i]->p.srt.order == SRT_ASCEND ? 'A' : 'D');
- putc ('\n', stdout);
- }
-
- {
- struct agr_var *iter;
-
- for (iter = agr_first; iter; iter = iter->next)
- {
- struct agr_func *f = &agr_func_tab[iter->function & FUNC];
-
- printf (" /%s", iter->dest->name);
- if (iter->dest->label)
- printf ("'%s'", iter->dest->label);
- printf ("=%s(%s", f->name, iter->src->name);
- if (f->n_args)
- {
- int i;
-
- for (i = 0; i < f->n_args; i++)
- {
- putc (',', stdout);
- if (iter->src->type == NUMERIC)
- printf ("%g", iter->arg[i].f);
- else
- printf ("%.*s", iter->src->width, iter->arg[i].c);
- }
- }
- printf (")\n");
- }
- }
-}
-
-#endif /* DEBUGGING */