X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Faggregate.c;h=ffd29166142cb2bd44512ebb80cb17d2b935f7f6;hb=8bc8a011fa9df5b9f5aa00144c8d3478fd7b93fa;hp=cb7cff064d2371e87ef9ea08ec75152d1eaf98ec;hpb=14e7292894533c5491a774a2d749386362660812;p=pspp-builds.git diff --git a/src/aggregate.c b/src/aggregate.c index cb7cff06..ffd29166 100644 --- a/src/aggregate.c +++ b/src/aggregate.c @@ -18,19 +18,22 @@ 02111-1307, USA. */ #include -#include +#include "error.h" #include #include "alloc.h" +#include "case.h" +#include "casefile.h" #include "command.h" +#include "dictionary.h" #include "error.h" #include "file-handle.h" #include "lexer.h" #include "misc.h" +#include "moments.h" #include "pool.h" #include "settings.h" -#include "sfm.h" +#include "sfm-write.h" #include "sort.h" -#include "stats.h" #include "str.h" #include "var.h" #include "vfm.h" @@ -53,6 +56,7 @@ struct agr_var int int1, int2; char *string; int missing; + struct moments1 *moments; }; /* Aggregation functions. */ @@ -113,17 +117,20 @@ enum missing_treatment struct agr_proc { /* We have either an output file or a sink. */ - struct file_handle *out_file; /* Output file, or null if none. */ + struct sfm_writer *writer; /* Output file, or null if none. */ struct case_sink *sink; /* Sink, or null if none. */ + /* Break variables. */ + struct sort_criteria *sort; /* Sort criteria. */ + struct variable **break_vars; /* Break variables. */ + size_t break_var_cnt; /* Number of break variables. */ + union value *prev_break; /* Last values of break variables. */ + enum missing_treatment missing; /* How to treat missing values. */ - struct sort_cases_pgm *sort; /* Sort program. */ - struct agr_var *vars; /* First aggregate variable. */ + struct agr_var *agr_vars; /* First aggregate variable. */ struct dictionary *dict; /* Aggregate dictionary. */ int case_cnt; /* Counts aggregated cases. */ - union value *prev_break; /* Last values of break variables. */ - struct ccase *agr_case; /* Aggregate case for output. */ - flt64 *sfm_agr_case; /* Aggregate case in SFM format. */ + struct ccase agr_case; /* Aggregate case for output. */ }; static void initialize_aggregate_info (struct agr_proc *); @@ -135,15 +142,12 @@ static int aggregate_single_case (struct agr_proc *agr, const struct ccase *input, struct ccase *output); static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output); -static int create_sysfile (struct agr_proc *); /* Aggregating to the active file. */ static int agr_to_active_file (struct ccase *, void *aux); /* Aggregating to a system file. */ -static void write_case_to_sfm (struct agr_proc *agr); static int presorted_agr_to_sysfile (struct ccase *, void *aux); -static int sort_agr_to_sysfile (const struct ccase *, void *aux); /* Parsing. */ @@ -152,15 +156,17 @@ int cmd_aggregate (void) { struct agr_proc agr; + struct file_handle *out_file = NULL; /* Have we seen these subcommands? */ unsigned seen = 0; - agr.out_file = NULL; + agr.writer = NULL; agr.sink = NULL; agr.missing = ITEMWISE; agr.sort = NULL; - agr.vars = NULL; + agr.break_vars = NULL; + agr.agr_vars = NULL; agr.dict = NULL; agr.case_cnt = 0; agr.prev_break = NULL; @@ -169,8 +175,6 @@ cmd_aggregate (void) dict_set_label (agr.dict, dict_get_label (default_dict)); dict_set_documents (agr.dict, dict_get_documents (default_dict)); - lex_match_id ("AGGREGATE"); - /* Read most of the subcommands. */ for (;;) { @@ -181,18 +185,16 @@ cmd_aggregate (void) if (seen & 1) { msg (SE, _("%s subcommand given multiple times."),"OUTFILE"); - goto lossage; + goto error; } seen |= 1; lex_match ('='); - if (lex_match ('*')) - agr.out_file = NULL; - else + if (!lex_match ('*')) { - agr.out_file = fh_parse_file_handle (); - if (agr.out_file == NULL) - goto lossage; + out_file = fh_parse (); + if (out_file == NULL) + goto error; } } else if (lex_match_id ("MISSING")) @@ -201,7 +203,7 @@ cmd_aggregate (void) if (!lex_match_id ("COLUMNWISE")) { lex_error (_("while expecting COLUMNWISE")); - goto lossage; + goto error; } agr.missing = COLUMNWISE; } @@ -211,30 +213,27 @@ cmd_aggregate (void) seen |= 4; else if (lex_match_id ("BREAK")) { + int i; + if (seen & 8) { msg (SE, _("%s subcommand given multiple times."),"BREAK"); - goto lossage; + goto error; } seen |= 8; lex_match ('='); - agr.sort = parse_sort (); + agr.sort = sort_parse_criteria (default_dict, + &agr.break_vars, &agr.break_var_cnt); if (agr.sort == NULL) - goto lossage; + goto error; - { - int i; - - for (i = 0; i < agr.sort->var_cnt; i++) - { - struct variable *v; - - v = dict_clone_var (agr.dict, agr.sort->vars[i], - agr.sort->vars[i]->name); - assert (v != NULL); - } - } + for (i = 0; i < agr.break_var_cnt; i++) + { + struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i], + agr.break_vars[i]->name); + assert (v != NULL); + } } else break; } @@ -245,7 +244,7 @@ cmd_aggregate (void) /* Read in the aggregate functions. */ if (!parse_aggregate_functions (&agr)) - goto lossage; + goto error; /* Delete documents. */ if (!(seen & 2)) @@ -256,18 +255,18 @@ cmd_aggregate (void) /* Initialize. */ agr.case_cnt = 0; - agr.agr_case = xmalloc (dict_get_case_size (agr.dict)); + case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict)); initialize_aggregate_info (&agr); /* Output to active file or external file? */ - if (agr.out_file == NULL) + if (out_file == NULL) { /* The active file will be replaced by the aggregated data, so TEMPORARY is moot. */ cancel_temporary (); if (agr.sort != NULL && (seen & 4) == 0) - sort_cases (agr.sort, 0); + sort_active_file_in_place (agr.sort); agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL); if (agr.sink->class->open != NULL) @@ -276,8 +275,8 @@ cmd_aggregate (void) procedure (agr_to_active_file, &agr); if (agr.case_cnt > 0) { - dump_aggregate_info (&agr, agr.agr_case); - agr.sink->class->write (agr.sink, agr.agr_case); + dump_aggregate_info (&agr, &agr.agr_case); + agr.sink->class->write (agr.sink, &agr.agr_case); } dict_destroy (default_dict); default_dict = agr.dict; @@ -287,14 +286,29 @@ cmd_aggregate (void) } else { - if (!create_sysfile (&agr)) - goto lossage; - + agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression ()); + if (agr.writer == NULL) + goto error; + if (agr.sort != NULL && (seen & 4) == 0) { /* Sorting is needed. */ - sort_cases (agr.sort, 1); - read_sort_output (agr.sort, sort_agr_to_sysfile, NULL); + struct casefile *dst; + struct casereader *reader; + struct ccase c; + + dst = sort_active_file_to_casefile (agr.sort); + if (dst == NULL) + goto error; + reader = casefile_get_destructive_reader (dst); + while (casereader_read_xfer (reader, &c)) + { + if (aggregate_single_case (&agr, &c, &agr.agr_case)) + sfm_write_case (agr.writer, &agr.agr_case); + case_destroy (&c); + } + casereader_destroy (reader); + casefile_destroy (dst); } else { @@ -304,37 +318,19 @@ cmd_aggregate (void) if (agr.case_cnt > 0) { - dump_aggregate_info (&agr, agr.agr_case); - write_case_to_sfm (&agr); + dump_aggregate_info (&agr, &agr.agr_case); + sfm_write_case (agr.writer, &agr.agr_case); } - fh_close_handle (agr.out_file); } agr_destroy (&agr); return CMD_SUCCESS; -lossage: +error: agr_destroy (&agr); return CMD_FAILURE; } -/* Create a system file for use in aggregation to an external - file. */ -static int -create_sysfile (struct agr_proc *agr) -{ - struct sfm_write_info w; - w.h = agr->out_file; - w.dict = agr->dict; - w.compress = set_scompression; - if (!sfm_write_dictionary (&w)) - return 0; - - agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size); - - return 1; -} - /* Parse all the aggregate functions. */ static int parse_aggregate_functions (struct agr_proc *agr) @@ -374,8 +370,9 @@ parse_aggregate_functions (struct agr_proc *agr) { int n_dest_prev = n_dest; - if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH)) - goto lossage; + if (!parse_DATA_LIST_vars (&dest, &n_dest, + PV_APPEND | PV_SINGLE | PV_NO_SCRATCH)) + goto error; /* Assign empty labels. */ { @@ -388,8 +385,8 @@ parse_aggregate_functions (struct agr_proc *agr) if (token == T_STRING) { - ds_truncate (&tokstr, 120); - dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr)); + ds_truncate (&tokstr, 255); + dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr)); lex_get (); } } @@ -398,7 +395,7 @@ parse_aggregate_functions (struct agr_proc *agr) if (token != T_ID) { lex_error (_("expecting aggregation function")); - goto lossage; + goto error; } include_missing = 0; @@ -414,7 +411,7 @@ parse_aggregate_functions (struct agr_proc *agr) if (NULL == function->name) { msg (SE, _("Unknown aggregation function %s."), tokid); - goto lossage; + goto error; } func_index = function - agr_func_tab; lex_get (); @@ -429,7 +426,7 @@ parse_aggregate_functions (struct agr_proc *agr) else { lex_error (_("expecting `('")); - goto lossage; + goto error; } } else { /* Parse list of source variables. */ @@ -442,7 +439,7 @@ parse_aggregate_functions (struct agr_proc *agr) pv_opts |= PV_SAME_TYPE; if (!parse_variables (default_dict, &src, &n_src, pv_opts)) - goto lossage; + goto error; } /* Parse function arguments, for those functions that @@ -455,7 +452,7 @@ parse_aggregate_functions (struct agr_proc *agr) lex_match (','); if (token == T_STRING) { - arg[i].c = xstrdup (ds_value (&tokstr)); + arg[i].c = xstrdup (ds_c_str (&tokstr)); type = ALPHA; } else if (token == T_NUM) @@ -464,7 +461,7 @@ parse_aggregate_functions (struct agr_proc *agr) type = NUMERIC; } else { msg (SE, _("Missing argument %d to %s."), i + 1, function->name); - goto lossage; + goto error; } lex_get (); @@ -474,7 +471,7 @@ parse_aggregate_functions (struct agr_proc *agr) msg (SE, _("Arguments to %s must be of same type as " "source variables."), function->name); - goto lossage; + goto error; } } @@ -482,7 +479,7 @@ parse_aggregate_functions (struct agr_proc *agr) if (!lex_match(')')) { lex_error (_("expecting `)'")); - goto lossage; + goto error; } /* Now check that the number of source variables match the @@ -496,7 +493,7 @@ parse_aggregate_functions (struct agr_proc *agr) msg (SE, _("Number of source variables (%d) does not match " "number of target variables (%d)."), n_src, n_dest); - goto lossage; + goto error; } } @@ -507,12 +504,13 @@ parse_aggregate_functions (struct agr_proc *agr) struct agr_var *v = xmalloc (sizeof *v); /* Add variable to chain. */ - if (agr->vars != NULL) + if (agr->agr_vars != NULL) tail->next = v; else - agr->vars = v; + agr->agr_vars = v; tail = v; tail->next = NULL; + v->moments = NULL; /* Create the target variable in the aggregate dictionary. */ @@ -567,7 +565,7 @@ parse_aggregate_functions (struct agr_proc *agr) "variables."), dest[i]); free (dest[i]); - goto lossage; + goto error; } free (dest[i]); @@ -619,7 +617,7 @@ parse_aggregate_functions (struct agr_proc *agr) } continue; - lossage: + error: for (i = 0; i < n_dest; i++) { free (dest[i]); @@ -647,11 +645,12 @@ agr_destroy (struct agr_proc *agr) { struct agr_var *iter, *next; - if (agr->dict != NULL) - dict_destroy (agr->dict); + sfm_close_writer (agr->writer); if (agr->sort != NULL) - destroy_sort_cases_pgm (agr->sort); - for (iter = agr->vars; iter; iter = next) + sort_destroy_criteria (agr->sort); + free (agr->break_vars); + free (agr->prev_break); + for (iter = agr->agr_vars; iter; iter = next) { next = iter->next; @@ -665,10 +664,13 @@ agr_destroy (struct agr_proc *agr) free (iter->arg[i].c); free (iter->string); } + else if (iter->function == SD) + moments1_destroy (iter->moments); free (iter); } - free (agr->prev_break); - free (agr->agr_case); + if (agr->dict != NULL) + dict_destroy (agr->dict); + case_destroy (&agr->agr_case); } /* Execution. */ @@ -693,8 +695,8 @@ aggregate_single_case (struct agr_proc *agr, { int i; - for (i = 0; i < agr->sort->var_cnt; i++) - n_elem += agr->sort->vars[i]->nv; + for (i = 0; i < agr->break_var_cnt; i++) + n_elem += agr->break_vars[i]->nv; } agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem); @@ -704,15 +706,15 @@ aggregate_single_case (struct agr_proc *agr, union value *iter = agr->prev_break; int i; - for (i = 0; i < agr->sort->var_cnt; i++) + for (i = 0; i < agr->break_var_cnt; i++) { - struct variable *v = agr->sort->vars[i]; + struct variable *v = agr->break_vars[i]; if (v->type == NUMERIC) - (iter++)->f = input->data[v->fv].f; + (iter++)->f = case_num (input, v->fv); else { - memcpy (iter->s, input->data[v->fv].s, v->width); + memcpy (iter->s, case_str (input, v->fv), v->width); iter += v->nv; } } @@ -729,19 +731,19 @@ aggregate_single_case (struct agr_proc *agr, union value *iter = agr->prev_break; int i; - for (i = 0; i < agr->sort->var_cnt; i++) + for (i = 0; i < agr->break_var_cnt; i++) { - struct variable *v = agr->sort->vars[i]; + struct variable *v = agr->break_vars[i]; switch (v->type) { case NUMERIC: - if (input->data[v->fv].f != iter->f) + if (case_num (input, v->fv) != iter->f) goto not_equal; iter++; break; case ALPHA: - if (memcmp (input->data[v->fv].s, iter->s, v->width)) + if (memcmp (case_str (input, v->fv), iter->s, v->width)) goto not_equal; iter += v->nv; break; @@ -768,15 +770,15 @@ not_equal: union value *iter = agr->prev_break; int i; - for (i = 0; i < agr->sort->var_cnt; i++) + for (i = 0; i < agr->break_var_cnt; i++) { - struct variable *v = agr->sort->vars[i]; + struct variable *v = agr->break_vars[i]; if (v->type == NUMERIC) - (iter++)->f = input->data[v->fv].f; + (iter++)->f = case_num (input, v->fv); else { - memcpy (iter->s, input->data[v->fv].s, v->width); + memcpy (iter->s, case_str (input, v->fv), v->width); iter += v->nv; } } @@ -792,13 +794,14 @@ accumulate_aggregate_info (struct agr_proc *agr, { struct agr_var *iter; double weight; + int bad_warn = 1; - weight = dict_get_case_weight (default_dict, input); + weight = dict_get_case_weight (default_dict, input, &bad_warn); - for (iter = agr->vars; iter; iter = iter->next) + for (iter = agr->agr_vars; iter; iter = iter->next) if (iter->src) { - const union value *v = &input->data[iter->src->fv]; + const union value *v = case_data (input, iter->src->fv); if ((!iter->include_missing && is_missing (v, iter->src)) || (iter->include_missing && iter->src->type == NUMERIC @@ -827,14 +830,9 @@ accumulate_aggregate_info (struct agr_proc *agr, iter->dbl[0] += v->f * weight; iter->dbl[1] += weight; break; - case SD: - { - double product = v->f * weight; - iter->dbl[0] += product; - iter->dbl[1] += product * v->f; - iter->dbl[2] += weight; - break; - } + case SD: + moments1_add (iter->moments, v->f, weight); + break; case MAX: iter->dbl[0] = max (iter->dbl[0], v->f); iter->int1 = 1; @@ -956,23 +954,25 @@ static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output) { { - int n_elem = 0; - - { - int i; + int value_idx = 0; + int i; - for (i = 0; i < agr->sort->var_cnt; i++) - n_elem += agr->sort->vars[i]->nv; - } - memcpy (output->data, agr->prev_break, sizeof (union value) * n_elem); + for (i = 0; i < agr->break_var_cnt; i++) + { + int nv = agr->break_vars[i]->nv; + memcpy (case_data_rw (output, value_idx), + &agr->prev_break[value_idx], + sizeof (union value) * nv); + value_idx += nv; + } } { struct agr_var *i; - for (i = agr->vars; i; i = i->next) + for (i = agr->agr_vars; i; i = i->next) { - union value *v = &output->data[i->dest->fv]; + union value *v = case_data_rw (output, i->dest->fv); if (agr->missing == COLUMNWISE && i->missing != 0 && (i->function & FUNC) != N && (i->function & FUNC) != NU @@ -994,9 +994,17 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output) v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS; break; case SD: - v->f = ((i->dbl[2] > 1.0) - ? calc_stddev (calc_variance (i->dbl, i->dbl[2])) - : SYSMIS); + { + double variance; + + /* FIXME: we should use two passes. */ + moments1_calculate (i->moments, NULL, NULL, &variance, + NULL, NULL); + if (variance != SYSMIS) + v->f = sqrt (variance); + else + v->f = SYSMIS; + } break; case MAX: case MIN: @@ -1073,7 +1081,7 @@ initialize_aggregate_info (struct agr_proc *agr) { struct agr_var *iter; - for (iter = agr->vars; iter; iter = iter->next) + for (iter = agr->agr_vars; iter; iter = iter->next) { iter->missing = 0; switch (iter->function) @@ -1090,6 +1098,12 @@ initialize_aggregate_info (struct agr_proc *agr) case MAX | FSTRING: memset (iter->string, 0, iter->src->width); break; + case SD: + if (iter->moments == NULL) + iter->moments = moments1_create (MOMENT_VARIANCE); + else + moments1_clear (iter->moments); + break; default: iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0; iter->int1 = iter->int2 = 0; @@ -1105,62 +1119,21 @@ agr_to_active_file (struct ccase *c, void *agr_) { struct agr_proc *agr = agr_; - if (aggregate_single_case (agr, c, agr->agr_case)) - agr->sink->class->write (agr->sink, agr->agr_case); + if (aggregate_single_case (agr, c, &agr->agr_case)) + agr->sink->class->write (agr->sink, &agr->agr_case); return 1; } -/* Writes AGR->agr_case to AGR->out_file. */ -static void -write_case_to_sfm (struct agr_proc *agr) -{ - flt64 *p; - int i; - - p = agr->sfm_agr_case; - for (i = 0; i < dict_get_var_cnt (agr->dict); i++) - { - struct variable *v = dict_get_var (agr->dict, i); - - if (v->type == NUMERIC) - { - double src = agr->agr_case->data[v->fv].f; - if (src == SYSMIS) - *p++ = -FLT64_MAX; - else - *p++ = src; - } - else - { - memcpy (p, agr->agr_case->data[v->fv].s, v->width); - memset (&((char *) p)[v->width], ' ', - REM_RND_UP (v->width, sizeof (flt64))); - p += DIV_RND_UP (v->width, sizeof (flt64)); - } - } - - sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case); -} - /* Aggregate the current case and output it if we passed a breakpoint. */ static int presorted_agr_to_sysfile (struct ccase *c, void *agr_) -{ - sort_agr_to_sysfile (c, agr_); - return 1; -} - -/* Aggregate the current case and output it if we passed a - breakpoint. */ -static int -sort_agr_to_sysfile (const struct ccase *c, void *agr_) { struct agr_proc *agr = agr_; - if (aggregate_single_case (agr, c, agr->agr_case)) - write_case_to_sfm (agr); + if (aggregate_single_case (agr, c, &agr->agr_case)) + sfm_write_case (agr->writer, &agr->agr_case); return 1; }