X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Faggregate.c;h=b7b6836e7e17daace560a71703b16ef578393c68;hb=4fdeb2145d081ff1b84e3f6c99f9d1c048c0d64a;hp=880a713a3e6481e18c86a9298578b7816647fa84;hpb=f2828f801736701c0294803b5dedd4c4ab63b45e;p=pspp-builds.git diff --git a/src/aggregate.c b/src/aggregate.c index 880a713a..b7b6836e 100644 --- a/src/aggregate.c +++ b/src/aggregate.c @@ -14,30 +14,31 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA - 02111-1307, USA. */ + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. */ #include -#include +#include "error.h" #include #include "alloc.h" +#include "case.h" +#include "casefile.h" #include "command.h" +#include "dictionary.h" #include "error.h" #include "file-handle.h" #include "lexer.h" #include "misc.h" +#include "moments.h" #include "pool.h" #include "settings.h" -#include "sfm.h" +#include "sfm-write.h" #include "sort.h" -#include "stats.h" #include "str.h" #include "var.h" #include "vfm.h" #include "vfmP.h" -#include "debug-print.h" - /* Specifies how to make an aggregate variable. */ struct agr_var { @@ -55,6 +56,7 @@ struct agr_var int int1, int2; char *string; int missing; + struct moments1 *moments; }; /* Aggregation functions. */ @@ -104,55 +106,49 @@ static const struct agr_func agr_func_tab[] = {"NU", 0, NUMERIC, {FMT_F, 7, 0}}, }; -/* Output file, or NULL for the active file. */ -static struct file_handle *outfile; - /* Missing value types. */ -enum +enum missing_treatment { ITEMWISE, /* Missing values item by item. */ COLUMNWISE /* Missing values column by column. */ }; -/* ITEMWISE or COLUMNWISE. */ -static int missing; - -/* Sort program. */ -static struct sort_cases_pgm *sort; - -/* Aggregate variables. */ -static struct agr_var *agr_first, *agr_next; - -/* Aggregate dictionary. */ -static struct dictionary *agr_dict; - -/* Number of cases passed through aggregation. */ -static int case_count; - -/* Last values of the break variables. */ -static union value *prev_case; - -/* Buffers for use by the 10x transformation. */ -static flt64 *buf64_1xx; -static struct ccase *buf_1xx; +/* An entire AGGREGATE procedure. */ +struct agr_proc + { + /* We have either an output file or a sink. */ + struct sfm_writer *writer; /* Output file, or null if none. */ + struct case_sink *sink; /* Sink, or null if none. */ + + /* Break variables. */ + struct sort_criteria *sort; /* Sort criteria. */ + struct variable **break_vars; /* Break variables. */ + size_t break_var_cnt; /* Number of break variables. */ + struct ccase break_case; /* Last values of break variables. */ + + enum missing_treatment missing; /* How to treat missing values. */ + struct agr_var *agr_vars; /* First aggregate variable. */ + struct dictionary *dict; /* Aggregate dictionary. */ + int case_cnt; /* Counts aggregated cases. */ + struct ccase agr_case; /* Aggregate case for output. */ + }; -static void initialize_aggregate_info (void); +static void initialize_aggregate_info (struct agr_proc *, + const struct ccase *); /* Prototypes. */ -static int parse_aggregate_functions (void); -static void free_aggregate_functions (void); -static int aggregate_single_case (struct ccase *input, struct ccase *output); -static int create_sysfile (void); - -static trns_proc_func agr_00x_trns_proc, agr_10x_trns_proc; -static trns_free_func agr_10x_trns_free; -static void agr_00x_end_func (void *aux); -static void agr_10x_end_func (void *); -static int agr_11x_func (write_case_data); - -#if DEBUGGING -static void debug_print (int flags); -#endif +static int parse_aggregate_functions (struct agr_proc *); +static void agr_destroy (struct agr_proc *); +static int aggregate_single_case (struct agr_proc *agr, + const struct ccase *input, + struct ccase *output); +static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output); + +/* Aggregating to the active file. */ +static int agr_to_active_file (struct ccase *, void *aux); + +/* Aggregating to a system file. */ +static int presorted_agr_to_sysfile (struct ccase *, void *aux); /* Parsing. */ @@ -160,283 +156,182 @@ static void debug_print (int flags); int cmd_aggregate (void) { - /* Have we seen these subcommands? */ - unsigned seen = 0; + struct agr_proc agr; + struct file_handle *out_file = NULL; - outfile = NULL; - missing = ITEMWISE; - sort = NULL; - prev_case = NULL; + bool copy_documents = false; + bool presorted = false; + bool saw_direction; + + memset(&agr, 0 , sizeof (agr)); + agr.missing = ITEMWISE; + case_nullify (&agr.break_case); - agr_dict = dict_create (); - dict_set_label (agr_dict, dict_get_label (default_dict)); - dict_set_documents (agr_dict, dict_get_documents (default_dict)); + agr.dict = dict_create (); + dict_set_label (agr.dict, dict_get_label (default_dict)); + dict_set_documents (agr.dict, dict_get_documents (default_dict)); + + /* OUTFILE subcommand must be first. */ + if (!lex_force_match_id ("OUTFILE")) + goto error; + lex_match ('='); + if (!lex_match ('*')) + { + out_file = fh_parse (); + if (out_file == NULL) + goto error; + } - lex_match_id ("AGGREGATE"); - /* Read most of the subcommands. */ for (;;) { - lex_match('/'); + lex_match ('/'); - if (lex_match_id ("OUTFILE")) - { - if (seen & 1) - { - destroy_sort_cases_pgm (sort); - dict_destroy (agr_dict); - msg (SE, _("%s subcommand given multiple times."),"OUTFILE"); - return CMD_FAILURE; - } - seen |= 1; - - lex_match ('='); - if (lex_match ('*')) - outfile = NULL; - else - { - outfile = fh_parse_file_handle (); - if (outfile == NULL) - { - destroy_sort_cases_pgm (sort); - dict_destroy (agr_dict); - return CMD_FAILURE; - } - } - } - else if (lex_match_id ("MISSING")) + if (lex_match_id ("MISSING")) { lex_match ('='); if (!lex_match_id ("COLUMNWISE")) { - destroy_sort_cases_pgm (sort); - dict_destroy (agr_dict); lex_error (_("while expecting COLUMNWISE")); - return CMD_FAILURE; + goto error; } - missing = COLUMNWISE; + agr.missing = COLUMNWISE; } else if (lex_match_id ("DOCUMENT")) - seen |= 2; + copy_documents = true; else if (lex_match_id ("PRESORTED")) - seen |= 4; + presorted = true; else if (lex_match_id ("BREAK")) { - if (seen & 8) - { - destroy_sort_cases_pgm (sort); - dict_destroy (agr_dict); - msg (SE, _("%s subcommand given multiple times."),"BREAK"); - return CMD_FAILURE; - } - seen |= 8; + int i; lex_match ('='); - sort = parse_sort (); - if (sort == NULL) - { - dict_destroy (agr_dict); - return CMD_FAILURE; - } + agr.sort = sort_parse_criteria (default_dict, + &agr.break_vars, &agr.break_var_cnt, + &saw_direction); + if (agr.sort == NULL) + goto error; - { - int i; - - for (i = 0; i < sort->var_cnt; i++) - { - struct variable *v; - - v = dict_clone_var (agr_dict, sort->vars[i], sort->vars[i]->name); - assert (v != NULL); - } - } + for (i = 0; i < agr.break_var_cnt; i++) + { + struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i], + agr.break_vars[i]->name, + agr.break_vars[i]->longname + ); + assert (v != NULL); + } + + /* BREAK must follow the options. */ + break; } - else break; + else + { + lex_error (_("expecting BREAK")); + goto error; + } } - - /* Check for proper syntax. */ - if (!(seen & 8)) - msg (SW, _("BREAK subcommand not specified.")); + if (presorted && saw_direction) + msg (SW, _("When PRESORTED is specified, specifying sorting directions " + "with (A) or (D) has no effect. Output data will be sorted " + "the same way as the input data.")); /* Read in the aggregate functions. */ - if (!parse_aggregate_functions ()) - { - free_aggregate_functions (); - destroy_sort_cases_pgm (sort); - return CMD_FAILURE; - } + lex_match ('/'); + if (!parse_aggregate_functions (&agr)) + goto error; /* Delete documents. */ - if (!(seen & 2)) - dict_set_documents (agr_dict, NULL); + if (!copy_documents) + dict_set_documents (agr.dict, NULL); /* Cancel SPLIT FILE. */ - dict_set_split_vars (agr_dict, NULL, 0); + dict_set_split_vars (agr.dict, NULL, 0); -#if DEBUGGING - debug_print (seen); -#endif - /* Initialize. */ - case_count = 0; - initialize_aggregate_info (); - - /* How to implement all this... There are three important variables: - whether output is going to the active file (0) or a separate file - (1); whether the input data is presorted (0) or needs sorting - (1); whether there is a temporary transformation (1) or not (0). - The eight cases are as follows: - - 000 (0): Pass it through an aggregate transformation that - modifies the data. - - 001 (1): Cancel the temporary transformation and handle as 000. - - 010 (2): Set up a SORT CASES and aggregate the output, writing - the results to the active file. - - 011 (3): Cancel the temporary transformation and handle as 010. - - 100 (4): Pass it through an aggregate transformation that doesn't - modify the data but merely writes it to the output file. - - 101 (5): Handled as 100. - - 110 (6): Set up a SORT CASES and capture the output, aggregate - it, write it to the output file without modifying the active - file. - - 111 (7): Handled as 110. */ - - { - unsigned type = 0; - - if (outfile != NULL) - type |= 4; - if (sort != NULL && (seen & 4) == 0) - type |= 2; - if (temporary) - type |= 1; - - switch (type) - { - case 3: - cancel_temporary (); - /* fall through */ - case 2: - sort_cases (sort, 0); - goto case0; - - case 1: - cancel_temporary (); - /* fall through */ - case 0: - case0: - { - struct trns_header *t = xmalloc (sizeof *t); - t->proc = agr_00x_trns_proc; - t->free = NULL; - add_transformation (t); - - temporary = 2; - temp_dict = agr_dict; - temp_trns = n_trns; - - agr_dict = NULL; - - procedure (NULL, NULL, agr_00x_end_func, NULL); - break; - } + agr.case_cnt = 0; + case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict)); - case 4: - case 5: - { - if (!create_sysfile ()) - goto lossage; - - { - struct trns_header *t = xmalloc (sizeof *t); - t->proc = agr_10x_trns_proc; - t->free = agr_10x_trns_free; - add_transformation (t); - - procedure (NULL, NULL, agr_10x_end_func, NULL); - } - - break; - } - - case 6: - case 7: - sort_cases (sort, 1); - - if (!create_sysfile ()) - goto lossage; - read_sort_output (sort, agr_11x_func, NULL); - - { - struct ccase *save_temp_case = temp_case; - temp_case = NULL; - agr_11x_func (NULL); - temp_case = save_temp_case; - } - - break; - - default: - assert (0); - } - } - - free (buf64_1xx); - free (buf_1xx); - - /* Clean up. */ - destroy_sort_cases_pgm (sort); - free_aggregate_functions (); - free (prev_case); + /* Output to active file or external file? */ + if (out_file == NULL) + { + /* The active file will be replaced by the aggregated data, + so TEMPORARY is moot. */ + cancel_temporary (); + + if (agr.sort != NULL && !presorted) + sort_active_file_in_place (agr.sort); + + agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL); + if (agr.sink->class->open != NULL) + agr.sink->class->open (agr.sink); + vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL); + procedure (agr_to_active_file, &agr); + if (agr.case_cnt > 0) + { + dump_aggregate_info (&agr, &agr.agr_case); + agr.sink->class->write (agr.sink, &agr.agr_case); + } + dict_destroy (default_dict); + default_dict = agr.dict; + agr.dict = NULL; + vfm_source = agr.sink->class->make_source (agr.sink); + free_case_sink (agr.sink); + } + else + { + agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0); + if (agr.writer == NULL) + goto error; + + if (agr.sort != NULL && !presorted) + { + /* Sorting is needed. */ + struct casefile *dst; + struct casereader *reader; + struct ccase c; + + dst = sort_active_file_to_casefile (agr.sort); + if (dst == NULL) + goto error; + reader = casefile_get_destructive_reader (dst); + while (casereader_read_xfer (reader, &c)) + { + if (aggregate_single_case (&agr, &c, &agr.agr_case)) + sfm_write_case (agr.writer, &agr.agr_case); + case_destroy (&c); + } + casereader_destroy (reader); + casefile_destroy (dst); + } + else + { + /* Active file is already sorted. */ + procedure (presorted_agr_to_sysfile, &agr); + } + + if (agr.case_cnt > 0) + { + dump_aggregate_info (&agr, &agr.agr_case); + sfm_write_case (agr.writer, &agr.agr_case); + } + } + agr_destroy (&agr); return CMD_SUCCESS; -lossage: - /* Clean up. */ - destroy_sort_cases_pgm (sort); - free_aggregate_functions (); - free (prev_case); - +error: + agr_destroy (&agr); return CMD_FAILURE; } -/* Create a system file for use in aggregation to an external file, - and allocate temporary buffers for writing out cases. */ -static int -create_sysfile (void) -{ - struct sfm_write_info w; - w.h = outfile; - w.dict = agr_dict; - w.compress = set_scompression; - if (!sfm_write_dictionary (&w)) - { - free_aggregate_functions (); - destroy_sort_cases_pgm (sort); - dict_destroy (agr_dict); - return 0; - } - - buf64_1xx = xmalloc (sizeof *buf64_1xx * w.case_size); - buf_1xx = xmalloc (dict_get_case_size (agr_dict)); - - return 1; -} - /* Parse all the aggregate functions. */ static int -parse_aggregate_functions (void) +parse_aggregate_functions (struct agr_proc *agr) { - agr_first = agr_next = NULL; + struct agr_var *tail; /* Tail of linked list starting at agr->vars. */ /* Parse everything. */ + tail = NULL; for (;;) { char **dest; @@ -468,8 +363,9 @@ parse_aggregate_functions (void) { int n_dest_prev = n_dest; - if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH)) - goto lossage; + if (!parse_DATA_LIST_vars (&dest, &n_dest, + PV_APPEND | PV_SINGLE | PV_NO_SCRATCH)) + goto error; /* Assign empty labels. */ { @@ -482,8 +378,8 @@ parse_aggregate_functions (void) if (token == T_STRING) { - ds_truncate (&tokstr, 120); - dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr)); + ds_truncate (&tokstr, 255); + dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr)); lex_get (); } } @@ -492,7 +388,7 @@ parse_aggregate_functions (void) if (token != T_ID) { lex_error (_("expecting aggregation function")); - goto lossage; + goto error; } include_missing = 0; @@ -503,12 +399,12 @@ parse_aggregate_functions (void) } for (function = agr_func_tab; function->name; function++) - if (!strcmp (function->name, tokid)) + if (!strcasecmp (function->name, tokid)) break; if (NULL == function->name) { msg (SE, _("Unknown aggregation function %s."), tokid); - goto lossage; + goto error; } func_index = function - agr_func_tab; lex_get (); @@ -523,9 +419,11 @@ parse_aggregate_functions (void) else { lex_error (_("expecting `('")); - goto lossage; + goto error; } - } else { + } + else + { /* Parse list of source variables. */ { int pv_opts = PV_NO_SCRATCH; @@ -536,7 +434,7 @@ parse_aggregate_functions (void) pv_opts |= PV_SAME_TYPE; if (!parse_variables (default_dict, &src, &n_src, pv_opts)) - goto lossage; + goto error; } /* Parse function arguments, for those functions that @@ -549,16 +447,16 @@ parse_aggregate_functions (void) lex_match (','); if (token == T_STRING) { - arg[i].c = xstrdup (ds_value (&tokstr)); + arg[i].c = xstrdup (ds_c_str (&tokstr)); type = ALPHA; } - else if (token == T_NUM) + else if (lex_is_number ()) { arg[i].f = tokval; type = NUMERIC; } else { msg (SE, _("Missing argument %d to %s."), i + 1, function->name); - goto lossage; + goto error; } lex_get (); @@ -568,7 +466,7 @@ parse_aggregate_functions (void) msg (SE, _("Arguments to %s must be of same type as " "source variables."), function->name); - goto lossage; + goto error; } } @@ -576,22 +474,39 @@ parse_aggregate_functions (void) if (!lex_match(')')) { lex_error (_("expecting `)'")); - goto lossage; + goto error; } - /* Now check that the number of source variables match the - number of target variables. Do this here because if we - do it earlier then the user can get very misleading error - messages; i.e., `AGGREGATE x=SUM(y t).' will get this - error message when a proper message would be more like - `unknown variable t'. */ + /* Now check that the number of source variables match + the number of target variables. If we check earlier + than this, the user can get very misleading error + message, i.e. `AGGREGATE x=SUM(y t).' will get this + error message when a proper message would be more + like `unknown variable t'. */ if (n_src != n_dest) { msg (SE, _("Number of source variables (%d) does not match " "number of target variables (%d)."), n_src, n_dest); - goto lossage; + goto error; } + + if ((func_index == PIN || func_index == POUT + || func_index == FIN || func_index == FOUT) + && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f) + || (src[0]->type == ALPHA + && st_compare_pad (arg[0].c, strlen (arg[0].c), + arg[1].c, strlen (arg[1].c)) > 0))) + { + union value t = arg[0]; + arg[0] = arg[1]; + arg[1] = t; + + msg (SW, _("The value arguments passed to the %s function " + "are out-of-order. They will be treated as if " + "they had been specified in the correct order."), + function->name); + } } /* Finally add these to the linked list of aggregation @@ -601,55 +516,55 @@ parse_aggregate_functions (void) struct agr_var *v = xmalloc (sizeof *v); /* Add variable to chain. */ - if (agr_first) - agr_next = agr_next->next = v; + if (agr->agr_vars != NULL) + tail->next = v; else - agr_first = agr_next = v; - agr_next->next = NULL; + agr->agr_vars = v; + tail = v; + tail->next = NULL; + v->moments = NULL; /* Create the target variable in the aggregate dictionary. */ { + static const struct fmt_spec f8_2 = {FMT_F, 8, 2}; struct variable *destvar; - agr_next->function = func_index; + v->function = func_index; if (src) { - int output_width; - - agr_next->src = src[i]; + v->src = src[i]; if (src[i]->type == ALPHA) { - agr_next->function |= FSTRING; - agr_next->string = xmalloc (src[i]->width); + v->function |= FSTRING; + v->string = xmalloc (src[i]->width); } - - if (agr_next->src->type == NUMERIC || function->alpha_type == NUMERIC) - output_width = 0; - else - output_width = agr_next->src->width; if (function->alpha_type == ALPHA) - destvar = dict_clone_var (agr_dict, agr_next->src, dest[i]); - else - { - destvar = dict_create_var (agr_dict, dest[i], output_width); - if (output_width == 0) - destvar->print = destvar->write = function->format; - if (output_width == 0 && dict_get_weight (default_dict) != NULL - && (func_index == N || func_index == N_NO_VARS - || func_index == NU || func_index == NU_NO_VARS)) - { - struct fmt_spec f = {FMT_F, 8, 2}; - - destvar->print = destvar->write = f; - } - } + destvar = dict_clone_var (agr->dict, v->src, 0, dest[i] ); + else if (v->src->type == NUMERIC + || function->alpha_type == NUMERIC) + { + destvar = dict_create_var (agr->dict, dest[i], 0); + if (destvar != NULL) + { + if ((func_index == N || func_index == NMISS) + && dict_get_weight (default_dict) != NULL) + destvar->print = destvar->write = f8_2; + else + destvar->print = destvar->write = function->format; + } + } } else { - agr_next->src = NULL; - destvar = dict_create_var (agr_dict, dest[i], 0); + v->src = NULL; + destvar = dict_create_var (agr->dict, dest[i], 0); + if (func_index == N_NO_VARS + && dict_get_weight (default_dict) != NULL) + destvar->print = destvar->write = f8_2; + else + destvar->print = destvar->write = function->format; } if (!destvar) @@ -659,8 +574,7 @@ parse_aggregate_functions (void) "the aggregate variables and the break " "variables."), dest[i]); - free (dest[i]); - goto lossage; + goto error; } free (dest[i]); @@ -670,24 +584,22 @@ parse_aggregate_functions (void) destvar->label = dest_label[i]; dest_label[i] = NULL; } - else if (function->alpha_type == ALPHA) - destvar->print = destvar->write = function->format; - agr_next->dest = destvar; + v->dest = destvar; } - agr_next->include_missing = include_missing; + v->include_missing = include_missing; - if (agr_next->src != NULL) + if (v->src != NULL) { int j; - if (agr_next->src->type == NUMERIC) + if (v->src->type == NUMERIC) for (j = 0; j < function->n_args; j++) - agr_next->arg[j].f = arg[j].f; + v->arg[j].f = arg[j].f; else for (j = 0; j < function->n_args; j++) - agr_next->arg[j].c = xstrdup (arg[j].c); + v->arg[j].c = xstrdup (arg[j].c); } } @@ -712,7 +624,7 @@ parse_aggregate_functions (void) } continue; - lossage: + error: for (i = 0; i < n_dest; i++) { free (dest[i]); @@ -725,7 +637,7 @@ parse_aggregate_functions (void) if (src && n_src && src[0]->type == ALPHA) for (i = 0; i < function->n_args; i++) { - free(arg[i].c); + free (arg[i].c); arg[i].c = NULL; } free (src); @@ -734,15 +646,18 @@ parse_aggregate_functions (void) } } -/* Frees all the state for the AGGREGATE procedure. */ +/* Destroys AGR. */ static void -free_aggregate_functions (void) +agr_destroy (struct agr_proc *agr) { struct agr_var *iter, *next; - if (agr_dict) - dict_destroy (agr_dict); - for (iter = agr_first; iter; iter = next) + sfm_close_writer (agr->writer); + if (agr->sort != NULL) + sort_destroy_criteria (agr->sort); + free (agr->break_vars); + case_destroy (&agr->break_case); + for (iter = agr->agr_vars; iter; iter = next) { next = iter->next; @@ -756,138 +671,61 @@ free_aggregate_functions (void) free (iter->arg[i].c); free (iter->string); } + else if (iter->function == SD) + moments1_destroy (iter->moments); free (iter); } + if (agr->dict != NULL) + dict_destroy (agr->dict); + + case_destroy (&agr->agr_case); } /* Execution. */ -static void accumulate_aggregate_info (struct ccase *input); -static void dump_aggregate_info (struct ccase *output); +static void accumulate_aggregate_info (struct agr_proc *, + const struct ccase *); +static void dump_aggregate_info (struct agr_proc *, struct ccase *); /* Processes a single case INPUT for aggregation. If output is - warranted, it is written to case OUTPUT, which may be (but need not - be) an alias to INPUT. Returns -1 when output is performed, -2 - otherwise. */ -/* The code in this function has an eerie similarity to - vfm.c:SPLIT_FILE_procfunc()... */ + warranted, writes it to OUTPUT and returns nonzero. + Otherwise, returns zero and OUTPUT is unmodified. */ static int -aggregate_single_case (struct ccase *input, struct ccase *output) +aggregate_single_case (struct agr_proc *agr, + const struct ccase *input, struct ccase *output) { - /* The first case always begins a new break group. We also need to - preserve the values of the case for later comparison. */ - if (case_count++ == 0) + bool finished_group = false; + + if (agr->case_cnt++ == 0) + initialize_aggregate_info (agr, input); + else if (case_compare (&agr->break_case, input, + agr->break_vars, agr->break_var_cnt)) { - int n_elem = 0; - - { - int i; + dump_aggregate_info (agr, output); + finished_group = true; - for (i = 0; i < sort->var_cnt; i++) - n_elem += sort->vars[i]->nv; - } - - prev_case = xmalloc (sizeof *prev_case * n_elem); - - /* Copy INPUT into prev_case. */ - { - union value *iter = prev_case; - int i; - - for (i = 0; i < sort->var_cnt; i++) - { - struct variable *v = sort->vars[i]; - - if (v->type == NUMERIC) - (iter++)->f = input->data[v->fv].f; - else - { - memcpy (iter->s, input->data[v->fv].s, v->width); - iter += v->nv; - } - } - } - - accumulate_aggregate_info (input); - - return -2; + initialize_aggregate_info (agr, input); } - - /* Compare the value of each break variable to the values on the - previous case. */ - { - union value *iter = prev_case; - int i; - - for (i = 0; i < sort->var_cnt; i++) - { - struct variable *v = sort->vars[i]; - - switch (v->type) - { - case NUMERIC: - if (input->data[v->fv].f != iter->f) - goto not_equal; - iter++; - break; - case ALPHA: - if (memcmp (input->data[v->fv].s, iter->s, v->width)) - goto not_equal; - iter += v->nv; - break; - default: - assert (0); - } - } - } - accumulate_aggregate_info (input); - - return -2; - -not_equal: - /* The values of the break variable are different from the values on - the previous case. That means that it's time to dump aggregate - info. */ - dump_aggregate_info (output); - initialize_aggregate_info (); - accumulate_aggregate_info (input); - - /* Copy INPUT into prev_case. */ - { - union value *iter = prev_case; - int i; - - for (i = 0; i < sort->var_cnt; i++) - { - struct variable *v = sort->vars[i]; - - if (v->type == NUMERIC) - (iter++)->f = input->data[v->fv].f; - else - { - memcpy (iter->s, input->data[v->fv].s, v->width); - iter += v->nv; - } - } - } - - return -1; + accumulate_aggregate_info (agr, input); + return finished_group; } /* Accumulates aggregation data from the case INPUT. */ static void -accumulate_aggregate_info (struct ccase *input) +accumulate_aggregate_info (struct agr_proc *agr, + const struct ccase *input) { struct agr_var *iter; double weight; + int bad_warn = 1; - weight = dict_get_case_weight (default_dict, input); + weight = dict_get_case_weight (default_dict, input, &bad_warn); - for (iter = agr_first; iter; iter = iter->next) + for (iter = agr->agr_vars; iter; iter = iter->next) if (iter->src) { - union value *v = &input->data[iter->src->fv]; + const union value *v = case_data (input, iter->src->fv); if ((!iter->include_missing && is_missing (v, iter->src)) || (iter->include_missing && iter->src->type == NUMERIC @@ -896,9 +734,11 @@ accumulate_aggregate_info (struct ccase *input) switch (iter->function) { case NMISS: + case NMISS | FSTRING: iter->dbl[0] += weight; break; case NUMISS: + case NUMISS | FSTRING: iter->int1++; break; } @@ -910,20 +750,16 @@ accumulate_aggregate_info (struct ccase *input) switch (iter->function) { case SUM: - iter->dbl[0] += v->f; + iter->dbl[0] += v->f * weight; + iter->int1 = 1; break; case MEAN: iter->dbl[0] += v->f * weight; iter->dbl[1] += weight; break; - case SD: - { - double product = v->f * weight; - iter->dbl[0] += product; - iter->dbl[1] += product * v->f; - iter->dbl[2] += weight; - break; - } + case SD: + moments1_add (iter->moments, v->f, weight); + break; case MAX: iter->dbl[0] = max (iter->dbl[0], v->f); iter->int1 = 1; @@ -988,14 +824,16 @@ accumulate_aggregate_info (struct ccase *input) case FOUT | FSTRING: case POUT | FSTRING: if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0 - && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0) + || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0) iter->dbl[0] += weight; iter->dbl[1] += weight; break; case N: + case N | FSTRING: iter->dbl[0] += weight; break; case NU: + case NU | FSTRING: iter->int1++; break; case FIRST: @@ -1020,6 +858,13 @@ accumulate_aggregate_info (struct ccase *input) memcpy (iter->string, v->s, iter->src->width); iter->int1 = 1; break; + case NMISS: + case NMISS | FSTRING: + case NUMISS: + case NUMISS | FSTRING: + /* Our value is not missing or it would have been + caught earlier. Nothing to do. */ + break; default: assert (0); } @@ -1042,37 +887,34 @@ accumulate_aggregate_info (struct ccase *input) more of the break variables. Make an output record from the accumulated statistics in the OUTPUT case. */ static void -dump_aggregate_info (struct ccase *output) +dump_aggregate_info (struct agr_proc *agr, struct ccase *output) { - debug_printf (("(dumping ")); - { - int n_elem = 0; - - { - int i; + int value_idx = 0; + int i; - for (i = 0; i < sort->var_cnt; i++) - n_elem += sort->vars[i]->nv; - } - debug_printf (("n_elem=%d:", n_elem)); - memcpy (output->data, prev_case, sizeof (union value) * n_elem); + for (i = 0; i < agr->break_var_cnt; i++) + { + struct variable *v = agr->break_vars[i]; + memcpy (case_data_rw (output, value_idx), + case_data (&agr->break_case, v->fv), + sizeof (union value) * v->nv); + value_idx += v->nv; + } } { struct agr_var *i; - for (i = agr_first; i; i = i->next) + for (i = agr->agr_vars; i; i = i->next) { - union value *v = &output->data[i->dest->fv]; - - debug_printf ((" %d,%d", i->dest->fv, i->dest->nv)); + union value *v = case_data_rw (output, i->dest->fv); - if (missing == COLUMNWISE && i->missing != 0 + if (agr->missing == COLUMNWISE && i->missing != 0 && (i->function & FUNC) != N && (i->function & FUNC) != NU && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS) { - if (i->function & FSTRING) + if (i->dest->type == ALPHA) memset (v->s, ' ', i->dest->width); else v->f = SYSMIS; @@ -1082,15 +924,23 @@ dump_aggregate_info (struct ccase *output) switch (i->function) { case SUM: - v->f = i->dbl[0]; + v->f = i->int1 ? i->dbl[0] : SYSMIS; break; case MEAN: v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS; break; case SD: - v->f = ((i->dbl[2] > 1.0) - ? calc_stddev (calc_variance (i->dbl, i->dbl[2])) - : SYSMIS); + { + double variance; + + /* FIXME: we should use two passes. */ + moments1_calculate (i->moments, NULL, NULL, &variance, + NULL, NULL); + if (variance != SYSMIS) + v->f = sqrt (variance); + else + v->f = SYSMIS; + } break; case MAX: case MIN: @@ -1103,16 +953,14 @@ dump_aggregate_info (struct ccase *output) else memset (v->s, ' ', i->dest->width); break; - case FGT | FSTRING: - case FLT | FSTRING: - case FIN | FSTRING: - case FOUT | FSTRING: - v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS; - break; case FGT: + case FGT | FSTRING: case FLT: + case FLT | FSTRING: case FIN: + case FIN | FSTRING: case FOUT: + case FOUT | FSTRING: v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS; break; case PGT: @@ -1126,9 +974,11 @@ dump_aggregate_info (struct ccase *output) v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS; break; case N: + case N | FSTRING: v->f = i->dbl[0]; break; case NU: + case NU | FSTRING: v->f = i->int1; break; case FIRST: @@ -1149,9 +999,11 @@ dump_aggregate_info (struct ccase *output) v->f = i->int1; break; case NMISS: + case NMISS | FSTRING: v->f = i->dbl[0]; break; case NUMISS: + case NUMISS | FSTRING: v->f = i->int1; break; default: @@ -1159,18 +1011,22 @@ dump_aggregate_info (struct ccase *output) } } } - debug_printf ((") ")); } /* Resets the state for all the aggregate functions. */ static void -initialize_aggregate_info (void) +initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input) { struct agr_var *iter; - for (iter = agr_first; iter; iter = iter->next) + case_destroy (&agr->break_case); + case_clone (&agr->break_case, input); + + for (iter = agr->agr_vars; iter; iter = iter->next) { iter->missing = 0; + iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0; + iter->int1 = iter->int2 = 0; switch (iter->function) { case MIN: @@ -1185,10 +1041,14 @@ initialize_aggregate_info (void) case MAX | FSTRING: memset (iter->string, 0, iter->src->width); break; - default: - iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0; - iter->int1 = iter->int2 = 0; - break; + case SD: + if (iter->moments == NULL) + iter->moments = moments1_create (MOMENT_VARIANCE); + else + moments1_clear (iter->moments); + break; + default: + break; } } } @@ -1196,171 +1056,25 @@ initialize_aggregate_info (void) /* Aggregate each case as it comes through. Cases which aren't needed are dropped. */ static int -agr_00x_trns_proc (struct trns_header *h UNUSED, struct ccase *c, - int case_num UNUSED) +agr_to_active_file (struct ccase *c, void *agr_) { - int code = aggregate_single_case (c, compaction_case); - debug_printf (("%d ", code)); - return code; -} - -/* Output the last aggregate case. It's okay to call the vfm_sink's - write() method here because end_func is called so soon after all - the cases have been output; very little has been cleaned up at this - point. */ -static void -agr_00x_end_func (void *aux UNUSED) -{ - /* Ensure that info for the last break group gets written to the - active file. */ - dump_aggregate_info (compaction_case); - vfm_sink->class->write (vfm_sink, temp_case); -} - -/* Transform the aggregate case buf_1xx, in internal format, to system - file format, in buf64_1xx, and write the resultant case to the - system file. */ -static void -write_case_to_sfm (void) -{ - flt64 *p = buf64_1xx; - int i; + struct agr_proc *agr = agr_; - for (i = 0; i < dict_get_var_cnt (agr_dict); i++) - { - struct variable *v = dict_get_var (agr_dict, i); - - if (v->type == NUMERIC) - { - double src = buf_1xx->data[v->fv].f; - if (src == SYSMIS) - *p++ = -FLT64_MAX; - else - *p++ = src; - } - else - { - memcpy (p, buf_1xx->data[v->fv].s, v->width); - memset (&((char *) p)[v->width], ' ', - REM_RND_UP (v->width, sizeof (flt64))); - p += DIV_RND_UP (v->width, sizeof (flt64)); - } - } + if (aggregate_single_case (agr, c, &agr->agr_case)) + agr->sink->class->write (agr->sink, &agr->agr_case); - sfm_write_case (outfile, buf64_1xx, p - buf64_1xx); + return 1; } /* Aggregate the current case and output it if we passed a breakpoint. */ static int -agr_10x_trns_proc (struct trns_header *h UNUSED, struct ccase *c, - int case_num UNUSED) +presorted_agr_to_sysfile (struct ccase *c, void *agr_) { - int code = aggregate_single_case (c, buf_1xx); - - assert (code == -2 || code == -1); - if (code == -1) - write_case_to_sfm (); - return -1; -} + struct agr_proc *agr = agr_; -/* Close the system file now that we're done with it. */ -static void -agr_10x_trns_free (struct trns_header *h UNUSED) -{ - fh_close_handle (outfile); -} + if (aggregate_single_case (agr, c, &agr->agr_case)) + sfm_write_case (agr->writer, &agr->agr_case); -/* Ensure that info for the last break group gets written to the - system file. */ -static void -agr_10x_end_func (void *aux UNUSED) -{ - dump_aggregate_info (buf_1xx); - write_case_to_sfm (); -} - -/* When called with temp_case non-NULL (the normal case), runs the - case through the aggregater and outputs it to the system file if - appropriate. If temp_case is NULL, finishes up writing the last - case if necessary. */ -static int -agr_11x_func (write_case_data wc_data UNUSED) -{ - if (temp_case != NULL) - { - int code = aggregate_single_case (temp_case, buf_1xx); - - assert (code == -2 || code == -1); - if (code == -1) - write_case_to_sfm (); - } - else - { - if (case_count) - { - dump_aggregate_info (buf_1xx); - write_case_to_sfm (); - } - fh_close_handle (outfile); - } return 1; } - -/* Debugging. */ -#if DEBUGGING -/* Print out useful debugging information. */ -static void -debug_print (int flags) -{ - printf ("AGGREGATE\n /OUTFILE=%s\n", - outfile ? fh_handle_filename (outfile) : "*"); - - if (missing == COLUMNWISE) - puts (" /MISSING=COLUMNWISE"); - - if (flags & 2) - puts (" /DOCUMENT"); - if (flags & 4) - puts (" /PRESORTED"); - - { - int i; - - printf (" /BREAK="); - for (i = 0; i < sort->var_cnt; i++) - printf ("%s(%c) ", sort->vars[i]->name, - sort->vars[i]->p.srt.order == SRT_ASCEND ? 'A' : 'D'); - putc ('\n', stdout); - } - - { - struct agr_var *iter; - - for (iter = agr_first; iter; iter = iter->next) - { - struct agr_func *f = &agr_func_tab[iter->function & FUNC]; - - printf (" /%s", iter->dest->name); - if (iter->dest->label) - printf ("'%s'", iter->dest->label); - printf ("=%s(%s", f->name, iter->src->name); - if (f->n_args) - { - int i; - - for (i = 0; i < f->n_args; i++) - { - putc (',', stdout); - if (iter->src->type == NUMERIC) - printf ("%g", iter->arg[i].f); - else - printf ("%.*s", iter->src->width, iter->arg[i].c); - } - } - printf (")\n"); - } - } -} - -#endif /* DEBUGGING */