const struct ccase *input,
struct ccase *output);
static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
-
-/* Aggregating to the active file. */
-static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
-
-/* Aggregating to a system file. */
-static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
\f
/* Parsing. */
/* Output to active file or external file? */
if (out_file == NULL)
{
+ struct ccase *c;
+
/* The active file will be replaced by the aggregated data,
so TEMPORARY is moot. */
proc_cancel_temporary_transformations (ds);
if (agr.sink->class->open != NULL)
agr.sink->class->open (agr.sink);
proc_set_sink (ds,
- create_case_sink (&null_sink_class,
- dict, NULL));
- if (!procedure (ds, agr_to_active_file, &agr))
+ create_case_sink (&null_sink_class, dict, NULL));
+ proc_open (ds);
+ while (proc_read (ds, &c))
+ if (aggregate_single_case (&agr, c, &agr.agr_case))
+ if (!agr.sink->class->write (agr.sink, &agr.agr_case))
+ {
+ proc_close (ds);
+ goto error;
+ }
+ if (!proc_close (ds))
goto error;
if (agr.case_cnt > 0)
{
dict_destroy (dict);
dataset_set_dict (ds, agr.dict);
agr.dict = NULL;
- proc_set_source (ds,
- agr.sink->class->make_source (agr.sink));
+ proc_set_source (ds, agr.sink->class->make_source (agr.sink));
free_case_sink (agr.sink);
}
else
else
{
/* Active file is already sorted. */
- if (!procedure (ds, presorted_agr_to_sysfile, &agr))
+ struct ccase *c;
+
+ proc_open (ds);
+ while (proc_read (ds, &c))
+ if (aggregate_single_case (&agr, c, &agr.agr_case))
+ if (!any_writer_write (agr.writer, &agr.agr_case))
+ {
+ proc_close (ds);
+ goto error;
+ }
+ if (!proc_close (ds))
goto error;
}
}
}
}
-\f
-/* Aggregate each case as it comes through. Cases which aren't needed
- are dropped.
- Returns true if successful, false if an I/O error occurred. */
-static bool
-agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
-{
- struct agr_proc *agr = agr_;
-
- if (aggregate_single_case (agr, c, &agr->agr_case))
- return agr->sink->class->write (agr->sink, &agr->agr_case);
-
- return true;
-}
-
-/* Aggregate the current case and output it if we passed a
- breakpoint. */
-static bool
-presorted_agr_to_sysfile (const struct ccase *c, void *agr_,
- const struct dataset *ds UNUSED)
-{
- struct agr_proc *agr = agr_;
-
- if (aggregate_single_case (agr, c, &agr->agr_case))
- return any_writer_write (agr->writer, &agr->agr_case);
-
- return true;
-}