You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- 02111-1307, USA. */
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA. */
#include <config.h>
#include "vfm.h"
#include "vfmP.h"
-#include <assert.h>
+#include "error.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> /* Required by SunOS4. */
#endif
#include "alloc.h"
-#include "do-ifP.h"
+#include "case.h"
+#include "casefile.h"
+#include "command.h"
+#include "dictionary.h"
+#include "ctl-stack.h"
#include "error.h"
-#include "expr.h"
+#include "expressions/public.h"
#include "misc.h"
-#include "random.h"
#include "settings.h"
#include "som.h"
#include "str.h"
#include "var.h"
#include "value-labels.h"
+#include "gettext.h"
+#define _(msgid) gettext (msgid)
+
/*
Virtual File Manager (vfm):
/* Procedure execution data. */
struct write_case_data
{
- /* Functions to call... */
- void (*begin_func) (void *); /* ...before data. */
- int (*proc_func) (struct ccase *, void *); /* ...with data. */
- void (*end_func) (void *); /* ...after data. */
- void *func_aux; /* Auxiliary data. */
-
- /* Extra auxiliary data. */
- void *aux;
+ /* Function to call for each case. */
+ int (*proc_func) (struct ccase *, void *); /* Function. */
+ void *aux; /* Auxiliary data. */
+
+ struct ccase trns_case; /* Case used for transformations. */
+ struct ccase sink_case; /* Case written to sink, if
+ compaction is necessary. */
+ size_t cases_written; /* Cases output so far. */
+ size_t cases_analyzed; /* Cases passed to procedure so far. */
};
/* The current active file, from which cases are read. */
/* Nonzero if the case needs to have values deleted before being
stored, zero otherwise. */
-int compaction_necessary;
-
-/* Number of values after compaction. */
-int compaction_nval;
-
-/* Temporary case buffer with enough room for `compaction_nval'
- `value's. */
-struct ccase *compaction_case;
-
-/* Nonzero means that we've overflowed our allotted workspace.
- After that happens once during a session, we always store the
- active file on disk instead of in memory. (This policy may be
- too aggressive.) */
-static int workspace_overflow = 0;
+static int compaction_necessary;
/* Time at which vfm was last invoked. */
-time_t last_vfm_invocation;
-
-/* Number of cases passed to proc_func(). */
-static int case_count;
+static time_t last_vfm_invocation;
/* Lag queue. */
int n_lag; /* Number of cases to lag. */
static int lag_count; /* Number of cases in lag_queue so far. */
static int lag_head; /* Index where next case will be added. */
-static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
+static struct ccase *lag_queue; /* Array of n_lag ccase elements. */
-static struct ccase *create_trns_case (struct dictionary *dict);
+static void internal_procedure (int (*proc_func) (struct ccase *, void *),
+ void *aux);
+static void update_last_vfm_invocation (void);
+static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (void);
-static void close_active_file (struct write_case_data *);
-static int SPLIT_FILE_proc_func (struct ccase *, void *);
-static void finish_compaction (void);
-static void lag_case (const struct ccase *);
-static write_case_func procedure_write_case;
-static void clear_case (struct ccase *);
-static int exclude_this_case (const struct ccase *, int case_num);
+static int write_case (struct write_case_data *wc_data);
+static int execute_transformations (struct ccase *c,
+ struct transformation *trns,
+ int first_idx, int last_idx,
+ int case_num);
+static int filter_case (const struct ccase *c, int case_num);
+static void lag_case (const struct ccase *c);
+static void clear_case (struct ccase *c);
+static void close_active_file (void);
\f
/* Public functions. */
-/* Auxiliary data for executing a procedure. */
-struct procedure_aux_data
- {
- struct ccase *trns_case; /* Case used for transformations. */
- size_t cases_written; /* Number of cases written so far. */
- };
-
-/* Auxiliary data for SPLIT FILE. */
-struct split_aux_data
- {
- struct ccase *prev_case; /* Data in previous case. */
- };
+/* Returns the last time the data was read. */
+time_t
+vfm_last_invocation (void)
+{
+ if (last_vfm_invocation == 0)
+ update_last_vfm_invocation ();
+ return last_vfm_invocation;
+}
-/* Reads all the cases from the active file, transforms them by
- the active set of transformations, passes each of them to
- PROC_FUNC, and writes them to a new active file.
+/* Reads the data from the input program and writes it to a new
+ active file. For each case we read from the input program, we
+ do the following:
- Divides the active file into zero or more series of one or more
- cases each. BEGIN_FUNC is called before each series. END_FUNC is
- called after each series.
+ 1. Execute permanent transformations. If these drop the case,
+ start the next case from step 1.
- Arbitrary user-specified data AUX is passed to BEGIN_FUNC,
- PROC_FUNC, and END_FUNC as auxiliary data. */
+ 2. N OF CASES. If we have already written N cases, start the
+ next case from step 1.
+
+ 3. Write case to replacement active file.
+
+ 4. Execute temporary transformations. If these drop the case,
+ start the next case from step 1.
+
+ 5. FILTER, PROCESS IF. If these drop the case, start the next
+ case from step 1.
+
+ 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
+ cases, start the next case from step 1.
+
+ 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
void
-procedure (void (*begin_func) (void *),
- int (*proc_func) (struct ccase *, void *),
- void (*end_func) (void *),
- void *func_aux)
+procedure (int (*proc_func) (struct ccase *, void *), void *aux)
{
- static int recursive_call;
-
- struct write_case_data procedure_write_data;
- struct procedure_aux_data proc_aux;
-
- struct write_case_data split_file_data;
- struct split_aux_data split_aux;
- int split;
-
- assert (++recursive_call == 1);
-
- proc_aux.cases_written = 0;
- proc_aux.trns_case = create_trns_case (default_dict);
-
- /* Normally we just use the data passed by the user. */
- procedure_write_data.begin_func = begin_func;
- procedure_write_data.proc_func = proc_func;
- procedure_write_data.end_func = end_func;
- procedure_write_data.func_aux = func_aux;
- procedure_write_data.aux = &proc_aux;
-
- /* Under SPLIT FILE, we add a layer of indirection. */
- split = dict_get_split_cnt (default_dict) > 0;
- if (split)
+ if (proc_func == NULL
+ && case_source_is_class (vfm_source, &storage_source_class)
+ && vfm_sink == NULL
+ && !temporary
+ && n_trns == 0)
{
- split_file_data = procedure_write_data;
- split_file_data.aux = &split_aux;
-
- split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
-
- procedure_write_data.begin_func = NULL;
- procedure_write_data.proc_func = SPLIT_FILE_proc_func;
- procedure_write_data.end_func = end_func;
- procedure_write_data.func_aux = &split_file_data;
+ /* Nothing to do. */
+ update_last_vfm_invocation ();
+ return;
}
- last_vfm_invocation = time (NULL);
-
open_active_file ();
- if (vfm_source != NULL)
- vfm_source->class->read (vfm_source,
- proc_aux.trns_case,
- procedure_write_case, &procedure_write_data);
- close_active_file (&procedure_write_data);
-
- if (split)
- free (split_aux.prev_case);
-
- free (proc_aux.trns_case);
-
- assert (--recursive_call == 0);
+ internal_procedure (proc_func, aux);
+ close_active_file ();
}
-\f
-/* Active file processing support. Subtly different semantics from
- procedure(). */
-
-static write_case_func process_active_file_write_case;
-
-/* The case_func might want us to stop calling it. */
-static int not_canceled;
-/* Reads all the cases from the active file and passes them
- one-by-one to CASE_FUNC. Before any cases are passed, calls
- BEGIN_FUNC. After all the cases have been passed, calls
- END_FUNC. BEGIN_FUNC, CASE_FUNC, and END_FUNC can write to
- the output file by calling process_active_file_output_case().
-
- process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
-void
-process_active_file (void (*begin_func) (void *),
- int (*case_func) (struct ccase *, void *),
- void (*end_func) (void *),
- void *func_aux)
+/* Executes a procedure, as procedure(), except that the caller
+ is responsible for calling open_active_file() and
+ close_active_file().
+
+ PROC_FUNC may be null, in which case cases are still read,
+ transformed, and written to the sink, but not passed to any
+ procedure. AUX is handed to PROC_FUNC unchanged.
+
+ This function must not be called recursively; that is checked
+ by assertion. */
+static void
+internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux)
{
- struct procedure_aux_data proc_aux;
- struct write_case_data process_active_write_data;
+ static int recursive_call;
- proc_aux.cases_written = 0;
- proc_aux.trns_case = create_trns_case (default_dict);
+ struct write_case_data wc_data;
- process_active_write_data.begin_func = begin_func;
- process_active_write_data.proc_func = case_func;
- process_active_write_data.end_func = end_func;
- process_active_write_data.func_aux = func_aux;
- process_active_write_data.aux = &proc_aux;
+ assert (++recursive_call == 1);
- not_canceled = 1;
+ wc_data.proc_func = proc_func;
+ wc_data.aux = aux;
+ create_trns_case (&wc_data.trns_case, default_dict);
+ case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
+ wc_data.cases_written = 0;
+ /* write_case() reads and increments cases_analyzed for the
+ post-TEMPORARY N OF CASES check, so it must start at zero
+ just like cases_written. */
+ wc_data.cases_analyzed = 0;
- open_active_file ();
- begin_func (func_aux);
- if (vfm_source != NULL)
- vfm_source->class->read (vfm_source, proc_aux.trns_case,
- process_active_file_write_case,
- &process_active_write_data);
- end_func (func_aux);
- close_active_file (&process_active_write_data);
-}
+ update_last_vfm_invocation ();
-/* Pass the current case to case_func. */
-static int
-process_active_file_write_case (struct write_case_data *wc_data)
-{
- struct procedure_aux_data *proc_aux = wc_data->aux;
- int cur_trns; /* Index of current transformation. */
+ if (vfm_source != NULL)
+ vfm_source->class->read (vfm_source,
+ &wc_data.trns_case,
+ write_case, &wc_data);
- for (cur_trns = f_trns; cur_trns != temp_trns; )
- {
- int code;
-
- code = t_trns[cur_trns]->proc (t_trns[cur_trns], proc_aux->trns_case,
- case_count + 1);
- switch (code)
- {
- case -1:
- /* Next transformation. */
- cur_trns++;
- break;
- case -2:
- /* Delete this case. */
- goto done;
- default:
- /* Go to that transformation. */
- cur_trns = code;
- break;
- }
- }
+ case_destroy (&wc_data.sink_case);
+ case_destroy (&wc_data.trns_case);
- if (n_lag)
- lag_case (proc_aux->trns_case);
-
- /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
- if (not_canceled && !exclude_this_case (proc_aux->trns_case, case_count + 1))
- not_canceled = wc_data->proc_func (proc_aux->trns_case, wc_data->func_aux);
-
- case_count++;
-
- done:
- clear_case (proc_aux->trns_case);
-
- return 1;
+ assert (--recursive_call == 0);
}
-/* Write the given case to the active file. */
-void
-process_active_file_output_case (const struct ccase *c)
+/* Updates last_vfm_invocation. */
+static void
+update_last_vfm_invocation (void)
{
- vfm_sink->class->write (vfm_sink, c);
+ last_vfm_invocation = time (NULL);
}
-\f
+
/* Creates and returns a case, initializing it from the vectors
that say which `value's need to be initialized just once, and
which ones need to be re-initialized before every case. */
-static struct ccase *
-create_trns_case (struct dictionary *dict)
+static void
+create_trns_case (struct ccase *trns_case, struct dictionary *dict)
{
- struct ccase *c = xmalloc (dict_get_case_size (dict));
size_t var_cnt = dict_get_var_cnt (dict);
size_t i;
+ case_create (trns_case, dict_get_next_value_idx (dict));
for (i = 0; i < var_cnt; i++)
{
struct variable *v = dict_get_var (dict, i);
+ union value *value = case_data_rw (trns_case, v->fv);
- if (v->type == NUMERIC)
- {
- if (v->reinit)
- c->data[v->fv].f = 0.0;
- else
- c->data[v->fv].f = SYSMIS;
- }
+ if (v->type == NUMERIC)
+ value->f = v->reinit ? 0.0 : SYSMIS;
else
- memset (c->data[v->fv].s, ' ', v->width);
+ memset (value->s, ' ', v->width);
}
- return c;
}
-\f
-/* Opening the active file. */
-/* It might be usefully noted that the following several functions are
- given in the order that they are called by open_active_file(). */
-
-/* Prepare to write to the replacement active file. */
+/* Makes all preparations for reading from the data source and writing
+ to the data sink. */
static void
-prepare_for_writing (void)
+open_active_file (void)
{
+ /* Make temp_dict refer to the dictionary right before data
+ reaches the sink. */
+ if (!temporary)
+ {
+ temp_trns = n_trns;
+ temp_dict = default_dict;
+ }
+
+ /* Figure out compaction. */
+ compaction_necessary = (dict_get_next_value_idx (temp_dict)
+ != dict_get_compacted_value_cnt (temp_dict));
+
+ /* Prepare sink. */
if (vfm_sink == NULL)
+ vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
+ if (vfm_sink->class->open != NULL)
+ vfm_sink->class->open (vfm_sink);
+
+ /* Allocate memory for lag queue. */
+ if (n_lag > 0)
{
- if (workspace_overflow)
- vfm_sink = create_case_sink (&disk_sink_class, NULL);
- else
- vfm_sink = create_case_sink (&memory_sink_class, NULL);
+ int i;
+
+ lag_count = 0;
+ lag_head = 0;
+ lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
+ for (i = 0; i < n_lag; i++)
+ case_nullify (&lag_queue[i]);
}
+
+ /* Close any unclosed DO IF or LOOP constructs. */
+ ctl_stack_clear ();
}
-/* Arrange for compacting the output cases for storage. */
-static void
-arrange_compaction (void)
+/* Transforms trns_case and writes it to the replacement active
+ file if advisable. Returns nonzero if more cases can be
+ accepted, zero otherwise. Do not call this function again
+ after it has returned zero once. */
+static int
+write_case (struct write_case_data *wc_data)
{
- int count_values = 0;
+ /* Execute permanent transformations. */
+ if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
+ wc_data->cases_written + 1))
+ goto done;
- {
- int i;
-
- /* Count up the number of `value's that will be output. */
- for (i = 0; i < dict_get_var_cnt (temp_dict); i++)
- {
- struct variable *v = dict_get_var (temp_dict, i);
-
- if (dict_class_from_id (v->name) != DC_SCRATCH)
- {
- assert (v->nv > 0);
- count_values += v->nv;
- }
- }
- assert (temporary == 2
- || count_values <= dict_get_next_value_idx (temp_dict));
- }
+ /* N OF CASES. */
+ if (dict_get_case_limit (default_dict)
+ && wc_data->cases_written >= dict_get_case_limit (default_dict))
+ goto done;
+ wc_data->cases_written++;
+
+ /* Write case to LAG queue. */
+ if (n_lag)
+ lag_case (&wc_data->trns_case);
+
+ /* Write case to replacement active file. */
+ if (vfm_sink->class->write != NULL)
+ {
+ if (compaction_necessary)
+ {
+ dict_compact_case (temp_dict, &wc_data->sink_case,
+ &wc_data->trns_case);
+ vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
+ }
+ else
+ vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
+ }
- /* Compaction is only necessary if the number of `value's to output
- differs from the number already present. */
- compaction_nval = count_values;
- if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
- compaction_necessary = 1;
- else
- compaction_necessary = 0;
+ /* Execute temporary transformations. */
+ if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
+ wc_data->cases_written))
+ goto done;
- if (vfm_sink->class->open != NULL)
- vfm_sink->class->open (vfm_sink);
+ /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
+ if (filter_case (&wc_data->trns_case, wc_data->cases_written)
+ || (dict_get_case_limit (temp_dict)
+ && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
+ goto done;
+ wc_data->cases_analyzed++;
- if (compaction_necessary)
- compaction_case = xmalloc (sizeof (struct ccase)
- + sizeof (union value) * (compaction_nval - 1));
+ /* Pass case to procedure. */
+ if (wc_data->proc_func != NULL)
+ wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
+ done:
+ clear_case (&wc_data->trns_case);
+ return 1;
}
-#if DEBUGGING
-/* Returns the name of the variable that owns the index CCASE_INDEX
- into ccase. */
-static const char *
-index_to_varname (int ccase_index)
+/* Transforms case C using the transformations in TRNS[] with
+ indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
+ become case CASE_NUM (1-based) in the output file. Returns
+ zero if the case was filtered out by one of the
+ transformations, nonzero otherwise. */
+static int
+execute_transformations (struct ccase *c,
+ struct transformation *trns,
+ int first_idx, int last_idx,
+ int case_num)
{
- int i;
+ int idx;
- for (i = 0; i < default_dict.nvar; i++)
+ for (idx = first_idx; idx != last_idx; )
{
- struct variable *v = default_dict.var[i];
-
- if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
- return default_dict.var[i]->name;
+ struct transformation *t = &trns[idx];
+ int retval = t->proc (t->private, c, case_num);
+ switch (retval)
+ {
+ case -1:
+ idx++;
+ break;
+
+ case -2:
+ return 0;
+
+ default:
+ idx = retval;
+ break;
+ }
+ }
+
+ return 1;
+}
+
+/* Returns nonzero if case C with case number CASE_IDX should be
+ excluded as specified on FILTER or PROCESS IF, otherwise
+ zero. */
+static int
+filter_case (const struct ccase *c, int case_idx)
+{
+ /* FILTER. */
+ struct variable *filter_var = dict_get_filter (default_dict);
+ if (filter_var != NULL)
+ {
+ double f = case_num (c, filter_var->fv);
+ if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
+ return 1;
}
- return _("<NOVAR>");
+
+ /* PROCESS IF. */
+ if (process_if_expr != NULL
+ && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
+ return 1;
+
+ return 0;
}
-#endif
-/* Sets all the lag-related variables based on value of n_lag. */
+/* Add C to the lag queue. */
static void
-setup_lag (void)
+lag_case (const struct ccase *c)
{
- int i;
-
- if (n_lag == 0)
- return;
-
- lag_count = 0;
- lag_head = 0;
- lag_queue = xmalloc (n_lag * sizeof *lag_queue);
- for (i = 0; i < n_lag; i++)
- lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
+ if (lag_count < n_lag)
+ lag_count++;
+ case_destroy (&lag_queue[lag_head]);
+ case_clone (&lag_queue[lag_head], c);
+ if (++lag_head >= n_lag)
+ lag_head = 0;
}
-/* There is a lot of potential confusion in the vfm and related
- routines over the number of `value's at each stage of the process.
- Here is each nval count, with explanation, as set up by
- open_active_file():
-
- temp_dict->nval: Number of `value's in the cases after the
- transformations leading up to TEMPORARY have been performed.
-
- compaction_nval: Number of `value's in the cases after the
- transformations leading up to TEMPORARY have been performed
- and the case has been compacted by compact_case(), if
- compaction is necessary. This the number of `value's in the
- cases saved by the sink stream. (However, note that the cases
- passed to the sink stream have not yet been compacted. It is
- the responsibility of the data sink to call compact_case().)
- `compaction' becomes the new value of default_dict.nval after
- the procedure is completed.
-
- default_dict.nval: This is often an alias for temp_dict->nval.
- As such it can really have no separate existence until the
- procedure is complete. For this reason it should *not* be
- referenced inside the execution of a procedure. */
-/* Makes all preparations for reading from the data source and writing
- to the data sink. */
+/* Clears the variables in C that need to be cleared between
+ processing cases. */
static void
-open_active_file (void)
+clear_case (struct ccase *c)
{
- /* Sometimes we want to refer to the dictionary that applies to the
- data actually written to the sink. This is either temp_dict or
- default_dict. However, if TEMPORARY is not on, then temp_dict
- does not apply. So, we can set temp_dict to default_dict in this
- case. */
- if (!temporary)
+ size_t var_cnt = dict_get_var_cnt (default_dict);
+ size_t i;
+
+ for (i = 0; i < var_cnt; i++)
{
- temp_trns = n_trns;
- temp_dict = default_dict;
+ struct variable *v = dict_get_var (default_dict, i);
+ if (v->init && v->reinit)
+ {
+ if (v->type == NUMERIC)
+ case_data_rw (c, v->fv)->f = SYSMIS;
+ else
+ memset (case_data_rw (c, v->fv)->s, ' ', v->width);
+ }
}
-
- /* No cases passed to the procedure yet. */
- case_count = 0;
-
- /* The rest. */
- prepare_for_writing ();
- arrange_compaction ();
- discard_ctl_stack ();
- setup_lag ();
}
-\f
+
/* Closes the active file. */
static void
-close_active_file (struct write_case_data *data)
+close_active_file (void)
{
- /* Close the current case group. */
- if (case_count && data->end_func != NULL)
- data->end_func (data->func_aux);
-
- /* Stop lagging (catch up?). */
- if (n_lag)
+ /* Free memory for lag queue, and turn off lagging. */
+ if (n_lag > 0)
{
int i;
for (i = 0; i < n_lag; i++)
- free (lag_queue[i]);
+ case_destroy (&lag_queue[i]);
free (lag_queue);
n_lag = 0;
}
- /* Assume the dictionary from right before TEMPORARY, if any. Turn
- off TEMPORARY. */
+ /* Dictionary from before TEMPORARY becomes permanent. */
if (temporary)
{
dict_destroy (default_dict);
/* Finish compaction. */
if (compaction_necessary)
- finish_compaction ();
+ dict_compact_values (default_dict);
- /* Old data sink --> New data source. */
- if (vfm_source != NULL)
- {
- if (vfm_source->class->destroy != NULL)
- vfm_source->class->destroy (vfm_source);
- free (vfm_source);
- }
+ /* Free data source. */
+ free_case_source (vfm_source);
+ vfm_source = NULL;
+ /* Old data sink becomes new data source. */
if (vfm_sink->class->make_source != NULL)
vfm_source = vfm_sink->class->make_source (vfm_sink);
- else
- vfm_source = NULL;
-
- /* Old data sink is gone now. */
- free (vfm_sink);
+ free_case_sink (vfm_sink);
vfm_sink = NULL;
- /* Cancel TEMPORARY. */
+ /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
+ and get rid of all the transformations. */
cancel_temporary ();
-
- /* Free temporary cases. */
- free (compaction_case);
- compaction_case = NULL;
-
- /* Cancel PROCESS IF. */
expr_free (process_if_expr);
process_if_expr = NULL;
-
- /* Cancel FILTER if temporary. */
if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
dict_set_filter (default_dict, NULL);
-
- /* Cancel transformations. */
- cancel_transformations ();
-
- /* Turn off case limiter. */
dict_set_case_limit (default_dict, 0);
-
- /* Clear VECTOR vectors. */
dict_clear_vectors (default_dict);
+ cancel_transformations ();
}
\f
-/* Disk case stream. */
+/* Storage case stream. */
-/* Information about disk sink or source. */
-struct disk_stream_info
+/* Information about storage sink or source. */
+struct storage_stream_info
{
- FILE *file; /* Output file. */
- size_t case_cnt; /* Number of cases written so far. */
- size_t case_size; /* Number of bytes in case. */
+ struct casefile *casefile; /* Storage. */
};
-/* Initializes the disk sink. */
+/* Initializes a storage sink. */
static void
-disk_sink_create (struct case_sink *sink)
+storage_sink_open (struct case_sink *sink)
{
- struct disk_stream_info *info = xmalloc (sizeof *info);
- info->file = tmpfile ();
- info->case_cnt = 0;
- info->case_size = compaction_nval;
- sink->aux = info;
- if (info->file == NULL)
- {
- msg (ME, _("An error occurred attempting to create a temporary "
- "file for use as the active file: %s."),
- strerror (errno));
- err_failure ();
- }
+ struct storage_stream_info *info;
+
+ sink->aux = info = xmalloc (sizeof *info);
+ info->casefile = casefile_create (sink->value_cnt);
}
-/* Writes case C to the disk sink. */
+/* Destroys storage stream represented by INFO. */
static void
-disk_sink_write (struct case_sink *sink, const struct ccase *c)
+destroy_storage_stream_info (struct storage_stream_info *info)
{
- struct disk_stream_info *info = sink->aux;
- const union value *src_case;
-
- if (compaction_necessary)
+ if (info != NULL)
{
- compact_case (compaction_case, c);
- src_case = compaction_case->data;
+ casefile_destroy (info->casefile);
+ free (info);
}
- else src_case = c->data;
+}
- info->case_cnt++;
- if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
- info->file) != 1)
- {
- msg (ME, _("An error occurred while attempting to write to a "
- "temporary file used as the active file: %s."),
- strerror (errno));
- err_failure ();
- }
+/* Writes case C to the storage sink SINK. */
+static void
+storage_sink_write (struct case_sink *sink, const struct ccase *c)
+{
+ struct storage_stream_info *info = sink->aux;
+
+ casefile_append (info->casefile, c);
}
-/* Destroys the sink's internal data. */
+/* Destroys internal data in SINK. */
static void
-disk_sink_destroy (struct case_sink *sink)
+storage_sink_destroy (struct case_sink *sink)
{
- struct disk_stream_info *info = sink->aux;
- if (info->file != NULL)
- fclose (info->file);
+ destroy_storage_stream_info (sink->aux);
}
-/* Closes and destroys the sink and returns a disk source to read
- back the written data. */
+/* Closes the sink and returns a storage source to read back the
+ written data. */
static struct case_source *
-disk_sink_make_source (struct case_sink *sink)
+storage_sink_make_source (struct case_sink *sink)
{
- struct disk_stream_info *info = sink->aux;
-
- /* Rewind the file. */
- assert (info->file != NULL);
- if (fseek (info->file, 0, SEEK_SET) != 0)
- {
- msg (ME, _("An error occurred while attempting to rewind a "
- "temporary file used as the active file: %s."),
- strerror (errno));
- err_failure ();
- }
-
- return create_case_source (&disk_source_class, default_dict, info);
+ struct case_source *source
+ = create_case_source (&storage_source_class, sink->aux);
+ sink->aux = NULL;
+ return source;
}
-/* Disk sink. */
-const struct case_sink_class disk_sink_class =
+/* Storage sink. */
+const struct case_sink_class storage_sink_class =
{
- "disk",
- disk_sink_create,
- disk_sink_write,
- disk_sink_destroy,
- disk_sink_make_source,
+ "storage",
+ storage_sink_open,
+ storage_sink_write,
+ storage_sink_destroy,
+ storage_sink_make_source,
};
\f
-/* Disk source. */
+/* Storage source. */
/* Returns the number of cases that will be read by
- disk_source_read(). */
+ storage_source_read(). */
static int
-disk_source_count (const struct case_source *source)
+storage_source_count (const struct case_source *source)
{
- struct disk_stream_info *info = source->aux;
+ struct storage_stream_info *info = source->aux;
- return info->case_cnt;
+ return casefile_get_case_cnt (info->casefile);
}
-/* Reads all cases from the disk source and passes them one by one to
+/* Reads all cases from the storage source and passes them one by one to
write_case(). */
static void
-disk_source_read (struct case_source *source,
- struct ccase *c,
- write_case_func *write_case, write_case_data wc_data)
+storage_source_read (struct case_source *source,
+ struct ccase *output_case,
+ write_case_func *write_case, write_case_data wc_data)
{
- struct disk_stream_info *info = source->aux;
- int i;
+ struct storage_stream_info *info = source->aux;
+ struct ccase casefile_case;
+ struct casereader *reader;
- for (i = 0; i < info->case_cnt; i++)
+ for (reader = casefile_get_reader (info->casefile);
+ casereader_read (reader, &casefile_case);
+ case_destroy (&casefile_case))
{
- if (!fread (c, info->case_size, 1, info->file))
- {
- msg (ME, _("An error occurred while attempting to read from "
- "a temporary file created for the active file: %s."),
- strerror (errno));
- err_failure ();
- break;
- }
-
- if (!write_case (wc_data))
- break;
+ case_copy (output_case, 0,
+ &casefile_case, 0,
+ casefile_get_value_cnt (info->casefile));
+ write_case (wc_data);
}
+ casereader_destroy (reader);
}
/* Destroys the source's internal data. */
static void
-disk_source_destroy (struct case_source *source)
+storage_source_destroy (struct case_source *source)
{
- struct disk_stream_info *info = source->aux;
- if (info->file != NULL)
- fclose (info->file);
- free (info);
+ /* All cleanup of the auxiliary data is delegated to
+ destroy_storage_stream_info(). */
+ destroy_storage_stream_info (source->aux);
}
-/* Disk source. */
-const struct case_source_class disk_source_class =
- {
- "disk",
- disk_source_count,
- disk_source_read,
- disk_source_destroy,
- };
-\f
-/* Memory case stream. */
-
-/* Memory sink data. */
-struct memory_sink_info
- {
- size_t case_cnt; /* Number of cases. */
- size_t case_size; /* Case size in bytes. */
- int max_cases; /* Maximum cases before switching to disk. */
- struct case_list *head; /* First case in list. */
- struct case_list *tail; /* Last case in list. */
- };
-
-/* Memory source data. */
-struct memory_source_info
+/* Storage source class: its name followed by the functions that
+ count, read, and destroy a storage source. */
+const struct case_source_class storage_source_class =
{
- size_t case_cnt; /* Number of cases. */
- size_t case_size; /* Case size in bytes. */
- struct case_list *cases; /* List of cases. */
+ "storage",
+ storage_source_count,
+ storage_source_read,
+ storage_source_destroy,
};
-/* Creates the SINK memory sink. */
-static void
-memory_sink_create (struct case_sink *sink)
+/* Returns the casefile held in the auxiliary data of SOURCE, which
+ must be a storage source (this is asserted). */
+struct casefile *
+storage_source_get_casefile (struct case_source *source)
{
- struct memory_sink_info *info;
-
- sink->aux = info = xmalloc (sizeof *info);
+ struct storage_stream_info *info = source->aux;
- assert (compaction_nval > 0);
- info->case_cnt = 0;
- info->case_size = compaction_nval * sizeof (union value);
- info->max_cases = set_max_workspace / info->case_size;
- info->head = info->tail = NULL;
+ assert (source->class == &storage_source_class);
+ return info->casefile;
}
-/* Writes case C to memory sink SINK. */
-static void
-memory_sink_write (struct case_sink *sink, const struct ccase *c)
+/* Creates and returns a storage source whose newly allocated
+ auxiliary data refers to casefile CF.
+ NOTE(review): destroy_storage_stream_info() is not visible here;
+ presumably the source takes ownership of CF -- confirm. */
+struct case_source *
+storage_source_create (struct casefile *cf)
{
- struct memory_sink_info *info = sink->aux;
- size_t case_size;
- struct case_list *new_case;
-
- case_size = sizeof (struct case_list)
- + ((compaction_nval - 1) * sizeof (union value));
- new_case = malloc (case_size);
-
- /* If we've got memory to spare then add it to the linked list. */
- if (info->case_cnt <= info->max_cases && new_case != NULL)
- {
- info->case_cnt++;
-
- /* Append case to linked list. */
- new_case->next = NULL;
- if (info->head != NULL)
- info->tail->next = new_case;
- else
- info->head = new_case;
- info->tail = new_case;
+ struct storage_stream_info *info;
- /* Copy data into case. */
- if (compaction_necessary)
- compact_case (&new_case->c, c);
- else
- memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
- }
- else
- {
- /* Out of memory. Write the active file to disk. */
- struct case_list *cur, *next;
+ info = xmalloc (sizeof *info);
+ info->casefile = cf;
- /* Notify the user. */
- if (!new_case)
- msg (MW, _("Virtual memory exhausted. Writing active file "
- "to disk."));
- else
- msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
- "overflowed. Writing active file to disk."),
- set_max_workspace / 1024, info->max_cases,
- compaction_nval * sizeof (union value));
-
- free (new_case);
-
- /* Switch to a disk sink. */
- vfm_sink = create_case_sink (&disk_sink_class, NULL);
- vfm_sink->class->open (vfm_sink);
- workspace_overflow = 1;
-
- /* Write the cases to disk and destroy them. We can't call
- vfm->sink->write() because of compaction. */
- for (cur = info->head; cur; cur = next)
- {
- next = cur->next;
- if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
- vfm_sink->aux) != 1)
- {
- msg (ME, _("An error occurred while attempting to "
- "write to a temporary file created as the "
- "active file: %s."),
- strerror (errno));
- err_failure ();
- }
- free (cur);
- }
-
- /* Write the current case to disk. */
- vfm_sink->class->write (vfm_sink, c);
- }
+ return create_case_source (&storage_source_class, info);
}
+\f
+/* Null sink. Used by a few procedures that keep track of output
+ themselves and would throw away anything that the sink
+ contained anyway. Every function pointer in the class is null,
+ so code that uses a sink must test a pointer before calling
+ through it. */
-/* If the data is stored in memory, causes it to be written to disk.
- To be called only *between* procedure()s, not within them. */
-void
-write_active_file_to_disk (void)
+const struct case_sink_class null_sink_class =
+ {
+ "null",
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ };
+\f
+/* Returns a pointer to the lagged case from N_BEFORE cases before the
+ current one, or NULL if there haven't been that many cases yet.
+ N_BEFORE must be between 1 and n_lag, inclusive (asserted). The
+ case is found by stepping N_BEFORE slots back from lag_head in
+ lag_queue[], wrapping around by n_lag when the index would be
+ negative. */
+struct ccase *
+lagged_case (int n_before)
{
- if (case_source_is_class (vfm_source, &memory_source_class))
- {
- struct memory_source_info *info = vfm_source->aux;
-
- /* Switch to a disk sink. */
- vfm_sink = create_case_sink (&disk_sink_class, NULL);
- vfm_sink->class->open (vfm_sink);
- workspace_overflow = 1;
-
- /* Write the cases to disk and destroy them. We can't call
- vfm->sink->write() because of compaction. */
- {
- struct case_list *cur, *next;
-
- for (cur = info->cases; cur; cur = next)
- {
- next = cur->next;
- if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
- vfm_sink->aux) != 1)
- {
- msg (ME, _("An error occurred while attempting to "
- "write to a temporary file created as the "
- "active file: %s."),
- strerror (errno));
- err_failure ();
- }
- free (cur);
- }
- }
-
- vfm_source = vfm_sink->class->make_source (vfm_sink);
- vfm_sink = NULL;
- }
-}
+ assert (n_before >= 1 );
+ assert (n_before <= n_lag);
-/* Destroy all memory sink data. */
-static void
-memory_sink_destroy (struct case_sink *sink)
-{
- struct memory_sink_info *info = sink->aux;
- struct case_list *cur, *next;
-
- for (cur = info->head; cur; cur = next)
+ if (n_before <= lag_count)
{
- next = cur->next;
- free (cur);
+ int index = lag_head - n_before;
+ if (index < 0)
+ index += n_lag;
+ return &lag_queue[index];
}
- free (info);
+ else
+ return NULL;
}
-
-/* Switch the memory stream from sink to source mode. */
-static struct case_source *
-memory_sink_make_source (struct case_sink *sink)
+
+/* Appends a transformation with processing function PROC, free
+ function FREE, and auxiliary data PRIVATE to t_trns[], the list of
+ all transformations to be performed on data as it is read from the
+ active file. t_trns[] is enlarged with x2nrealloc() when it is
+ full. */
+void
+add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
{
- struct memory_sink_info *sink_info = sink->aux;
- struct memory_source_info *source_info;
-
- source_info = xmalloc (sizeof *source_info);
- source_info->case_cnt = sink_info->case_cnt;
- source_info->case_size = sink_info->case_size;
- source_info->cases = sink_info->head;
-
- free (sink_info);
-
- return create_case_source (&memory_source_class,
- default_dict, source_info);
+ struct transformation *trns;
+ if (n_trns >= m_trns)
+ t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
+ trns = &t_trns[n_trns++];
+ trns->proc = proc;
+ trns->free = free;
+ trns->private = private;
}
-const struct case_sink_class memory_sink_class =
- {
- "memory",
- memory_sink_create,
- memory_sink_write,
- memory_sink_destroy,
- memory_sink_make_source,
- };
-
-/* Returns the number of cases in the source. */
-static int
-memory_source_count (const struct case_source *source)
+/* Returns the index number that the next transformation added by
+ add_transformation() will receive, which is the current value of
+ n_trns. A trns_proc_func that returns this index causes control
+ flow to jump to it. */
+size_t
+next_transformation (void)
{
- struct memory_source_info *info = source->aux;
-
- return info->case_cnt;
+ return n_trns;
}
-/* Reads the case stream from memory and passes it to write_case(). */
-static void
-memory_source_read (struct case_source *source,
- struct ccase *c,
- write_case_func *write_case, write_case_data wc_data)
+/* Cancels all active transformations, including any transformations
+ created by the input program. Each transformation's free
+ function, if it has one, is called on its private data; then
+ t_trns[] is freed and n_trns, f_trns, and m_trns are reset to
+ zero. */
+void
+cancel_transformations (void)
{
- struct memory_source_info *info = source->aux;
-
- while (info->cases != NULL)
+ size_t i;
+ for (i = 0; i < n_trns; i++)
{
- struct case_list *iter = info->cases;
- memcpy (c, &iter->c, info->case_size);
- if (!write_case (wc_data))
- break;
-
- info->cases = iter->next;
- free (iter);
+ struct transformation *t = &t_trns[i];
+ if (t->free != NULL)
+ t->free (t->private);
}
+ n_trns = f_trns = 0;
+ free (t_trns);
+ t_trns = NULL;
+ m_trns = 0;
+}
+\f
+/* Creates and returns a newly allocated case source with class
+ CLASS and auxiliary data AUX. */
+struct case_source *
+create_case_source (const struct case_source_class *class,
+ void *aux)
+{
+ struct case_source *source = xmalloc (sizeof *source);
+ source->class = class;
+ source->aux = aux;
+ return source;
}
-/* Destroy all memory source data. */
-static void
-memory_source_destroy (struct case_source *source)
+/* Destroys case source SOURCE, first calling its class's destroy
+ function, if any. Does nothing if SOURCE is null. */
+void
+free_case_source (struct case_source *source)
{
- struct memory_source_info *info = source->aux;
- struct case_list *cur, *next;
-
- for (cur = info->cases; cur; cur = next)
+ if (source != NULL)
{
- next = cur->next;
- free (cur);
+ if (source->class->destroy != NULL)
+ source->class->destroy (source);
+ free (source);
}
- free (info);
}
-/* Returns the list of cases in memory source SOURCE. */
-struct case_list *
-memory_source_get_cases (const struct case_source *source)
+/* Returns nonzero if a case source is "complex", that is, if SOURCE
+ is non-null and its class is input_program_source_class or
+ file_type_source_class. */
+int
+case_source_is_complex (const struct case_source *source)
{
- struct memory_source_info *info = source->aux;
-
- return info->cases;
+ return source != NULL && (source->class == &input_program_source_class
+ || source->class == &file_type_source_class);
}
-/* Sets the list of cases in memory source SOURCE to CASES. */
-void
-memory_source_set_cases (const struct case_source *source,
- struct case_list *cases)
+/* Returns nonzero if CLASS is the class of SOURCE. A null SOURCE
+ has no class, so the result is then zero. */
+int
+case_source_is_class (const struct case_source *source,
+ const struct case_source_class *class)
{
- struct memory_source_info *info = source->aux;
-
- info->cases = cases;
+ return source != NULL && source->class == class;
}
-/* Memory stream. */
-const struct case_source_class memory_source_class =
- {
- "memory",
- memory_source_count,
- memory_source_read,
- memory_source_destroy,
- };
-\f
-/* Add C to the lag queue. */
-static void
-lag_case (const struct ccase *c)
+/* Creates a case sink to accept cases from the given DICT with
+ class CLASS and auxiliary data AUX. The sink's value_cnt is set
+ to dict_get_compacted_value_cnt() for DICT. Returns the newly
+ allocated sink. */
+struct case_sink *
+create_case_sink (const struct case_sink_class *class,
+ const struct dictionary *dict,
+ void *aux)
{
- if (lag_count < n_lag)
- lag_count++;
- memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
- if (++lag_head >= n_lag)
- lag_head = 0;
+ struct case_sink *sink = xmalloc (sizeof *sink);
+ sink->class = class;
+ sink->value_cnt = dict_get_compacted_value_cnt (dict);
+ sink->aux = aux;
+ return sink;
}
-/* Returns a pointer to the lagged case from N_BEFORE cases before the
- current one, or NULL if there haven't been that many cases yet. */
-struct ccase *
-lagged_case (int n_before)
+/* Destroys case sink SINK, first calling its class's destroy
+ function, if any. Does nothing if SINK is null. */
+void
+free_case_sink (struct case_sink *sink)
{
- assert (n_before <= n_lag);
- if (n_before > lag_count)
- return NULL;
-
- {
- int index = lag_head - n_before;
- if (index < 0)
- index += n_lag;
- return lag_queue[index];
- }
+ if (sink != NULL)
+ {
+ if (sink->class->destroy != NULL)
+ sink->class->destroy (sink);
+ free (sink);
+ }
}
-
-/* Transforms trns_case and writes it to the replacement active
- file if advisable. Returns nonzero if more cases can be
- accepted, zero otherwise. Do not call this function again
- after it has returned zero once. */
-int
-procedure_write_case (write_case_data wc_data)
-{
- struct procedure_aux_data *proc_aux = wc_data->aux;
+\f
+/* Represents auxiliary data for handling SPLIT FILE, shared between
+ procedure_with_splits() and procedure_with_splits_callback(). */
+struct split_aux_data
+ {
+ size_t case_count; /* Number of cases so far. */
+ struct ccase prev_case; /* Copy of the case that began the current group. */
- /* Index of current transformation. */
- int cur_trns;
+ /* Functions to call... */
+ void (*begin_func) (void *); /* ...before data. */
+ int (*proc_func) (struct ccase *, void *); /* ...with data. */
+ void (*end_func) (void *); /* ...after data. */
+ void *func_aux; /* Auxiliary data. */
+ };
- /* Return value: whether it's reasonable to write any more cases. */
- int more_cases = 1;
+static int equal_splits (const struct ccase *, const struct ccase *);
+static int procedure_with_splits_callback (struct ccase *, void *);
+static void dump_splits (struct ccase *);
- cur_trns = f_trns;
- for (;;)
- {
- /* Output the case if this is temp_trns. */
- if (cur_trns == temp_trns)
- {
- int case_limit;
-
- if (n_lag)
- lag_case (proc_aux->trns_case);
-
- vfm_sink->class->write (vfm_sink, proc_aux->trns_case);
-
- proc_aux->cases_written++;
- case_limit = dict_get_case_limit (default_dict);
- if (case_limit != 0 && proc_aux->cases_written >= case_limit)
- more_cases = 0;
- }
-
- /* Are we done? */
- if (cur_trns >= n_trns)
- break;
-
- /* Decide which transformation should come next. */
- {
- int code;
-
- code = t_trns[cur_trns]->proc (t_trns[cur_trns], proc_aux->trns_case,
- proc_aux->cases_written + 1);
- switch (code)
- {
- case -1:
- /* Next transformation. */
- cur_trns++;
- break;
- case -2:
- /* Delete this case. */
- goto done;
- default:
- /* Go to that transformation. */
- cur_trns = code;
- break;
- }
- }
- }
+/* Like procedure(), but it automatically breaks the case stream
+ into SPLIT FILE break groups. Before each group of cases with
+ identical SPLIT FILE variable values, BEGIN_FUNC is called.
+ Then PROC_FUNC is called with each case in the group.
+ END_FUNC is called when the group is finished. FUNC_AUX is
+ passed to each of the functions as auxiliary data. Any of the
+ three functions may be null, in which case it is skipped.
- /* Call the beginning of group function. */
- if (!case_count && wc_data->begin_func != NULL)
- wc_data->begin_func (wc_data->func_aux);
+ If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
+ and END_FUNC will be called at all.
- /* Call the procedure if there is one and FILTER and PROCESS IF
- don't prohibit it. */
- if (wc_data->proc_func != NULL
- && !exclude_this_case (proc_aux->trns_case, proc_aux->cases_written + 1))
- wc_data->proc_func (proc_aux->trns_case, wc_data->func_aux);
+ If SPLIT FILE is not in effect, then there is one break group
+ (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
+ will be called once. */
+void
+procedure_with_splits (void (*begin_func) (void *aux),
+ int (*proc_func) (struct ccase *, void *aux),
+ void (*end_func) (void *aux),
+ void *func_aux)
+{
+ struct split_aux_data split_aux;
- case_count++;
-
-done:
- clear_case (proc_aux->trns_case);
-
- /* Return previously determined value. */
- return more_cases;
-}
+ split_aux.case_count = 0;
+ case_nullify (&split_aux.prev_case);
+ split_aux.begin_func = begin_func;
+ split_aux.proc_func = proc_func;
+ split_aux.end_func = end_func;
+ split_aux.func_aux = func_aux;
-/* Clears the variables in C that need to be cleared between
- processing cases. */
-static void
-clear_case (struct ccase *c)
-{
- /* FIXME? This is linear in the number of variables, but
- doesn't need to be, so it's an easy optimization target. */
- size_t var_cnt = dict_get_var_cnt (default_dict);
- size_t i;
-
- for (i = 0; i < var_cnt; i++)
- {
- struct variable *v = dict_get_var (default_dict, i);
- if (v->init && v->reinit)
- {
- if (v->type == NUMERIC)
- c->data[v->fv].f = SYSMIS;
- else
- memset (c->data[v->fv].s, ' ', v->width);
- }
- }
+ open_active_file ();
+ internal_procedure (procedure_with_splits_callback, &split_aux);
+ if (split_aux.case_count > 0 && end_func != NULL)
+ end_func (func_aux);
+ close_active_file ();
+
+ case_destroy (&split_aux.prev_case);
}
-/* Returns nonzero if case C with case number CASE_NUM should be
- exclude as specified on FILTER or PROCESS IF, otherwise
- zero. */
+/* Callback that procedure_with_splits() passes to
+ internal_procedure(). When C is the first case, or its SPLIT FILE
+ values differ from those of the case that began the current
+ group, this ends the old group (if any), dumps C's split values,
+ saves a copy of C, and begins a new group. Returns PROC_FUNC's
+ result for C, or 1 if PROC_FUNC is null. */
static int
-exclude_this_case (const struct ccase *c, int case_num)
+procedure_with_splits_callback (struct ccase *c, void *split_aux_)
{
- /* FILTER. */
- struct variable *filter_var = dict_get_filter (default_dict);
- if (filter_var != NULL)
+ struct split_aux_data *split_aux = split_aux_;
+
+ /* Start a new series if needed. */
+ if (split_aux->case_count == 0
+ || !equal_splits (c, &split_aux->prev_case))
{
- double f = c->data[filter_var->fv].f;
- if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
- return 1;
+ if (split_aux->case_count > 0 && split_aux->end_func != NULL)
+ split_aux->end_func (split_aux->func_aux);
+
+ dump_splits (c);
+ case_destroy (&split_aux->prev_case);
+ case_clone (&split_aux->prev_case, c);
+
+ if (split_aux->begin_func != NULL)
+ split_aux->begin_func (split_aux->func_aux);
}
- /* PROCESS IF. */
- if (process_if_expr != NULL
- && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
+ split_aux->case_count++;
+ if (split_aux->proc_func != NULL)
+ return split_aux->proc_func (c, split_aux->func_aux);
+ else
return 1;
-
- return 0;
}
-/* Appends TRNS to t_trns[], the list of all transformations to be
- performed on data as it is read from the active file. */
-void
-add_transformation (struct trns_header * trns)
+/* Compares the SPLIT FILE variables in cases A and B and returns
+ nonzero only if they compare equal (case_compare() yields 0),
+ zero if they differ. */
+static int
+equal_splits (const struct ccase *a, const struct ccase *b)
{
- if (n_trns >= m_trns)
- {
- m_trns += 16;
- t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
- }
- t_trns[n_trns] = trns;
- trns->index = n_trns++;
-}
-
-/* Cancels all active transformations, including any transformations
- created by the input program. */
-void
-cancel_transformations (void)
-{
- int i;
- for (i = 0; i < n_trns; i++)
- {
- if (t_trns[i]->free)
- t_trns[i]->free (t_trns[i]);
- free (t_trns[i]);
- }
- n_trns = f_trns = 0;
- if (m_trns > 32)
- {
- free (t_trns);
- m_trns = 0;
- }
+ return case_compare (a, b,
+ dict_get_split_vars (default_dict),
+ dict_get_split_cnt (default_dict)) == 0;
}
/* Dumps out the values of all the split variables for the case C. */
int i;
split_cnt = dict_get_split_cnt (default_dict);
+ if (split_cnt == 0)
+ return;
+
t = tab_create (3, split_cnt + 1, 0);
tab_dim (t, tab_natural_dimensions);
tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
assert (v->type == NUMERIC || v->type == ALPHA);
tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
- data_out (temp_buf, &v->print, &c->data[v->fv]);
+ data_out (temp_buf, &v->print, case_data (c, v->fv));
temp_buf[v->print.w] = 0;
tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
- val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
+ val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
if (val_lab)
tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
}
tab_flags (t, SOMF_NO_TITLE);
tab_submit (t);
}
+\f
+/* Represents auxiliary data for handling SPLIT FILE in a
+ multipass procedure. */
+struct multipass_split_aux_data
+ {
+ struct ccase prev_case; /* Copy of the case that began the current split. */
+ struct casefile *casefile; /* Accumulates data for a split; null if none is open. */
-/* This proc_func is substituted for the user-supplied proc_func when
- SPLIT FILE is active. This function forms a wrapper around that
- proc_func by dividing the input into series. */
-static int
-SPLIT_FILE_proc_func (struct ccase *c, void *data_)
+ /* Function to call with the accumulated data. */
+ void (*split_func) (const struct casefile *, void *);
+ void *func_aux; /* Auxiliary data. */
+ };
+
+static int multipass_split_callback (struct ccase *c, void *aux_);
+static void multipass_split_output (struct multipass_split_aux_data *);
+
+/* Runs a pass over the active file, collecting the cases of each
+ SPLIT FILE break group into a casefile and passing each completed
+ casefile to SPLIT_FUNC, which must not be null, along with
+ FUNC_AUX. Each casefile is destroyed after SPLIT_FUNC returns.
+ If no case reaches the callback, SPLIT_FUNC is never called. */
+void
+multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
+ void *),
+ void *func_aux)
{
- struct write_case_data *data = data_;
- struct split_aux_data *split_aux = data->aux;
- struct variable *const *split;
- size_t split_cnt;
- size_t i;
+ struct multipass_split_aux_data aux;
- /* The first case always begins a new series. We also need to
- preserve the values of the case for later comparison. */
- if (case_count == 0)
- {
- memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
+ assert (split_func != NULL);
- dump_splits (c);
- if (data->begin_func != NULL)
- data->begin_func (data->func_aux);
-
- return data->proc_func (c, data->func_aux);
- }
+ open_active_file ();
- /* Compare the value of each SPLIT FILE variable to the values on
- the previous case. */
- split = dict_get_split_vars (default_dict);
- split_cnt = dict_get_split_cnt (default_dict);
- for (i = 0; i < split_cnt; i++)
- {
- struct variable *v = split[i];
-
- switch (v->type)
- {
- case NUMERIC:
- if (c->data[v->fv].f != split_aux->prev_case->data[v->fv].f)
- goto not_equal;
- break;
- case ALPHA:
- if (memcmp (c->data[v->fv].s,
- split_aux->prev_case->data[v->fv].s, v->width))
- goto not_equal;
- break;
- default:
- assert (0);
- }
- }
- return data->proc_func (c, data->func_aux);
-
-not_equal:
- /* The values of the SPLIT FILE variable are different from the
- values on the previous case. That means that it's time to begin
- a new series. */
- if (data->end_func != NULL)
- data->end_func (data->func_aux);
- dump_splits (c);
- if (data->begin_func != NULL)
- data->begin_func (data->func_aux);
- memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
- return data->proc_func (c, data->func_aux);
+ case_nullify (&aux.prev_case);
+ aux.casefile = NULL;
+ aux.split_func = split_func;
+ aux.func_aux = func_aux;
+
+ internal_procedure (multipass_split_callback, &aux);
+ if (aux.casefile != NULL)
+ multipass_split_output (&aux);
+ case_destroy (&aux.prev_case);
+
+ close_active_file ();
}
-\f
-/* Case compaction. */
-/* Copies case SRC to case DEST, compacting it in the process. */
-void
-compact_case (struct ccase *dest, const struct ccase *src)
+/* Callback that multipass_procedure_with_splits() passes to
+ internal_procedure(). When no casefile is open, or C's SPLIT FILE
+ values differ from those of the case that began the current
+ split, this outputs any open casefile, creates a new one, dumps
+ C's split values, and saves a copy of C. C is then appended to
+ the open casefile. Always returns 1. */
+static int
+multipass_split_callback (struct ccase *c, void *aux_)
{
- int i;
- int nval = 0;
- size_t var_cnt;
-
- assert (compaction_necessary);
+ struct multipass_split_aux_data *aux = aux_;
- if (temporary == 2)
+ /* Start a new series if needed. */
+ if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
{
- if (dest != compaction_case)
- memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
- return;
- }
+ /* Pass any cases to split_func. */
+ if (aux->casefile != NULL)
+ multipass_split_output (aux);
- /* Copy all the variables except the scratch variables from SRC to
- DEST. */
- var_cnt = dict_get_var_cnt (default_dict);
- for (i = 0; i < var_cnt; i++)
- {
- struct variable *v = dict_get_var (default_dict, i);
-
- if (dict_class_from_id (v->name) == DC_SCRATCH)
- continue;
+ /* Start a new casefile. */
+ aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
- if (v->type == NUMERIC)
- dest->data[nval++] = src->data[v->fv];
- else
- {
- int w = DIV_RND_UP (v->width, sizeof (union value));
-
- memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
- nval += w;
- }
+ /* Record split values. */
+ dump_splits (c);
+ case_destroy (&aux->prev_case);
+ case_clone (&aux->prev_case, c);
}
+
+ casefile_append (aux->casefile, c);
+
+ return 1;
}
-/* Reassigns `fv' for each variable. Deletes scratch variables. */
+/* Passes AUX's accumulated casefile, which must not be null, to
+ AUX's split_func along with its func_aux, then destroys the
+ casefile and resets the pointer to null. */
static void
-finish_compaction (void)
+multipass_split_output (struct multipass_split_aux_data *aux)
{
- int i;
+ assert (aux->casefile != NULL);
+ aux->split_func (aux->casefile, aux->func_aux);
+ casefile_destroy (aux->casefile);
+ aux->casefile = NULL;
+}
- for (i = 0; i < dict_get_var_cnt (default_dict); )
- {
- struct variable *v = dict_get_var (default_dict, i);
- if (dict_class_from_id (v->name) == DC_SCRATCH)
- dict_delete_var (default_dict, v);
- else
- i++;
+/* Discards all the current state in preparation for a data-input
+ command like DATA LIST or GET: clears default_dict, nulls
+ default_handle, zeroes n_lag, frees vfm_source if there is one,
+ cancels all transformations, clears the control stack, frees the
+ PROCESS IF expression, calls cancel_temporary(), and resets
+ pgm_state to STATE_INIT. */
+void
+discard_variables (void)
+{
+ dict_clear (default_dict);
+ default_handle = NULL;
+
+ n_lag = 0;
+
+ if (vfm_source != NULL)
+ {
+ free_case_source (vfm_source);
+ vfm_source = NULL;
}
- dict_compact_values (default_dict);
-}
-/* Creates a case source with class CLASS and auxiliary data AUX
- and based on dictionary DICT. */
-struct case_source *
-create_case_source (const struct case_source_class *class,
- const struct dictionary *dict,
- void *aux)
-{
- struct case_source *source = xmalloc (sizeof *source);
- source->class = class;
- source->value_cnt = dict_get_next_value_idx (dict);
- source->aux = aux;
- return source;
-}
+ cancel_transformations ();
-/* Returns nonzero if a case source is "complex". */
-int
-case_source_is_complex (const struct case_source *source)
-{
- return source != NULL && (source->class == &input_program_source_class
- || source->class == &file_type_source_class);
-}
+ ctl_stack_clear ();
-/* Returns nonzero if CLASS is the class of SOURCE. */
-int
-case_source_is_class (const struct case_source *source,
- const struct case_source_class *class)
-{
- return source != NULL && source->class == class;
-}
+ expr_free (process_if_expr);
+ process_if_expr = NULL;
-/* Creates a case sink with class CLASS and auxiliary data
- AUX. */
-struct case_sink *
-create_case_sink (const struct case_sink_class *class, void *aux)
-{
- struct case_sink *sink = xmalloc (sizeof *sink);
- sink->class = class;
- sink->aux = aux;
- return sink;
+ cancel_temporary ();
+
+ pgm_state = STATE_INIT;
}