#include "case.h"
#include "casefile.h"
#include "command.h"
+#include "dictionary.h"
#include "error.h"
#include "file-handle.h"
#include "lexer.h"
#include "moments.h"
#include "pool.h"
#include "settings.h"
-#include "sfm.h"
+#include "sfm-write.h"
#include "sort.h"
#include "str.h"
#include "var.h"
struct agr_proc
{
/* We have either an output file or a sink. */
- struct file_handle *out_file; /* Output file, or null if none. */
+ struct sfm_writer *writer; /* Output file, or null if none. */
struct case_sink *sink; /* Sink, or null if none. */
/* Break variables. */
struct dictionary *dict; /* Aggregate dictionary. */
int case_cnt; /* Counts aggregated cases. */
struct ccase agr_case; /* Aggregate case for output. */
- flt64 *sfm_agr_case; /* Aggregate case in SFM format. */
};
static void initialize_aggregate_info (struct agr_proc *);
const struct ccase *input,
struct ccase *output);
static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
-static int create_sysfile (struct agr_proc *);
/* Aggregating to the active file. */
static int agr_to_active_file (struct ccase *, void *aux);
/* Aggregating to a system file. */
-static void write_case_to_sfm (struct agr_proc *agr);
static int presorted_agr_to_sysfile (struct ccase *, void *aux);
\f
/* Parsing. */
cmd_aggregate (void)
{
struct agr_proc agr;
+ struct file_handle *out_file = NULL;
/* Have we seen these subcommands? */
unsigned seen = 0;
- agr.out_file = NULL;
+ agr.writer = NULL;
agr.sink = NULL;
agr.missing = ITEMWISE;
agr.sort = NULL;
if (seen & 1)
{
msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
- goto lossage;
+ goto error;
}
seen |= 1;
lex_match ('=');
- if (lex_match ('*'))
- agr.out_file = NULL;
- else
+ if (!lex_match ('*'))
{
- agr.out_file = fh_parse_file_handle ();
- if (agr.out_file == NULL)
- goto lossage;
+ out_file = fh_parse ();
+ if (out_file == NULL)
+ goto error;
}
}
else if (lex_match_id ("MISSING"))
if (!lex_match_id ("COLUMNWISE"))
{
lex_error (_("while expecting COLUMNWISE"));
- goto lossage;
+ goto error;
}
agr.missing = COLUMNWISE;
}
if (seen & 8)
{
msg (SE, _("%s subcommand given multiple times."),"BREAK");
- goto lossage;
+ goto error;
}
seen |= 8;
agr.sort = sort_parse_criteria (default_dict,
&agr.break_vars, &agr.break_var_cnt);
if (agr.sort == NULL)
- goto lossage;
+ goto error;
for (i = 0; i < agr.break_var_cnt; i++)
{
/* Read in the aggregate functions. */
if (!parse_aggregate_functions (&agr))
- goto lossage;
+ goto error;
/* Delete documents. */
if (!(seen & 2))
initialize_aggregate_info (&agr);
/* Output to active file or external file? */
- if (agr.out_file == NULL)
+ if (out_file == NULL)
{
/* The active file will be replaced by the aggregated data,
so TEMPORARY is moot. */
}
else
{
- if (!create_sysfile (&agr))
- goto lossage;
-
+ agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression ());
+ if (agr.writer == NULL)
+ goto error;
+
if (agr.sort != NULL && (seen & 4) == 0)
{
/* Sorting is needed. */
dst = sort_active_file_to_casefile (agr.sort);
if (dst == NULL)
- goto lossage;
+ goto error;
reader = casefile_get_destructive_reader (dst);
while (casereader_read_xfer (reader, &c))
{
if (aggregate_single_case (&agr, &c, &agr.agr_case))
- write_case_to_sfm (&agr);
+ sfm_write_case (agr.writer, &agr.agr_case);
case_destroy (&c);
}
casereader_destroy (reader);
if (agr.case_cnt > 0)
{
dump_aggregate_info (&agr, &agr.agr_case);
- write_case_to_sfm (&agr);
+ sfm_write_case (agr.writer, &agr.agr_case);
}
- fh_close_handle (agr.out_file);
}
agr_destroy (&agr);
return CMD_SUCCESS;
-lossage:
+error:
agr_destroy (&agr);
return CMD_FAILURE;
}
-/* Create a system file for use in aggregation to an external
- file. */
-static int
-create_sysfile (struct agr_proc *agr)
-{
- struct sfm_write_info w;
- w.h = agr->out_file;
- w.dict = agr->dict;
- w.compress = get_scompression();
- if (!sfm_write_dictionary (&w))
- return 0;
-
- agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
-
- return 1;
-}
-
/* Parse all the aggregate functions. */
static int
parse_aggregate_functions (struct agr_proc *agr)
{
int n_dest_prev = n_dest;
- if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
- goto lossage;
+ if (!parse_DATA_LIST_vars (&dest, &n_dest,
+ PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
+ goto error;
/* Assign empty labels. */
{
if (token != T_ID)
{
lex_error (_("expecting aggregation function"));
- goto lossage;
+ goto error;
}
include_missing = 0;
if (NULL == function->name)
{
msg (SE, _("Unknown aggregation function %s."), tokid);
- goto lossage;
+ goto error;
}
func_index = function - agr_func_tab;
lex_get ();
else
{
lex_error (_("expecting `('"));
- goto lossage;
+ goto error;
}
} else {
/* Parse list of source variables. */
pv_opts |= PV_SAME_TYPE;
if (!parse_variables (default_dict, &src, &n_src, pv_opts))
- goto lossage;
+ goto error;
}
/* Parse function arguments, for those functions that
type = NUMERIC;
} else {
msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
- goto lossage;
+ goto error;
}
lex_get ();
msg (SE, _("Arguments to %s must be of same type as "
"source variables."),
function->name);
- goto lossage;
+ goto error;
}
}
if (!lex_match(')'))
{
lex_error (_("expecting `)'"));
- goto lossage;
+ goto error;
}
/* Now check that the number of source variables match the
msg (SE, _("Number of source variables (%d) does not match "
"number of target variables (%d)."),
n_src, n_dest);
- goto lossage;
+ goto error;
}
}
"variables."),
dest[i]);
free (dest[i]);
- goto lossage;
+ goto error;
}
free (dest[i]);
}
continue;
- lossage:
+ error:
for (i = 0; i < n_dest; i++)
{
free (dest[i]);
{
struct agr_var *iter, *next;
- if (agr->dict != NULL)
- dict_destroy (agr->dict);
+ sfm_close_writer (agr->writer);
if (agr->sort != NULL)
sort_destroy_criteria (agr->sort);
free (agr->break_vars);
+ free (agr->prev_break);
for (iter = agr->agr_vars; iter; iter = next)
{
next = iter->next;
moments1_destroy (iter->moments);
free (iter);
}
- free (agr->prev_break);
+ if (agr->dict != NULL)
+ dict_destroy (agr->dict);
case_destroy (&agr->agr_case);
}
\f
return 1;
}
-/* Writes AGR->agr_case to AGR->out_file. */
-static void
-write_case_to_sfm (struct agr_proc *agr)
-{
- flt64 *p;
- int i;
-
- p = agr->sfm_agr_case;
- for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
- {
- struct variable *v = dict_get_var (agr->dict, i);
-
- if (v->type == NUMERIC)
- {
- double src = case_num (&agr->agr_case, v->fv);
- if (src == SYSMIS)
- *p++ = -FLT64_MAX;
- else
- *p++ = src;
- }
- else
- {
- memcpy (p, case_str (&agr->agr_case, v->fv), v->width);
- memset (&((char *) p)[v->width], ' ',
- REM_RND_UP (v->width, sizeof (flt64)));
- p += DIV_RND_UP (v->width, sizeof (flt64));
- }
- }
-
- sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
-}
-
/* Aggregate the current case and output it if we passed a
breakpoint. */
static int
struct agr_proc *agr = agr_;
if (aggregate_single_case (agr, c, &agr->agr_case))
- write_case_to_sfm (agr);
+ sfm_write_case (agr->writer, &agr->agr_case);
return 1;
}