You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- 02111-1307, USA. */
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA. */
#include <config.h>
#include "vfm.h"
#include "vfmP.h"
-#include <assert.h>
+#include "error.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> /* Required by SunOS4. */
#endif
#include "alloc.h"
-#include "approx.h"
-#include "do-ifP.h"
+#include "case.h"
+#include "casefile.h"
+#include "command.h"
+#include "dictionary.h"
+#include "ctl-stack.h"
#include "error.h"
-#include "expr.h"
+#include "expressions/public.h"
#include "misc.h"
-#include "random.h"
+#include "settings.h"
#include "som.h"
#include "str.h"
#include "tab.h"
#include "var.h"
#include "value-labels.h"
+#include "gettext.h"
+#define _(msgid) gettext (msgid)
+
/*
Virtual File Manager (vfm):
- vfm is used to process data files. It uses the model that data is
- read from one stream (the data source), then written to another
- (the data sink). The data source is then deleted and the data sink
- becomes the data source for the next procedure. */
-
-#include "debug-print.h"
+ vfm is used to process data files. It uses the model that
+ data is read from one stream (the data source), processed,
+ then written to another (the data sink). The data source is
+ then deleted and the data sink becomes the data source for the
+ next procedure. */
/* Procedure execution data. */
struct write_case_data
{
- void (*beginfunc) (void *);
- int (*procfunc) (struct ccase *, void *);
- void (*endfunc) (void *);
- void *aux;
+ /* Function to call for each case. */
+ int (*proc_func) (struct ccase *, void *); /* Function. */
+ void *aux; /* Auxiliary data. */
+
+ struct ccase trns_case; /* Case used for transformations. */
+ struct ccase sink_case; /* Case written to sink, if
+ compaction is necessary. */
+ size_t cases_written; /* Cases output so far. */
+ size_t cases_analyzed; /* Cases passed to procedure so far. */
};
-/* This is used to read from the active file. */
-struct case_stream *vfm_source;
-
-/* This is used to write to the replacement active file. */
-struct case_stream *vfm_sink;
+/* The current active file, from which cases are read. */
+struct case_source *vfm_source;
-/* Information about the data source. */
-struct stream_info vfm_source_info;
-
-/* Information about the data sink. */
-struct stream_info vfm_sink_info;
+/* The replacement active file, to which cases are written. */
+struct case_sink *vfm_sink;
/* Nonzero if the case needs to have values deleted before being
stored, zero otherwise. */
-int compaction_necessary;
-
-/* Number of values after compaction, or the same as
- vfm_sink_info.nval, if compaction is not necessary. */
-int compaction_nval;
-
-/* Temporary case buffer with enough room for `compaction_nval'
- `value's. */
-struct ccase *compaction_case;
-
-/* Within a session, when paging is turned on, it is never turned back
- off. This policy might be too aggressive. */
-static int paging = 0;
+static int compaction_necessary;
/* Time at which vfm was last invoked. */
-time_t last_vfm_invocation;
-
-/* Number of cases passed to proc_func(). */
-static int case_count;
+static time_t last_vfm_invocation;
/* Lag queue. */
int n_lag; /* Number of cases to lag. */
static int lag_count; /* Number of cases in lag_queue so far. */
static int lag_head; /* Index where next case will be added. */
-static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
+static struct ccase *lag_queue; /* Array of n_lag ccase elements. */
+static void internal_procedure (int (*proc_func) (struct ccase *, void *),
+ void *aux);
+static void update_last_vfm_invocation (void);
+static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (void);
-static void close_active_file (struct write_case_data *);
-static int SPLIT_FILE_procfunc (struct ccase *, void *);
-static void finish_compaction (void);
-static void lag_case (void);
-static int procedure_write_case (struct write_case_data *);
-static void clear_temp_case (void);
-static int exclude_this_case (void);
+static int write_case (struct write_case_data *wc_data);
+static int execute_transformations (struct ccase *c,
+ struct transformation *trns,
+ int first_idx, int last_idx,
+ int case_num);
+static int filter_case (const struct ccase *c, int case_num);
+static void lag_case (const struct ccase *c);
+static void clear_case (struct ccase *c);
+static void close_active_file (void);
\f
/* Public functions. */
-/* Reads all the cases from the active file, transforms them by
- the active set of transformations, calls PROCFUNC with CURCASE
- set to the case, and writes them to a new active file.
+/* Returns the last time the data was read. */
+time_t
+vfm_last_invocation (void)
+{
+ if (last_vfm_invocation == 0)
+ update_last_vfm_invocation ();
+ return last_vfm_invocation;
+}
+
+/* Reads the data from the input program and writes it to a new
+ active file. For each case we read from the input program, we
+ do the following:
- Divides the active file into zero or more series of one or more
- cases each. BEGINFUNC is called before each series. ENDFUNC is
- called after each series.
+ 1. Execute permanent transformations. If these drop the case,
+ start the next case from step 1.
- Arbitrary user-specified data AUX is passed to BEGINFUNC,
- PROCFUNC, and ENDFUNC as auxiliary data. */
+ 2. N OF CASES. If we have already written N cases, start the
+ next case from step 1.
+
+ 3. Write case to replacement active file.
+
+ 4. Execute temporary transformations. If these drop the case,
+ start the next case from step 1.
+
+ 5. FILTER, PROCESS IF. If these drop the case, start the next
+ case from step 1.
+
+ 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
+ cases, start the next case from step 1.
+
+ 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
void
-procedure (void (*beginfunc) (void *),
- int (*procfunc) (struct ccase *curcase, void *),
- void (*endfunc) (void *),
- void *aux)
+procedure (int (*proc_func) (struct ccase *, void *), void *aux)
{
- struct write_case_data procedure_write_data;
- struct write_case_data split_file_data;
-
- if (dict_get_split_cnt (default_dict) == 0)
+ if (proc_func == NULL
+ && case_source_is_class (vfm_source, &storage_source_class)
+ && vfm_sink == NULL
+ && !temporary
+ && n_trns == 0)
{
- /* Normally we just use the data passed by the user. */
- procedure_write_data.beginfunc = beginfunc;
- procedure_write_data.procfunc = procfunc;
- procedure_write_data.endfunc = endfunc;
- procedure_write_data.aux = aux;
- }
- else
- {
- /* Under SPLIT FILE, we add a layer of indirection. */
- procedure_write_data.beginfunc = NULL;
- procedure_write_data.procfunc = SPLIT_FILE_procfunc;
- procedure_write_data.endfunc = endfunc;
- procedure_write_data.aux = &split_file_data;
-
- split_file_data.beginfunc = beginfunc;
- split_file_data.procfunc = procfunc;
- split_file_data.endfunc = endfunc;
- split_file_data.aux = aux;
+ /* Nothing to do. */
+ update_last_vfm_invocation ();
+ return;
}
- last_vfm_invocation = time (NULL);
-
open_active_file ();
- vfm_source->read (procedure_write_case, &procedure_write_data);
- close_active_file (&procedure_write_data);
+ internal_procedure (proc_func, aux);
+ close_active_file ();
}
-\f
-/* Active file processing support. Subtly different semantics from
- procedure(). */
-
-static int process_active_file_write_case (struct write_case_data *data);
-/* The casefunc might want us to stop calling it. */
-static int not_canceled;
-
-/* Reads all the cases from the active file and passes them one-by-one
- to CASEFUNC in temp_case. Before any cases are passed, calls
- BEGINFUNC. After all the cases have been passed, calls ENDFUNC.
- BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
- file by calling process_active_file_output_case().
-
- process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
-void
-process_active_file (void (*beginfunc) (void *),
- int (*casefunc) (struct ccase *curcase, void *),
- void (*endfunc) (void *),
- void *aux)
+/* Executes a procedure, as procedure(), except that the caller
+ is responsible for calling open_active_file() and
+ close_active_file().
+
+ PROC_FUNC and AUX are stored in a write_case_data that is
+ handed, together with write_case(), to the data source's read
+ function (if there is a data source). Both case counters in
+ that structure are zeroed before reading begins. The
+ recursive_call assertions check that this function is not
+ entered again while it is still running. */
+static void
+internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux)
{
- struct write_case_data process_active_write_data;
+ static int recursive_call;
- process_active_write_data.beginfunc = beginfunc;
- process_active_write_data.procfunc = casefunc;
- process_active_write_data.endfunc = endfunc;
- process_active_write_data.aux = aux;
+ struct write_case_data wc_data;
- not_canceled = 1;
+ assert (++recursive_call == 1);
- open_active_file ();
- beginfunc (aux);
-
- /* There doesn't necessarily need to be an active file. */
- if (vfm_source)
- vfm_source->read (process_active_file_write_case,
- &process_active_write_data);
-
- endfunc (aux);
- close_active_file (&process_active_write_data);
-}
+ wc_data.proc_func = proc_func;
+ wc_data.aux = aux;
+ create_trns_case (&wc_data.trns_case, default_dict);
+ case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
+ wc_data.cases_written = 0;
+ /* write_case() compares and increments this counter, so it
+ must not be left uninitialized. */
+ wc_data.cases_analyzed = 0;
-/* Pass the current case to casefunc. */
-static int
-process_active_file_write_case (struct write_case_data *data)
-{
- /* Index of current transformation. */
- int cur_trns;
+ update_last_vfm_invocation ();
- for (cur_trns = f_trns ; cur_trns != temp_trns; )
- {
- int code;
-
- code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
- switch (code)
- {
- case -1:
- /* Next transformation. */
- cur_trns++;
- break;
- case -2:
- /* Delete this case. */
- goto done;
- default:
- /* Go to that transformation. */
- cur_trns = code;
- break;
- }
- }
+ if (vfm_source != NULL)
+ vfm_source->class->read (vfm_source,
+ &wc_data.trns_case,
+ write_case, &wc_data);
- if (n_lag)
- lag_case ();
-
- /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
- if (not_canceled && !exclude_this_case ())
- not_canceled = data->procfunc (temp_case, data->aux);
-
- case_count++;
-
- done:
- clear_temp_case ();
+ case_destroy (&wc_data.sink_case);
+ case_destroy (&wc_data.trns_case);
- return 1;
+ assert (--recursive_call == 0);
}
-/* Write temp_case to the active file. */
-void
-process_active_file_output_case (void)
+/* Updates last_vfm_invocation. */
+static void
+update_last_vfm_invocation (void)
{
- vfm_sink_info.ncases++;
- vfm_sink->write ();
+ last_vfm_invocation = time (NULL);
}
-\f
-/* Opening the active file. */
-
-/* It might be usefully noted that the following several functions are
- given in the order that they are called by open_active_file(). */
-/* Prepare to write to the replacement active file. */
+/* Creates TRNS_CASE with room for all of DICT's values and gives
+ each of DICT's variables its initial value: a numeric variable
+ becomes 0 if its reinit flag is set and SYSMIS otherwise, and a
+ string variable is filled with spaces. */
static void
-prepare_for_writing (void)
+create_trns_case (struct ccase *trns_case, struct dictionary *dict)
{
- /* FIXME: If ALL the conditions listed below hold true, then the
- replacement active file is guaranteed to be identical to the
- original active file:
+ size_t var_cnt = dict_get_var_cnt (dict);
+ size_t i;
- 1. TEMPORARY was the first transformation, OR, there were no
- transformations at all.
+ case_create (trns_case, dict_get_next_value_idx (dict));
+ for (i = 0; i < var_cnt; i++)
+ {
+ struct variable *v = dict_get_var (dict, i);
+ union value *value = case_data_rw (trns_case, v->fv);
- 2. Input is not coming from an input program.
+ if (v->type == NUMERIC)
+ value->f = v->reinit ? 0.0 : SYSMIS;
+ else
+ memset (value->s, ' ', v->width);
+ }
+}
- 3. Compaction is not necessary.
+/* Makes all preparations for reading from the data source and writing
+ to the data sink. */
+static void
+open_active_file (void)
+{
+ /* Make temp_dict refer to the dictionary right before data
+ reaches the sink */
+ if (!temporary)
+ {
+ temp_trns = n_trns;
+ temp_dict = default_dict;
+ }
- So, in this case, we shouldn't have to replace the active
- file--it's just a waste of time and space. */
+ /* Figure out compaction. */
+ compaction_necessary = (dict_get_next_value_idx (temp_dict)
+ != dict_get_compacted_value_cnt (temp_dict));
- vfm_sink_info.ncases = 0;
- vfm_sink_info.nval = dict_get_next_value_idx (default_dict);
- vfm_sink_info.case_size = dict_get_case_size (default_dict);
-
+ /* Prepare sink. */
if (vfm_sink == NULL)
+ vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
+ if (vfm_sink->class->open != NULL)
+ vfm_sink->class->open (vfm_sink);
+
+ /* Allocate memory for lag queue. */
+ if (n_lag > 0)
{
- if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
- && !paging)
- {
- msg (MW, _("Workspace overflow predicted. Max workspace is "
- "currently set to %d KB (%d cases at %d bytes each). "
- "Paging active file to disk."),
- MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
- vfm_sink_info.case_size);
-
- paging = 1;
- }
-
- vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
+ int i;
+
+ lag_count = 0;
+ lag_head = 0;
+ lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
+ for (i = 0; i < n_lag; i++)
+ case_nullify (&lag_queue[i]);
}
+
+ /* Close any unclosed DO IF or LOOP constructs. */
+ ctl_stack_clear ();
}
-/* Arrange for compacting the output cases for storage. */
-static void
-arrange_compaction (void)
+/* Transforms trns_case and writes it to the replacement active
+ file if advisable. Returns nonzero if more cases can be
+ accepted, zero otherwise. Do not call this function again
+ after it has returned zero once. */
+static int
+write_case (struct write_case_data *wc_data)
{
- int count_values = 0;
+ /* Execute permanent transformations. */
+ if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
+ wc_data->cases_written + 1))
+ goto done;
- {
- int i;
-
- /* Count up the number of `value's that will be output. */
- for (i = 0; i < dict_get_var_cnt (temp_dict); i++)
- {
- struct variable *v = dict_get_var (temp_dict, i);
-
- if (v->name[0] != '#')
- {
- assert (v->nv > 0);
- count_values += v->nv;
- }
- }
- assert (temporary == 2
- || count_values <= dict_get_next_value_idx (temp_dict));
- }
+ /* N OF CASES. */
+ if (dict_get_case_limit (default_dict)
+ && wc_data->cases_written >= dict_get_case_limit (default_dict))
+ goto done;
+ wc_data->cases_written++;
+
+ /* Write case to LAG queue. */
+ if (n_lag)
+ lag_case (&wc_data->trns_case);
+
+ /* Write case to replacement active file. */
+ if (vfm_sink->class->write != NULL)
+ {
+ if (compaction_necessary)
+ {
+ dict_compact_case (temp_dict, &wc_data->sink_case,
+ &wc_data->trns_case);
+ vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
+ }
+ else
+ vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
+ }
- /* Compaction is only necessary if the number of `value's to output
- differs from the number already present. */
- compaction_nval = count_values;
- if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
- compaction_necessary = 1;
- else
- compaction_necessary = 0;
+ /* Execute temporary transformations. */
+ if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
+ wc_data->cases_written))
+ goto done;
- if (vfm_sink->init)
- vfm_sink->init ();
-}
+ /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
+ if (filter_case (&wc_data->trns_case, wc_data->cases_written)
+ || (dict_get_case_limit (temp_dict)
+ && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
+ goto done;
+ wc_data->cases_analyzed++;
-/* Prepares the temporary case and compaction case. */
-static void
-make_temp_case (void)
-{
- temp_case = xmalloc (vfm_sink_info.case_size);
+ /* Pass case to procedure. */
+ if (wc_data->proc_func != NULL)
+ wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
- if (compaction_necessary)
- compaction_case = xmalloc (sizeof (struct ccase)
- + sizeof (union value) * (compaction_nval - 1));
+ done:
+ clear_case (&wc_data->trns_case);
+ return 1;
}
-#if DEBUGGING
-/* Returns the name of the variable that owns the index CCASE_INDEX
- into ccase. */
-static const char *
-index_to_varname (int ccase_index)
+/* Transforms case C using the transformations in TRNS[] with
+ indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
+ become case CASE_NUM (1-based) in the output file. Returns
+ zero if the case was filtered out by one of the
+ transformations, nonzero otherwise. */
+static int
+execute_transformations (struct ccase *c,
+ struct transformation *trns,
+ int first_idx, int last_idx,
+ int case_num)
{
- int i;
+ int idx;
- for (i = 0; i < default_dict.nvar; i++)
+ for (idx = first_idx; idx != last_idx; )
{
- struct variable *v = default_dict.var[i];
-
- if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
- return default_dict.var[i]->name;
+ struct transformation *t = &trns[idx];
+ int retval = t->proc (t->private, c, case_num);
+ switch (retval)
+ {
+ case -1:
+ idx++;
+ break;
+
+ case -2:
+ return 0;
+
+ default:
+ idx = retval;
+ break;
+ }
}
- return _("<NOVAR>");
+
+ return 1;
}
-#endif
-/* Initializes temp_case from the vectors that say which `value's
- need to be initialized just once, and which ones need to be
- re-initialized before every case. */
-static void
-vector_initialization (void)
+/* Returns nonzero if case C with case number CASE_IDX should be
+ excluded as specified on FILTER or PROCESS IF, otherwise
+ zero. */
+static int
+filter_case (const struct ccase *c, int case_idx)
{
- size_t var_cnt = dict_get_var_cnt (default_dict);
- size_t i;
-
- for (i = 0; i < var_cnt; i++)
+ /* FILTER. */
+ struct variable *filter_var = dict_get_filter (default_dict);
+ if (filter_var != NULL)
{
- struct variable *v = dict_get_var (default_dict, i);
-
- if (v->type == NUMERIC)
- {
- if (v->reinit)
- temp_case->data[v->fv].f = 0.0;
- else
- temp_case->data[v->fv].f = SYSMIS;
- }
- else
- memset (temp_case->data[v->fv].s, ' ', v->width);
+ double f = case_num (c, filter_var->fv);
+ if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
+ return 1;
}
+
+ /* PROCESS IF. */
+ if (process_if_expr != NULL
+ && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
+ return 1;
+
+ return 0;
}
-/* Sets all the lag-related variables based on value of n_lag. */
+/* Add C to the lag queue. */
static void
-setup_lag (void)
+lag_case (const struct ccase *c)
{
- int i;
-
- if (n_lag == 0)
- return;
-
- lag_count = 0;
- lag_head = 0;
- lag_queue = xmalloc (n_lag * sizeof *lag_queue);
- for (i = 0; i < n_lag; i++)
- lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
+ if (lag_count < n_lag)
+ lag_count++;
+ case_destroy (&lag_queue[lag_head]);
+ case_clone (&lag_queue[lag_head], c);
+ if (++lag_head >= n_lag)
+ lag_head = 0;
}
-/* There is a lot of potential confusion in the vfm and related
- routines over the number of `value's at each stage of the process.
- Here is each nval count, with explanation, as set up by
- open_active_file():
-
- vfm_source_info.nval: Number of `value's in the cases returned by
- the source stream. This value turns out not to be very useful, but
- we maintain it anyway.
-
- vfm_sink_info.nval: Number of `value's in the cases after all
- transformations have been performed. Never less than
- vfm_source_info.nval.
-
- temp_dict->nval: Number of `value's in the cases after the
- transformations leading up to TEMPORARY have been performed. If
- TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
- Never less than vfm_sink_info.nval.
-
- compaction_nval: Number of `value's in the cases after the
- transformations leading up to TEMPORARY have been performed and the
- case has been compacted by compact_case(), if compaction is
- necessary. This the number of `value's in the cases saved by the
- sink stream. (However, note that the cases passed to the sink
- stream have not yet been compacted. It is the responsibility of
- the data sink to call compact_case().) This may be less than,
- greater than, or equal to vfm_source_info.nval. `compaction'
- becomes the new value of default_dict.nval after the procedure is
- completed.
-
- default_dict.nval: This is often an alias for temp_dict->nval. As
- such it can really have no separate existence until the procedure
- is complete. For this reason it should *not* be referenced inside
- the execution of a procedure. */
-/* Makes all preparations for reading from the data source and writing
- to the data sink. */
+/* Clears the variables in C that need to be cleared between
+ processing cases. */
static void
-open_active_file (void)
+clear_case (struct ccase *c)
{
- /* Sometimes we want to refer to the dictionary that applies to the
- data actually written to the sink. This is either temp_dict or
- default_dict. However, if TEMPORARY is not on, then temp_dict
- does not apply. So, we can set temp_dict to default_dict in this
- case. */
- if (!temporary)
+ size_t var_cnt = dict_get_var_cnt (default_dict);
+ size_t i;
+
+ for (i = 0; i < var_cnt; i++)
{
- temp_trns = n_trns;
- temp_dict = default_dict;
+ struct variable *v = dict_get_var (default_dict, i);
+ if (v->init && v->reinit)
+ {
+ if (v->type == NUMERIC)
+ case_data_rw (c, v->fv)->f = SYSMIS;
+ else
+ memset (case_data_rw (c, v->fv)->s, ' ', v->width);
+ }
}
-
- /* No cases passed to the procedure yet. */
- case_count = 0;
-
- /* The rest. */
- prepare_for_writing ();
- arrange_compaction ();
- make_temp_case ();
- vector_initialization ();
- discard_ctl_stack ();
- setup_lag ();
-
- /* Debug output. */
- debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
- vfm_source->name, vfm_sink->name));
- debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
- "temp_dict->nval=%d, compaction_nval=%d, "
- "default_dict.nval=%d\n",
- vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
- compaction_nval, default_dict.nval));
}
-\f
+
/* Closes the active file. */
static void
-close_active_file (struct write_case_data *data)
+close_active_file (void)
{
- /* Close the current case group. */
- if (case_count && data->endfunc != NULL)
- data->endfunc (data->aux);
-
- /* Stop lagging (catch up?). */
- if (n_lag)
+ /* Free memory for lag queue, and turn off lagging. */
+ if (n_lag > 0)
{
int i;
for (i = 0; i < n_lag; i++)
- free (lag_queue[i]);
+ case_destroy (&lag_queue[i]);
free (lag_queue);
n_lag = 0;
}
- /* Assume the dictionary from right before TEMPORARY, if any. Turn
- off TEMPORARY. */
+ /* Dictionary from before TEMPORARY becomes permanent. */
if (temporary)
{
dict_destroy (default_dict);
/* Finish compaction. */
if (compaction_necessary)
- finish_compaction ();
+ dict_compact_values (default_dict);
- /* Old data sink --> New data source. */
- if (vfm_source && vfm_source->destroy_source)
- vfm_source->destroy_source ();
-
- vfm_source = vfm_sink;
- vfm_source_info.ncases = vfm_sink_info.ncases;
- vfm_source_info.nval = compaction_nval;
- vfm_source_info.case_size = (sizeof (struct ccase)
- + (compaction_nval - 1) * sizeof (union value));
- if (vfm_source->mode)
- vfm_source->mode ();
-
- /* Old data sink is gone now. */
+ /* Free data source. */
+ free_case_source (vfm_source);
+ vfm_source = NULL;
+
+ /* Old data sink becomes new data source. */
+ if (vfm_sink->class->make_source != NULL)
+ vfm_source = vfm_sink->class->make_source (vfm_sink);
+ free_case_sink (vfm_sink);
vfm_sink = NULL;
- /* Cancel TEMPORARY. */
+ /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
+ and get rid of all the transformations. */
cancel_temporary ();
-
- /* Free temporary cases. */
- free (temp_case);
- temp_case = NULL;
-
- free (compaction_case);
- compaction_case = NULL;
-
- /* Cancel PROCESS IF. */
expr_free (process_if_expr);
process_if_expr = NULL;
-
- /* Cancel FILTER if temporary. */
if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
dict_set_filter (default_dict, NULL);
-
- /* Cancel transformations. */
- cancel_transformations ();
-
- /* Turn off case limiter. */
dict_set_case_limit (default_dict, 0);
-
- /* Clear VECTOR vectors. */
dict_clear_vectors (default_dict);
-
- debug_printf (("vfm: procedure complete\n\n"));
+ cancel_transformations ();
}
\f
-/* Disk case stream. */
-
-/* Associated files. */
-FILE *disk_source_file;
-FILE *disk_sink_file;
+/* Storage case stream. */
-/* Initializes the disk sink. */
-static void
-disk_stream_init (void)
-{
- disk_sink_file = tmpfile ();
- if (!disk_sink_file)
- {
- msg (ME, _("An error occurred attempting to create a temporary "
- "file for use as the active file: %s."),
- strerror (errno));
- err_failure ();
- }
-}
+/* Information about storage sink or source. */
+struct storage_stream_info
+ {
+ struct casefile *casefile; /* Storage. */
+ };
-/* Reads all cases from the disk source and passes them one by one to
- write_case(). */
+/* Initializes a storage sink. */
static void
-disk_stream_read (write_case_func *write_case, write_case_data wc_data)
+storage_sink_open (struct case_sink *sink)
{
- int i;
+ struct storage_stream_info *info;
- for (i = 0; i < vfm_source_info.ncases; i++)
- {
- if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
- {
- msg (ME, _("An error occurred while attempting to read from "
- "a temporary file created for the active file: %s."),
- strerror (errno));
- err_failure ();
- return;
- }
-
- if (!write_case (wc_data))
- return;
- }
+ sink->aux = info = xmalloc (sizeof *info);
+ info->casefile = casefile_create (sink->value_cnt);
}
-/* Writes temp_case to the disk sink. */
+/* Destroys storage stream represented by INFO. */
static void
-disk_stream_write (void)
+destroy_storage_stream_info (struct storage_stream_info *info)
{
- union value *src_case;
-
- if (compaction_necessary)
- {
- compact_case (compaction_case, temp_case);
- src_case = (union value *) compaction_case;
- }
- else src_case = (union value *) temp_case;
-
- if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
- disk_sink_file) != 1)
+ if (info != NULL)
{
- msg (ME, _("An error occurred while attempting to write to a "
- "temporary file used as the active file: %s."),
- strerror (errno));
- err_failure ();
+ casefile_destroy (info->casefile);
+ free (info);
}
}
-/* Switches the stream from a sink to a source. */
+/* Writes case C to the storage sink SINK. */
static void
-disk_stream_mode (void)
+storage_sink_write (struct case_sink *sink, const struct ccase *c)
{
- /* Rewind the sink. */
- if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
- {
- msg (ME, _("An error occurred while attempting to rewind a "
- "temporary file used as the active file: %s."),
- strerror (errno));
- err_failure ();
- }
-
- /* Sink --> source variables. */
- disk_source_file = disk_sink_file;
+ struct storage_stream_info *info = sink->aux;
+
+ casefile_append (info->casefile, c);
}
-/* Destroys the source's internal data. */
+/* Destroys internal data in SINK. */
static void
-disk_stream_destroy_source (void)
+storage_sink_destroy (struct case_sink *sink)
{
- if (disk_source_file)
- {
- fclose (disk_source_file);
- disk_source_file = NULL;
- }
+ destroy_storage_stream_info (sink->aux);
}
-/* Destroys the sink's internal data. */
-static void
-disk_stream_destroy_sink (void)
+/* Closes the sink and returns a storage source to read back the
+ written data. */
+static struct case_source *
+storage_sink_make_source (struct case_sink *sink)
{
- if (disk_sink_file)
- {
- fclose (disk_sink_file);
- disk_sink_file = NULL;
- }
+ struct case_source *source
+ = create_case_source (&storage_source_class, sink->aux);
+ sink->aux = NULL;
+ return source;
}
-/* Disk stream. */
-struct case_stream vfm_disk_stream =
+/* Storage sink. */
+const struct case_sink_class storage_sink_class =
{
- disk_stream_init,
- disk_stream_read,
- disk_stream_write,
- disk_stream_mode,
- disk_stream_destroy_source,
- disk_stream_destroy_sink,
- "disk",
+ "storage",
+ storage_sink_open,
+ storage_sink_write,
+ storage_sink_destroy,
+ storage_sink_make_source,
};
\f
-/* Memory case stream. */
-
-/* List of cases stored in the stream. */
-struct case_list *memory_source_cases;
-struct case_list *memory_sink_cases;
-
-/* Current case. */
-struct case_list *memory_sink_iter;
+/* Storage source. */
-/* Maximum number of cases. */
-int memory_sink_max_cases;
-
-/* Initializes the memory stream variables for writing. */
-static void
-memory_stream_init (void)
+/* Returns the number of cases that will be read by
+ storage_source_read(). */
+static int
+storage_source_count (const struct case_source *source)
{
- memory_sink_cases = NULL;
- memory_sink_iter = NULL;
-
- assert (compaction_nval);
- memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
+ struct storage_stream_info *info = source->aux;
+
+ return casefile_get_case_cnt (info->casefile);
}
-/* Reads the case stream from memory and passes it to write_case(). */
+/* Reads all cases from the storage source and passes them one by one to
+ write_case(). */
static void
-memory_stream_read (write_case_func *write_case, write_case_data wc_data)
+storage_source_read (struct case_source *source,
+ struct ccase *output_case,
+ write_case_func *write_case, write_case_data wc_data)
{
- while (memory_source_cases != NULL)
+ struct storage_stream_info *info = source->aux;
+ struct ccase casefile_case;
+ struct casereader *reader;
+
+ for (reader = casefile_get_reader (info->casefile);
+ casereader_read (reader, &casefile_case);
+ case_destroy (&casefile_case))
{
- memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
-
- {
- struct case_list *current = memory_source_cases;
- memory_source_cases = memory_source_cases->next;
- free (current);
- }
-
- if (!write_case (wc_data))
- return;
+ case_copy (output_case, 0,
+ &casefile_case, 0,
+ casefile_get_value_cnt (info->casefile));
+ write_case (wc_data);
}
+ casereader_destroy (reader);
}
-/* Writes temp_case to the memory stream. */
+/* Destroys the source's internal data. */
static void
-memory_stream_write (void)
+storage_source_destroy (struct case_source *source)
{
- struct case_list *new_case = malloc (sizeof (struct case_list)
- + ((compaction_nval - 1)
- * sizeof (union value)));
-
- /* If we've got memory to spare then add it to the linked list. */
- if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
- {
- if (compaction_necessary)
- compact_case (&new_case->c, temp_case);
- else
- memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
-
- /* Append case to linked list. */
- if (memory_sink_cases)
- memory_sink_iter = memory_sink_iter->next = new_case;
- else
- memory_sink_iter = memory_sink_cases = new_case;
- }
- else
- {
- /* Out of memory. Write the active file to disk. */
- struct case_list *cur, *next;
-
- /* Notify the user. */
- if (!new_case)
- msg (MW, _("Virtual memory exhausted. Paging active file "
- "to disk."));
- else
- msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
- "overflowed. Paging active file to disk."),
- MAX_WORKSPACE / 1024, memory_sink_max_cases,
- compaction_nval * sizeof (union value));
-
- free (new_case);
-
- /* Switch to a disk sink. */
- vfm_sink = &vfm_disk_stream;
- vfm_sink->init ();
- paging = 1;
-
- /* Terminate the list. */
- if (memory_sink_iter)
- memory_sink_iter->next = NULL;
-
- /* Write the cases to disk and destroy them. We can't call
- vfm->sink->write() because of compaction. */
- for (cur = memory_sink_cases; cur; cur = next)
- {
- next = cur->next;
- if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
- disk_sink_file) != 1)
- {
- msg (ME, _("An error occurred while attempting to "
- "write to a temporary file created as the "
- "active file, while paging to disk: %s."),
- strerror (errno));
- err_failure ();
- }
- free (cur);
- }
-
- /* Write the current case to disk. */
- vfm_sink->write ();
- }
+ destroy_storage_stream_info (source->aux);
}
-/* If the data is stored in memory, causes it to be written to disk.
- To be called only *between* procedure()s, not within them. */
-void
-page_to_disk (void)
+/* Storage source. */
+const struct case_source_class storage_source_class =
+ {
+ "storage",
+ storage_source_count,
+ storage_source_read,
+ storage_source_destroy,
+ };
+
+struct casefile *
+storage_source_get_casefile (struct case_source *source)
{
- if (vfm_source == &vfm_memory_stream)
- {
- /* Switch to a disk sink. */
- vfm_sink = &vfm_disk_stream;
- vfm_sink->init ();
- paging = 1;
-
- /* Write the cases to disk and destroy them. We can't call
- vfm->sink->write() because of compaction. */
- {
- struct case_list *cur, *next;
-
- for (cur = memory_source_cases; cur; cur = next)
- {
- next = cur->next;
- if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
- disk_sink_file) != 1)
- {
- msg (ME, _("An error occurred while attempting to "
- "write to a temporary file created as the "
- "active file, while paging to disk: %s."),
- strerror (errno));
- err_failure ();
- }
- free (cur);
- }
- }
-
- vfm_source = &vfm_disk_stream;
- vfm_source->mode ();
+ struct storage_stream_info *info = source->aux;
- vfm_sink = NULL;
- }
+ assert (source->class == &storage_source_class);
+ return info->casefile;
}
-/* Switch the memory stream from sink to source mode. */
-static void
-memory_stream_mode (void)
+struct case_source *
+storage_source_create (struct casefile *cf)
{
- /* Terminate the list. */
- if (memory_sink_iter)
- memory_sink_iter->next = NULL;
+ struct storage_stream_info *info;
- /* Sink --> source variables. */
- memory_source_cases = memory_sink_cases;
- memory_sink_cases = NULL;
-}
+ info = xmalloc (sizeof *info);
+ info->casefile = cf;
-/* Destroy all memory source data. */
-static void
-memory_stream_destroy_source (void)
-{
- struct case_list *cur, *next;
-
- for (cur = memory_source_cases; cur; cur = next)
- {
- next = cur->next;
- free (cur);
- }
- memory_source_cases = NULL;
+ return create_case_source (&storage_source_class, info);
}
+\f
+/* Null sink. Used by a few procedures that keep track of output
+ themselves and would throw away anything that the sink
+ contained anyway. */
-/* Destroy all memory sink data. */
-static void
-memory_stream_destroy_sink (void)
-{
- struct case_list *cur, *next;
-
- for (cur = memory_sink_cases; cur; cur = next)
- {
- next = cur->next;
- free (cur);
- }
- memory_sink_cases = NULL;
-}
-
-/* Memory stream. */
-struct case_stream vfm_memory_stream =
+const struct case_sink_class null_sink_class =
{
- memory_stream_init,
- memory_stream_read,
- memory_stream_write,
- memory_stream_mode,
- memory_stream_destroy_source,
- memory_stream_destroy_sink,
- "memory",
+ "null",
+ NULL,
+ NULL,
+ NULL,
+ NULL,
};
\f
-#include "debug-print.h"
-
-/* Add temp_case to the lag queue. */
-static void
-lag_case (void)
-{
- if (lag_count < n_lag)
- lag_count++;
- memcpy (lag_queue[lag_head], temp_case,
- dict_get_case_size (temp_dict));
- if (++lag_head >= n_lag)
- lag_head = 0;
-}
-
/* Returns a pointer to the lagged case from N_BEFORE cases before the
current one, or NULL if there haven't been that many cases yet. */
struct ccase *
lagged_case (int n_before)
{
+ assert (n_before >= 1 );
assert (n_before <= n_lag);
- if (n_before > lag_count)
+
+ if (n_before <= lag_count)
+ {
+ int index = lag_head - n_before;
+ if (index < 0)
+ index += n_lag;
+ return &lag_queue[index];
+ }
+ else
return NULL;
-
- {
- int index = lag_head - n_before;
- if (index < 0)
- index += n_lag;
- return lag_queue[index];
- }
}
-/* Transforms temp_case and writes it to the replacement active file
- if advisable. Returns nonzero if more cases can be accepted, zero
- otherwise. Do not call this function again after it has returned
- zero once. */
-int
-procedure_write_case (write_case_data wc_data)
+/* Appends a transformation with functions PROC and FREE and auxiliary
+ data PRIVATE to t_trns[], the list of all transformations to be
+ performed on data as it is read from the active file. */
+void
+add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
{
- /* Index of current transformation. */
- int cur_trns;
-
- /* Return value: whether it's reasonable to write any more cases. */
- int more_cases = 1;
-
- debug_printf ((_("transform: ")));
-
- cur_trns = f_trns;
- for (;;)
- {
- /* Output the case if this is temp_trns. */
- if (cur_trns == temp_trns)
- {
- debug_printf (("REC"));
-
- if (n_lag)
- lag_case ();
-
- vfm_sink_info.ncases++;
- vfm_sink->write ();
-
- if (dict_get_case_limit (default_dict))
- more_cases = (vfm_sink_info.ncases
- < dict_get_case_limit (default_dict));
- }
-
- /* Are we done? */
- if (cur_trns >= n_trns)
- break;
-
- debug_printf (("$%d", cur_trns));
-
- /* Decide which transformation should come next. */
- {
- int code;
-
- code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
- switch (code)
- {
- case -1:
- /* Next transformation. */
- cur_trns++;
- break;
- case -2:
- /* Delete this case. */
- goto done;
- default:
- /* Go to that transformation. */
- cur_trns = code;
- break;
- }
- }
- }
-
- /* Call the beginning of group function. */
- if (!case_count && wc_data->beginfunc != NULL)
- wc_data->beginfunc (wc_data->aux);
-
- /* Call the procedure if there is one and FILTER and PROCESS IF
- don't prohibit it. */
- if (wc_data->procfunc != NULL && !exclude_this_case ())
- wc_data->procfunc (temp_case, wc_data->aux);
-
- case_count++;
-
-done:
- debug_putc ('\n', stdout);
+ struct transformation *trns;
+ if (n_trns >= m_trns)
+ t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
+ trns = &t_trns[n_trns++];
+ trns->proc = proc;
+ trns->free = free;
+ trns->private = private;
+}
- clear_temp_case ();
-
- /* Return previously determined value. */
- return more_cases;
+/* Returns the index number that the next transformation added by
+ add_transformation() will receive. A trns_proc_func that
+ returns this index causes control flow to jump to it. */
+size_t
+next_transformation (void)
+{
+ return n_trns;
}
-/* Clears the variables in the temporary case that need to be
- cleared between processing cases. */
-static void
-clear_temp_case (void)
+/* Cancels all active transformations, including any transformations
+ created by the input program. */
+void
+cancel_transformations (void)
{
- /* FIXME? This is linear in the number of variables, but
- doesn't need to be, so it's an easy optimization target. */
- size_t var_cnt = dict_get_var_cnt (default_dict);
size_t i;
-
- for (i = 0; i < var_cnt; i++)
+ for (i = 0; i < n_trns; i++)
{
- struct variable *v = dict_get_var (default_dict, i);
- if (v->init && v->reinit)
- {
- if (v->type == NUMERIC)
- temp_case->data[v->fv].f = SYSMIS;
- else
- memset (temp_case->data[v->fv].s, ' ', v->width);
- }
+ struct transformation *t = &t_trns[i];
+ if (t->free != NULL)
+ t->free (t->private);
}
+ n_trns = f_trns = 0;
+ free (t_trns);
+ t_trns = NULL;
+ m_trns = 0;
+}
+\f
+/* Creates and returns a new case source with class CLASS and
+ auxiliary data AUX. */
+struct case_source *
+create_case_source (const struct case_source_class *class,
+ void *aux)
+{
+ struct case_source *source = xmalloc (sizeof *source);
+ source->class = class;
+ source->aux = aux;
+ return source;
}
-/* Returns nonzero if this case should be exclude as specified on
- FILTER or PROCESS IF, otherwise zero. */
-static int
-exclude_this_case (void)
+/* Destroys case source SOURCE, first calling its class's destroy
+ function, if any. Does nothing if SOURCE is null. */
+void
+free_case_source (struct case_source *source)
{
- /* FILTER. */
- struct variable *filter_var = dict_get_filter (default_dict);
- if (filter_var != NULL)
+ if (source != NULL)
{
- double f = temp_case->data[filter_var->fv].f;
- if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
- return 1;
+ if (source->class->destroy != NULL)
+ source->class->destroy (source);
+ free (source);
}
+}
- /* PROCESS IF. */
- if (process_if_expr != NULL
- && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0)
- return 1;
+/* Returns nonzero if SOURCE is non-null and "complex", that is, if
+ its class is input_program_source_class or file_type_source_class. */
+int
+case_source_is_complex (const struct case_source *source)
+{
+ return source != NULL && (source->class == &input_program_source_class
+ || source->class == &file_type_source_class);
+}
- return 0;
+/* Returns nonzero if CLASS is the class of SOURCE. */
+int
+case_source_is_class (const struct case_source *source,
+ const struct case_source_class *class)
+{
+ return source != NULL && source->class == class;
}
-/* Appends TRNS to t_trns[], the list of all transformations to be
- performed on data as it is read from the active file. */
+/* Creates a case sink to accept cases from the given DICT with
+ class CLASS and auxiliary data AUX. */
+struct case_sink *
+create_case_sink (const struct case_sink_class *class,
+ const struct dictionary *dict,
+ void *aux)
+{
+ struct case_sink *sink = xmalloc (sizeof *sink);
+ sink->class = class;
+ sink->value_cnt = dict_get_compacted_value_cnt (dict);
+ sink->aux = aux;
+ return sink;
+}
+
+/* Destroys case sink SINK. */
void
-add_transformation (struct trns_header * trns)
+free_case_sink (struct case_sink *sink)
{
- if (n_trns >= m_trns)
+ if (sink != NULL)
{
- m_trns += 16;
- t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
+ if (sink->class->destroy != NULL)
+ sink->class->destroy (sink);
+ free (sink);
}
- t_trns[n_trns] = trns;
- trns->index = n_trns++;
}
+\f
+/* Represents auxiliary data for handling SPLIT FILE. */
+struct split_aux_data
+ {
+ size_t case_count; /* Number of cases so far. */
+ struct ccase prev_case; /* Data in previous case. */
+
+ /* Functions to call... */
+ void (*begin_func) (void *); /* ...before data. */
+ int (*proc_func) (struct ccase *, void *); /* ...with data. */
+ void (*end_func) (void *); /* ...after data. */
+ void *func_aux; /* Auxiliary data. */
+ };
-/* Cancels all active transformations, including any transformations
- created by the input program. */
+static int equal_splits (const struct ccase *, const struct ccase *);
+static int procedure_with_splits_callback (struct ccase *, void *);
+static void dump_splits (struct ccase *);
+
+/* Like procedure(), but it automatically breaks the case stream
+ into SPLIT FILE break groups. Before each group of cases with
+ identical SPLIT FILE variable values, BEGIN_FUNC is called.
+ Then PROC_FUNC is called with each case in the group.
+ END_FUNC is called when the group is finished. FUNC_AUX is
+ passed to each of the functions as auxiliary data.
+
+ If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
+ and END_FUNC will be called at all.
+
+ If SPLIT FILE is not in effect, then there is one break group
+ (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
+ will be called once. */
void
-cancel_transformations (void)
+procedure_with_splits (void (*begin_func) (void *aux),
+ int (*proc_func) (struct ccase *, void *aux),
+ void (*end_func) (void *aux),
+ void *func_aux)
{
- int i;
- for (i = 0; i < n_trns; i++)
- {
- if (t_trns[i]->free)
- t_trns[i]->free (t_trns[i]);
- free (t_trns[i]);
- }
- n_trns = f_trns = 0;
- if (m_trns > 32)
+ struct split_aux_data split_aux;
+
+ split_aux.case_count = 0;
+ case_nullify (&split_aux.prev_case);
+ split_aux.begin_func = begin_func;
+ split_aux.proc_func = proc_func;
+ split_aux.end_func = end_func;
+ split_aux.func_aux = func_aux;
+
+ open_active_file ();
+ internal_procedure (procedure_with_splits_callback, &split_aux);
+ if (split_aux.case_count > 0 && end_func != NULL)
+ end_func (func_aux);
+ close_active_file ();
+
+ case_destroy (&split_aux.prev_case);
+}
+
+/* internal_procedure() callback used by procedure_with_splits(). */
+static int
+procedure_with_splits_callback (struct ccase *c, void *split_aux_)
+{
+ struct split_aux_data *split_aux = split_aux_;
+
+ /* Start a new series if needed. */
+ if (split_aux->case_count == 0
+ || !equal_splits (c, &split_aux->prev_case))
{
- free (t_trns);
- m_trns = 0;
+ if (split_aux->case_count > 0 && split_aux->end_func != NULL)
+ split_aux->end_func (split_aux->func_aux);
+
+ dump_splits (c);
+ case_destroy (&split_aux->prev_case);
+ case_clone (&split_aux->prev_case, c);
+
+ if (split_aux->begin_func != NULL)
+ split_aux->begin_func (split_aux->func_aux);
}
+
+ split_aux->case_count++;
+ if (split_aux->proc_func != NULL)
+ return split_aux->proc_func (c, split_aux->func_aux);
+ else
+ return 1;
+}
+
+/* Compares the SPLIT FILE variables in cases A and B and returns
+ nonzero only if they are all equal. */
+static int
+equal_splits (const struct ccase *a, const struct ccase *b)
+{
+ return case_compare (a, b,
+ dict_get_split_vars (default_dict),
+ dict_get_split_cnt (default_dict)) == 0;
}
/* Dumps out the values of all the split variables for the case C. */
int i;
split_cnt = dict_get_split_cnt (default_dict);
+ if (split_cnt == 0)
+ return;
+
t = tab_create (3, split_cnt + 1, 0);
tab_dim (t, tab_natural_dimensions);
tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
assert (v->type == NUMERIC || v->type == ALPHA);
tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
- {
- union value val = c->data[v->fv];
- if (v->type == ALPHA)
- val.c = c->data[v->fv].s;
- data_out (temp_buf, &v->print, &val);
- }
+ data_out (temp_buf, &v->print, case_data (c, v->fv));
temp_buf[v->print.w] = 0;
tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
- val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
+ val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
if (val_lab)
tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
}
tab_flags (t, SOMF_NO_TITLE);
tab_submit (t);
}
+\f
+/* Represents auxiliary data for handling SPLIT FILE in a
+ multipass procedure. */
+struct multipass_split_aux_data
+ {
+ struct ccase prev_case; /* Data in previous case. */
+ struct casefile *casefile; /* Accumulates data for a split. */
+
+ /* Function to call with the accumulated data. */
+ void (*split_func) (const struct casefile *, void *);
+ void *func_aux; /* Auxiliary data. */
+ };
+
+static int multipass_split_callback (struct ccase *c, void *aux_);
+static void multipass_split_output (struct multipass_split_aux_data *);
+
+void
+multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
+ void *),
+ void *func_aux)
+{
+ struct multipass_split_aux_data aux;
+
+ assert (split_func != NULL);
+
+ open_active_file ();
+
+ case_nullify (&aux.prev_case);
+ aux.casefile = NULL;
+ aux.split_func = split_func;
+ aux.func_aux = func_aux;
+
+ internal_procedure (multipass_split_callback, &aux);
+ if (aux.casefile != NULL)
+ multipass_split_output (&aux);
+ case_destroy (&aux.prev_case);
+
+ close_active_file ();
+}
-/* This procfunc is substituted for the user-supplied procfunc when
- SPLIT FILE is active. This function forms a wrapper around that
- procfunc by dividing the input into series. */
+/* internal_procedure() callback used by multipass_procedure_with_splits(). */
static int
-SPLIT_FILE_procfunc (struct ccase *c, void *data_)
+multipass_split_callback (struct ccase *c, void *aux_)
{
- struct write_case_data *data = data_;
- static struct ccase *prev_case;
- struct variable *const *split;
- size_t split_cnt;
- size_t i;
+ struct multipass_split_aux_data *aux = aux_;
- /* The first case always begins a new series. We also need to
- preserve the values of the case for later comparison. */
- if (case_count == 0)
+ /* Start a new series if needed. */
+ if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
{
- if (prev_case)
- free (prev_case);
- prev_case = xmalloc (vfm_sink_info.case_size);
- memcpy (prev_case, c, vfm_sink_info.case_size);
+ /* Pass any cases to split_func. */
+ if (aux->casefile != NULL)
+ multipass_split_output (aux);
+
+ /* Start a new casefile. */
+ aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
+ /* Record split values. */
dump_splits (c);
- if (data->beginfunc != NULL)
- data->beginfunc (data->aux);
-
- return data->procfunc (c, data->aux);
+ case_destroy (&aux->prev_case);
+ case_clone (&aux->prev_case, c);
}
- /* Compare the value of each SPLIT FILE variable to the values on
- the previous case. */
- split = dict_get_split_vars (default_dict);
- split_cnt = dict_get_split_cnt (default_dict);
- for (i = 0; i < split_cnt; i++)
- {
- struct variable *v = split[i];
-
- switch (v->type)
- {
- case NUMERIC:
- if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
- goto not_equal;
- break;
- case ALPHA:
- if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
- goto not_equal;
- break;
- default:
- assert (0);
- }
- }
- return data->procfunc (c, data->aux);
-
-not_equal:
- /* The values of the SPLIT FILE variable are different from the
- values on the previous case. That means that it's time to begin
- a new series. */
- if (data->endfunc != NULL)
- data->endfunc (data->aux);
- dump_splits (c);
- if (data->beginfunc != NULL)
- data->beginfunc (data->aux);
- memcpy (prev_case, c, vfm_sink_info.case_size);
- return data->procfunc (c, data->aux);
+ casefile_append (aux->casefile, c);
+
+ return 1;
+}
+
+static void
+multipass_split_output (struct multipass_split_aux_data *aux)
+{
+ assert (aux->casefile != NULL);
+ aux->split_func (aux->casefile, aux->func_aux);
+ casefile_destroy (aux->casefile);
+ aux->casefile = NULL;
}
-\f
-/* Case compaction. */
-/* Copies case SRC to case DEST, compacting it in the process. */
+
+/* Discards all the current state in preparation for a data-input
+ command like DATA LIST or GET. */
void
-compact_case (struct ccase *dest, const struct ccase *src)
+discard_variables (void)
{
- int i;
- int nval = 0;
- size_t var_cnt;
-
- assert (compaction_necessary);
+ dict_clear (default_dict);
+ default_handle = NULL;
- if (temporary == 2)
+ n_lag = 0;
+
+ if (vfm_source != NULL)
{
- if (dest != compaction_case)
- memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
- return;
+ free_case_source (vfm_source);
+ vfm_source = NULL;
}
- /* Copy all the variables except the scratch variables from SRC to
- DEST. */
- var_cnt = dict_get_var_cnt (default_dict);
- for (i = 0; i < var_cnt; i++)
- {
- struct variable *v = dict_get_var (default_dict, i);
-
- if (v->name[0] == '#')
- continue;
+ cancel_transformations ();
- if (v->type == NUMERIC)
- dest->data[nval++] = src->data[v->fv];
- else
- {
- int w = DIV_RND_UP (v->width, sizeof (union value));
-
- memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
- nval += w;
- }
- }
-}
+ ctl_stack_clear ();
-/* Reassigns `fv' for each variable. Deletes scratch variables. */
-static void
-finish_compaction (void)
-{
- int i;
+ expr_free (process_if_expr);
+ process_if_expr = NULL;
- for (i = 0; i < dict_get_var_cnt (default_dict); )
- {
- struct variable *v = dict_get_var (default_dict, i);
+ cancel_temporary ();
- if (v->name[0] == '#')
- dict_delete_var (default_dict, v);
- else
- i++;
- }
- dict_compact_values (default_dict);
+ pgm_state = STATE_INIT;
}
-
-