X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fvfm.c;h=5afa838ddbd913303db79580c1aeadf7c11a20cd;hb=4fdeb2145d081ff1b84e3f6c99f9d1c048c0d64a;hp=331f287f8c4d1de47283e8da46d909ee62d83c4f;hpb=cb4033020c8a24d573814e6ac9192046bffdccac;p=pspp-builds.git diff --git a/src/vfm.c b/src/vfm.c index 331f287f..5afa838d 100644 --- a/src/vfm.c +++ b/src/vfm.c @@ -14,28 +14,13 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA - 02111-1307, USA. */ + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. */ -/* AIX requires this to be the first thing in the file. */ #include -#if __GNUC__ -#define alloca __builtin_alloca -#else -#if HAVE_ALLOCA_H -#include -#else -#ifdef _AIX -#pragma alloca -#else -#ifndef alloca /* predefined by HP cc +Olibcalls */ -char *alloca (); -#endif -#endif -#endif -#endif - -#include +#include "vfm.h" +#include "vfmP.h" +#include "error.h" #include #include #include @@ -43,1030 +28,574 @@ char *alloca (); #include /* Required by SunOS4. */ #endif #include "alloc.h" -#include "approx.h" +#include "case.h" +#include "casefile.h" +#include "dictionary.h" #include "do-ifP.h" #include "error.h" -#include "expr.h" +#include "expressions/public.h" #include "misc.h" -#include "random.h" +#include "settings.h" #include "som.h" #include "str.h" #include "tab.h" #include "var.h" -#include "vector.h" -#include "vfm.h" -#include "vfmP.h" +#include "value-labels.h" /* Virtual File Manager (vfm): - vfm is used to process data files. It uses the model that data is - read from one stream (the data source), then written to another - (the data sink). The data source is then deleted and the data sink - becomes the data source for the next procedure. */ - -#undef DEBUGGING -/*#define DEBUGGING 1 */ -#include "debug-print.h" - -/* This is used to read from the active file. 
*/ -struct case_stream *vfm_source; + vfm is used to process data files. It uses the model that + data is read from one stream (the data source), processed, + then written to another (the data sink). The data source is + then deleted and the data sink becomes the data source for the + next procedure. */ -/* `value' indexes to initialize to particular values for certain cases. */ -struct long_vec reinit_sysmis; /* SYSMIS for every case. */ -struct long_vec reinit_blanks; /* Blanks for every case. */ -struct long_vec init_zero; /* Zero for first case only. */ -struct long_vec init_blanks; /* Blanks for first case only. */ - -/* This is used to write to the replacement active file. */ -struct case_stream *vfm_sink; - -/* Information about the data source. */ -struct stream_info vfm_source_info; - -/* Information about the data sink. */ -struct stream_info vfm_sink_info; +/* Procedure execution data. */ +struct write_case_data + { + /* Function to call for each case. */ + int (*proc_func) (struct ccase *, void *); /* Function. */ + void *aux; /* Auxiliary data. */ + + struct ccase trns_case; /* Case used for transformations. */ + struct ccase sink_case; /* Case written to sink, if + compaction is necessary. */ + size_t cases_written; /* Cases output so far. */ + size_t cases_analyzed; /* Cases passed to procedure so far. */ + }; -/* Filter variable and `value' index. */ -static struct variable *filter_var; -static int filter_index; +/* The current active file, from which cases are read. */ +struct case_source *vfm_source; -#define FILTERED \ - (filter_index != -1 \ - && (temp_case->data[filter_index].f == 0.0 \ - || temp_case->data[filter_index].f == SYSMIS \ - || is_num_user_missing (temp_case->data[filter_index].f, \ - filter_var))) +/* The replacement active file, to which cases are written. */ +struct case_sink *vfm_sink; /* Nonzero if the case needs to have values deleted before being stored, zero otherwise. 
*/ -int compaction_necessary; - -/* Number of values after compaction, or the same as - vfm_sink_info.nval, if compaction is not necessary. */ -int compaction_nval; - -/* Temporary case buffer with enough room for `compaction_nval' - `value's. */ -struct ccase *compaction_case; - -/* Within a session, when paging is turned on, it is never turned back - off. This policy might be too aggressive. */ -static int paging = 0; +static int compaction_necessary; /* Time at which vfm was last invoked. */ time_t last_vfm_invocation; -/* Functions called during procedure processing. */ -static int (*proc_func) (struct ccase *); /* Called for each case. */ -static int (*virt_proc_func) (struct ccase *); /* From SPLIT_FILE_procfunc. */ -static void (*begin_func) (void); /* Called at beginning of a series. */ -static void (*virt_begin_func) (void); /* Called by SPLIT_FILE_procfunc. */ -static void (*end_func) (void); /* Called after end of a series. */ -int (*write_case) (void); - -/* Number of cases passed to proc_func(). */ -static int case_count; - /* Lag queue. */ int n_lag; /* Number of cases to lag. */ static int lag_count; /* Number of cases in lag_queue so far. */ static int lag_head; /* Index where next case will be added. */ -static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */ +static struct ccase *lag_queue; /* Array of n_lag ccase * elements. 
*/ +static void internal_procedure (int (*proc_func) (struct ccase *, void *), + void *aux); +static void create_trns_case (struct ccase *, struct dictionary *); static void open_active_file (void); +static int write_case (struct write_case_data *wc_data); +static int execute_transformations (struct ccase *c, + struct trns_header **trns, + int first_idx, int last_idx, + int case_num); +static int filter_case (const struct ccase *c, int case_num); +static void lag_case (const struct ccase *c); +static void clear_case (struct ccase *c); static void close_active_file (void); -static int SPLIT_FILE_procfunc (struct ccase *); -static void finish_compaction (void); -static void lag_case (void); -static int procedure_write_case (void); /* Public functions. */ -/* Reads all the cases from the active file, transforms them by the - active set of transformations, calls PROCFUNC with CURCASE set to - the case and CASENUM set to the case number, and writes them to a - new active file. +/* Reads the data from the input program and writes it to a new + active file. For each case we read from the input program, we + do the following + + 1. Execute permanent transformations. If these drop the case, + start the next case from step 1. - Divides the active file into zero or more series of one or more - cases each. BEGINFUNC is called before each series. ENDFUNC is - called after each series. */ + 2. N OF CASES. If we have already written N cases, start the + next case from step 1. + + 3. Write case to replacement active file. + + 4. Execute temporary transformations. If these drop the case, + start the next case from step 1. + + 5. FILTER, PROCESS IF. If these drop the case, start the next + case from step 1. + + 6. Post-TEMPORARY N OF CASES. If we have already analyzed N + cases, start the next case from step 1. + + 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. 
*/ void -procedure (void (*beginfunc) (void), - int (*procfunc) (struct ccase *curcase), - void (*endfunc) (void)) +procedure (int (*proc_func) (struct ccase *, void *), void *aux) { - end_func = endfunc; - write_case = procedure_write_case; - - if (default_dict.n_splits && procfunc != NULL) + if (proc_func == NULL + && case_source_is_class (vfm_source, &storage_source_class) + && vfm_sink == NULL + && !temporary + && n_trns == 0) { - virt_proc_func = procfunc; - proc_func = SPLIT_FILE_procfunc; - - virt_begin_func = beginfunc; - begin_func = NULL; - } else { - begin_func = beginfunc; - proc_func = procfunc; + /* Nothing to do. */ + return; } - last_vfm_invocation = time (NULL); - open_active_file (); - vfm_source->read (); + internal_procedure (proc_func, aux); close_active_file (); } - -/* Active file processing support. Subtly different semantics from - procedure(). */ -static int process_active_file_write_case (void); +/* Executes a procedure, as procedure(), except that the caller + is responsible for calling open_active_file() and + close_active_file(). */ +static void +internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) +{ + static int recursive_call; -/* The casefunc might want us to stop calling it. */ -static int not_canceled; + struct write_case_data wc_data; -/* Reads all the cases from the active file and passes them one-by-one - to CASEFUNC in temp_case. Before any cases are passed, calls - BEGINFUNC. After all the cases have been passed, calls ENDFUNC. - BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output - file by calling process_active_file_output_case(). + assert (++recursive_call == 1); - process_active_file() ignores TEMPORARY, SPLIT FILE, and N. 
*/ -void -process_active_file (void (*beginfunc) (void), - int (*casefunc) (struct ccase *curcase), - void (*endfunc) (void)) -{ - proc_func = casefunc; - write_case = process_active_file_write_case; - not_canceled = 1; + wc_data.proc_func = proc_func; + wc_data.aux = aux; + create_trns_case (&wc_data.trns_case, default_dict); + case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict)); + wc_data.cases_written = 0; - open_active_file (); - beginfunc (); - - /* There doesn't necessarily need to be an active file. */ - if (vfm_source) - vfm_source->read (); - - endfunc (); - close_active_file (); -} + last_vfm_invocation = time (NULL); -/* Pass the current case to casefunc. */ -static int -process_active_file_write_case (void) -{ - /* Index of current transformation. */ - int cur_trns; + if (vfm_source != NULL) + vfm_source->class->read (vfm_source, + &wc_data.trns_case, + write_case, &wc_data); - for (cur_trns = f_trns ; cur_trns != temp_trns; ) - { - int code; - - code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case); - switch (code) - { - case -1: - /* Next transformation. */ - cur_trns++; - break; - case -2: - /* Delete this case. */ - goto done; - default: - /* Go to that transformation. */ - cur_trns = code; - break; - } - } + case_destroy (&wc_data.sink_case); + case_destroy (&wc_data.trns_case); - if (n_lag) - lag_case (); - - /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */ - if (not_canceled - && !FILTERED - && (process_if_expr == NULL || - expr_evaluate (process_if_expr, temp_case, NULL) == 1.0)) - not_canceled = proc_func (temp_case); - - case_count++; - - done: - { - long *lp; - - /* This case is finished. Initialize the variables for the next case. 
*/ - for (lp = reinit_sysmis.vec; *lp != -1;) - temp_case->data[*lp++].f = SYSMIS; - for (lp = reinit_blanks.vec; *lp != -1;) - memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING); - } - - return 1; + assert (--recursive_call == 0); } -/* Write temp_case to the active file. */ -void -process_active_file_output_case (void) -{ - vfm_sink_info.ncases++; - vfm_sink->write (); -} - -/* Opening the active file. */ - -/* It might be usefully noted that the following several functions are - given in the order that they are called by open_active_file(). */ - -/* Prepare to write to the replacement active file. */ +/* Creates and returns a case, initializing it from the vectors + that say which `value's need to be initialized just once, and + which ones need to be re-initialized before every case. */ static void -prepare_for_writing (void) +create_trns_case (struct ccase *trns_case, struct dictionary *dict) { - /* FIXME: If ALL the conditions listed below hold true, then the - replacement active file is guaranteed to be identical to the - original active file: + size_t var_cnt = dict_get_var_cnt (dict); + size_t i; - 1. TEMPORARY was the first transformation, OR, there were no - transformations at all. + case_create (trns_case, dict_get_next_value_idx (dict)); + for (i = 0; i < var_cnt; i++) + { + struct variable *v = dict_get_var (dict, i); + union value *value = case_data_rw (trns_case, v->fv); - 2. Input is not coming from an input program. + if (v->type == NUMERIC) + value->f = v->reinit ? 0.0 : SYSMIS; + else + memset (value->s, ' ', v->width); + } +} - 3. Compaction is not necessary. +/* Makes all preparations for reading from the data source and writing + to the data sink. 
*/ +static void +open_active_file (void) +{ + /* Make temp_dict refer to the dictionary right before data + reaches the sink */ + if (!temporary) + { + temp_trns = n_trns; + temp_dict = default_dict; + } - So, in this case, we shouldn't have to replace the active - file--it's just a waste of time and space. */ + /* Figure out compaction. */ + compaction_necessary = (dict_get_next_value_idx (temp_dict) + != dict_get_compacted_value_cnt (temp_dict)); - vfm_sink_info.ncases = 0; - vfm_sink_info.nval = default_dict.nval; - vfm_sink_info.case_size = (sizeof (struct ccase) - + (default_dict.nval - 1) * sizeof (union value)); - + /* Prepare sink. */ if (vfm_sink == NULL) + vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL); + if (vfm_sink->class->open != NULL) + vfm_sink->class->open (vfm_sink); + + /* Allocate memory for lag queue. */ + if (n_lag > 0) { - if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE - && !paging) - { - msg (MW, _("Workspace overflow predicted. Max workspace is " - "currently set to %d KB (%d cases at %d bytes each). " - "Paging active file to disk."), - MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size, - vfm_sink_info.case_size); - - paging = 1; - } - - vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream; + int i; + + lag_count = 0; + lag_head = 0; + lag_queue = xmalloc (n_lag * sizeof *lag_queue); + for (i = 0; i < n_lag; i++) + case_nullify (&lag_queue[i]); } + + /* Close any unclosed DO IF or LOOP constructs. */ + discard_ctl_stack (); } -/* Arrange for compacting the output cases for storage. */ -static void -arrange_compaction (void) +/* Transforms trns_case and writes it to the replacement active + file if advisable. Returns nonzero if more cases can be + accepted, zero otherwise. Do not call this function again + after it has returned zero once. */ +static int +write_case (struct write_case_data *wc_data) { - int count_values = 0; + /* Execute permanent transformations. 
*/ + if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns, + wc_data->cases_written + 1)) + goto done; - { - int i; - - /* Count up the number of `value's that will be output. */ - for (i = 0; i < temp_dict->nvar; i++) - if (temp_dict->var[i]->name[0] != '#') - { - assert (temp_dict->var[i]->nv > 0); - count_values += temp_dict->var[i]->nv; - } - assert (temporary == 2 || count_values <= temp_dict->nval); - } + /* N OF CASES. */ + if (dict_get_case_limit (default_dict) + && wc_data->cases_written >= dict_get_case_limit (default_dict)) + goto done; + wc_data->cases_written++; + + /* Write case to LAG queue. */ + if (n_lag) + lag_case (&wc_data->trns_case); + + /* Write case to replacement active file. */ + if (vfm_sink->class->write != NULL) + { + if (compaction_necessary) + { + dict_compact_case (temp_dict, &wc_data->sink_case, &wc_data->trns_case); + vfm_sink->class->write (vfm_sink, &wc_data->sink_case); + } + else + vfm_sink->class->write (vfm_sink, &wc_data->trns_case); + } - /* Compaction is only necessary if the number of `value's to output - differs from the number already present. */ - compaction_nval = count_values; - compaction_necessary = temporary == 2 || count_values != temp_dict->nval; + /* Execute temporary transformations. */ + if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns, + wc_data->cases_written)) + goto done; - if (vfm_sink->init) - vfm_sink->init (); -} + /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */ + if (filter_case (&wc_data->trns_case, wc_data->cases_written) + || (dict_get_case_limit (temp_dict) + && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict))) + goto done; + wc_data->cases_analyzed++; -/* Prepares the temporary case and compaction case. */ -static void -make_temp_case (void) -{ - temp_case = xmalloc (vfm_sink_info.case_size); + /* Pass case to procedure. 
*/ + if (wc_data->proc_func != NULL) + wc_data->proc_func (&wc_data->trns_case, wc_data->aux); - if (compaction_necessary) - compaction_case = xmalloc (sizeof (struct ccase) - + sizeof (union value) * (compaction_nval - 1)); + done: + clear_case (&wc_data->trns_case); + return 1; } -#if DEBUGGING -/* Returns the name of the variable that owns the index CCASE_INDEX - into ccase. */ -static const char * -index_to_varname (int ccase_index) -{ - int i; - - for (i = 0; i < default_dict.nvar; i++) - { - variable *v = default_dict.var[i]; - - if (ccase_index >= v->fv && ccase_index < v->fv + v->nv) - return default_dict.var[i]->name; +/* Transforms case C using the transformations in TRNS[] with + indexes FIRST_IDX through LAST_IDX, exclusive. Case C will + become case CASE_NUM (1-based) in the output file. Returns + zero if the case was filtered out by one of the + transformations, nonzero otherwise. */ +static int +execute_transformations (struct ccase *c, + struct trns_header **trns, + int first_idx, int last_idx, + int case_num) +{ + int idx; + + for (idx = first_idx; idx != last_idx; ) + { + int retval = trns[idx]->proc (trns[idx], c, case_num); + switch (retval) + { + case -1: + idx++; + break; + + case -2: + return 0; + + default: + idx = retval; + break; + } } - return _(""); -} -#endif -/* Initializes temp_case from the vectors that say which `value's need - to be initialized just once, and which ones need to be - re-initialized before every case. */ -static void -vector_initialization (void) -{ - int i; - long *lp; - - /* Just once. */ - for (i = 0; i < init_zero.n; i++) - temp_case->data[init_zero.vec[i]].f = 0.0; - for (i = 0; i < init_blanks.n; i++) - memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING); - - /* These vectors need to be repeatedly accessed, so we add a - sentinel to (hopefully) improve speed. 
*/ - vec_insert (&reinit_sysmis, -1); - vec_insert (&reinit_blanks, -1); - - for (lp = reinit_sysmis.vec; *lp != -1;) - temp_case->data[*lp++].f = SYSMIS; - for (lp = reinit_blanks.vec; *lp != -1;) - memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING); - -#if DEBUGGING - printf ("vfm: init_zero="); - for (i = 0; i < init_zero.n; i++) - printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i])); - printf (" init_blanks="); - for (i = 0; i < init_blanks.n; i++) - printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i])); - printf (" reinit_sysmis="); - for (lp = reinit_sysmis.vec; *lp != -1; lp++) - printf ("%s%s", lp != reinit_sysmis.vec ? "," : "", - index_to_varname (*lp)); - printf (" reinit_blanks="); - for (lp = reinit_blanks.vec; *lp != -1; lp++) - printf ("%s%s", lp != reinit_blanks.vec ? "," : "", - index_to_varname (*lp)); - printf ("\n"); -#endif + return 1; } -/* Sets filter_index to an appropriate value. */ -static void -setup_filter (void) +/* Returns nonzero if case C with case number CASE_NUM should be + exclude as specified on FILTER or PROCESS IF, otherwise + zero. */ +static int +filter_case (const struct ccase *c, int case_idx) { - filter_index = -1; - - if (default_dict.filter_var[0]) + /* FILTER. */ + struct variable *filter_var = dict_get_filter (default_dict); + if (filter_var != NULL) { - struct variable *fv = find_variable (default_dict.filter_var); - - if (fv == NULL || fv->type == ALPHA) - default_dict.filter_var[0] = 0; - else - { - filter_index = fv->index; - filter_var = fv; - } + double f = case_num (c, filter_var->fv); + if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var)) + return 1; } + + /* PROCESS IF. */ + if (process_if_expr != NULL + && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0) + return 1; + + return 0; } -/* Sets all the lag-related variables based on value of n_lag. */ +/* Add C to the lag queue. 
*/ static void -setup_lag (void) +lag_case (const struct ccase *c) { - int i; - - if (n_lag == 0) - return; - - lag_count = 0; - lag_head = 0; - lag_queue = xmalloc (n_lag * sizeof *lag_queue); - for (i = 0; i < n_lag; i++) - lag_queue[i] = xmalloc (temp_dict->nval * sizeof **lag_queue); + if (lag_count < n_lag) + lag_count++; + case_destroy (&lag_queue[lag_head]); + case_clone (&lag_queue[lag_head], c); + if (++lag_head >= n_lag) + lag_head = 0; } -/* There is a lot of potential confusion in the vfm and related - routines over the number of `value's at each stage of the process. - Here is each nval count, with explanation, as set up by - open_active_file(): - - vfm_source_info.nval: Number of `value's in the cases returned by - the source stream. This value turns out not to be very useful, but - we maintain it anyway. - - vfm_sink_info.nval: Number of `value's in the cases after all - transformations have been performed. Never less than - vfm_source_info.nval. - - temp_dict->nval: Number of `value's in the cases after the - transformations leading up to TEMPORARY have been performed. If - TEMPORARY was not specified, this is equal to vfm_sink_info.nval. - Never less than vfm_sink_info.nval. - - compaction_nval: Number of `value's in the cases after the - transformations leading up to TEMPORARY have been performed and the - case has been compacted by compact_case(), if compaction is - necessary. This the number of `value's in the cases saved by the - sink stream. (However, note that the cases passed to the sink - stream have not yet been compacted. It is the responsibility of - the data sink to call compact_case().) This may be less than, - greater than, or equal to vfm_source_info.nval. `compaction' - becomes the new value of default_dict.nval after the procedure is - completed. - - default_dict.nval: This is often an alias for temp_dict->nval. As - such it can really have no separate existence until the procedure - is complete. 
For this reason it should *not* be referenced inside - the execution of a procedure. */ -/* Makes all preparations for reading from the data source and writing - to the data sink. */ +/* Clears the variables in C that need to be cleared between + processing cases. */ static void -open_active_file (void) +clear_case (struct ccase *c) { - /* Sometimes we want to refer to the dictionary that applies to the - data actually written to the sink. This is either temp_dict or - default_dict. However, if TEMPORARY is not on, then temp_dict - does not apply. So, we can set temp_dict to default_dict in this - case. */ - if (!temporary) + size_t var_cnt = dict_get_var_cnt (default_dict); + size_t i; + + for (i = 0; i < var_cnt; i++) { - temp_trns = n_trns; - temp_dict = &default_dict; + struct variable *v = dict_get_var (default_dict, i); + if (v->init && v->reinit) + { + if (v->type == NUMERIC) + case_data_rw (c, v->fv)->f = SYSMIS; + else + memset (case_data_rw (c, v->fv)->s, ' ', v->width); + } } - - /* No cases passed to the procedure yet. */ - case_count = 0; - - /* The rest. */ - prepare_for_writing (); - arrange_compaction (); - make_temp_case (); - vector_initialization (); - setup_randomize (); - discard_ctl_stack (); - setup_filter (); - setup_lag (); - - /* Debug output. */ - debug_printf (("vfm: reading from %s source, writing to %s sink.\n", - vfm_source->name, vfm_sink->name)); - debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, " - "temp_dict->nval=%d, compaction_nval=%d, " - "default_dict.nval=%d\n", - vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval, - compaction_nval, default_dict.nval)); } - + /* Closes the active file. */ static void close_active_file (void) { - /* Close the current case group. */ - if (case_count && end_func != NULL) - end_func (); - - /* Stop lagging (catch up?). */ - if (n_lag) + /* Free memory for lag queue, and turn off lagging. 
*/ + if (n_lag > 0) { int i; for (i = 0; i < n_lag; i++) - free (lag_queue[i]); + case_destroy (&lag_queue[i]); free (lag_queue); n_lag = 0; } - /* Assume the dictionary from right before TEMPORARY, if any. Turn - off TEMPORARY. */ + /* Dictionary from before TEMPORARY becomes permanent.. */ if (temporary) { - restore_dictionary (temp_dict); + dict_destroy (default_dict); + default_dict = temp_dict; temp_dict = NULL; } - /* The default dictionary assumes the compacted data size. */ - default_dict.nval = compaction_nval; - - /* Old data sink --> New data source. */ - if (vfm_source && vfm_source->destroy_source) - vfm_source->destroy_source (); - - vfm_source = vfm_sink; - vfm_source_info.ncases = vfm_sink_info.ncases; - vfm_source_info.nval = compaction_nval; - vfm_source_info.case_size = (sizeof (struct ccase) - + (compaction_nval - 1) * sizeof (union value)); - if (vfm_source->mode) - vfm_source->mode (); - - /* Old data sink is gone now. */ - vfm_sink = NULL; - /* Finish compaction. */ if (compaction_necessary) - finish_compaction (); - cancel_temporary (); - - /* Free temporary cases. */ - free (temp_case); - temp_case = NULL; + dict_compact_values (default_dict); + + /* Free data source. */ + if (vfm_source != NULL) + { + free_case_source (vfm_source); + vfm_source = NULL; + } - free (compaction_case); - compaction_case = NULL; + /* Old data sink becomes new data source. */ + if (vfm_sink->class->make_source != NULL) + vfm_source = vfm_sink->class->make_source (vfm_sink); + free_case_sink (vfm_sink); + vfm_sink = NULL; - /* Cancel PROCESS IF. */ + /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors, + and get rid of all the transformations. */ + cancel_temporary (); expr_free (process_if_expr); process_if_expr = NULL; - - /* Cancel FILTER if temporary. */ - if (filter_index != -1 && !FILTER_before_TEMPORARY) - default_dict.filter_var[0] = 0; - - /* Cancel transformations. 
*/ + if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY) + dict_set_filter (default_dict, NULL); + dict_set_case_limit (default_dict, 0); + dict_clear_vectors (default_dict); cancel_transformations (); - - /* Clear value-initialization vectors. */ - vec_clear (&init_zero); - vec_clear (&init_blanks); - vec_clear (&reinit_sysmis); - vec_clear (&reinit_blanks); - - /* Turn off case limiter. */ - default_dict.N = 0; - - /* Clear VECTOR vectors. */ - { - int i; - - for (i = 0; i < nvec; i++) - free (vec[i].v); - free (vec); - vec = NULL; - nvec = 0; - } - - debug_printf (("vfm: procedure complete\n\n")); } -/* Disk case stream. */ +/* Storage case stream. */ -/* Associated files. */ -FILE *disk_source_file; -FILE *disk_sink_file; +/* Information about storage sink or source. */ +struct storage_stream_info + { + struct casefile *casefile; /* Storage. */ + }; -/* Initializes the disk sink. */ +/* Initializes a storage sink. */ static void -disk_stream_init (void) +storage_sink_open (struct case_sink *sink) { - disk_sink_file = tmpfile (); - if (!disk_sink_file) - { - msg (ME, _("An error occurred attempting to create a temporary " - "file for use as the active file: %s."), - strerror (errno)); - err_failure (); - } + struct storage_stream_info *info; + + sink->aux = info = xmalloc (sizeof *info); + info->casefile = casefile_create (sink->value_cnt); } -/* Reads all cases from the disk source and passes them one by one to - write_case(). */ +/* Destroys storage stream represented by INFO. 
*/ static void -disk_stream_read (void) +destroy_storage_stream_info (struct storage_stream_info *info) { - int i; - - for (i = 0; i < vfm_source_info.ncases; i++) + if (info != NULL) { - if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file)) - { - msg (ME, _("An error occurred while attempting to read from " - "a temporary file created for the active file: %s."), - strerror (errno)); - err_failure (); - return; - } - - if (!write_case ()) - return; + casefile_destroy (info->casefile); + free (info); } } -/* Writes temp_case to the disk sink. */ +/* Writes case C to the storage sink SINK. */ static void -disk_stream_write (void) +storage_sink_write (struct case_sink *sink, const struct ccase *c) { - union value *src_case; - - if (compaction_necessary) - { - compact_case (compaction_case, temp_case); - src_case = (union value *) compaction_case; - } - else src_case = (union value *) temp_case; + struct storage_stream_info *info = sink->aux; - if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, - disk_sink_file) != 1) - { - msg (ME, _("An error occurred while attempting to write to a " - "temporary file used as the active file: %s."), - strerror (errno)); - err_failure (); - } + casefile_append (info->casefile, c); } -/* Switches the stream from a sink to a source. */ +/* Destroys internal data in SINK. */ static void -disk_stream_mode (void) +storage_sink_destroy (struct case_sink *sink) { - /* Rewind the sink. */ - if (fseek (disk_sink_file, 0, SEEK_SET) != 0) - { - msg (ME, _("An error occurred while attempting to rewind a " - "temporary file used as the active file: %s."), - strerror (errno)); - err_failure (); - } - - /* Sink --> source variables. */ - disk_source_file = disk_sink_file; + destroy_storage_stream_info (sink->aux); } -/* Destroys the source's internal data. */ -static void -disk_stream_destroy_source (void) +/* Closes the sink and returns a storage source to read back the + written data. 
*/ +static struct case_source * +storage_sink_make_source (struct case_sink *sink) { - if (disk_source_file) - { - fclose (disk_source_file); - disk_source_file = NULL; - } + struct case_source *source + = create_case_source (&storage_source_class, sink->aux); + sink->aux = NULL; + return source; } -/* Destroys the sink's internal data. */ -static void -disk_stream_destroy_sink (void) -{ - if (disk_sink_file) - { - fclose (disk_sink_file); - disk_sink_file = NULL; - } -} - -/* Disk stream. */ -struct case_stream vfm_disk_stream = +/* Storage sink. */ +const struct case_sink_class storage_sink_class = { - disk_stream_init, - disk_stream_read, - disk_stream_write, - disk_stream_mode, - disk_stream_destroy_source, - disk_stream_destroy_sink, - "disk", + "storage", + storage_sink_open, + storage_sink_write, + storage_sink_destroy, + storage_sink_make_source, }; -/* Memory case stream. */ - -/* List of cases stored in the stream. */ -struct case_list *memory_source_cases; -struct case_list *memory_sink_cases; - -/* Current case. */ -struct case_list *memory_sink_iter; - -/* Maximum number of cases. */ -int memory_sink_max_cases; +/* Storage source. */ -/* Initializes the memory stream variables for writing. */ -static void -memory_stream_init (void) +/* Returns the number of cases that will be read by + storage_source_read(). */ +static int +storage_source_count (const struct case_source *source) { - memory_sink_cases = NULL; - memory_sink_iter = NULL; - - assert (compaction_nval); - memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval); + struct storage_stream_info *info = source->aux; + + return casefile_get_case_cnt (info->casefile); } -/* Reads the case stream from memory and passes it to write_case(). */ +/* Reads all cases from the storage source and passes them one by one to + write_case(). 
*/ static void -memory_stream_read (void) +storage_source_read (struct case_source *source, + struct ccase *output_case, + write_case_func *write_case, write_case_data wc_data) { - while (memory_source_cases != NULL) + struct storage_stream_info *info = source->aux; + struct ccase casefile_case; + struct casereader *reader; + + for (reader = casefile_get_reader (info->casefile); + casereader_read (reader, &casefile_case); + case_destroy (&casefile_case)) { - memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size); - - { - struct case_list *current = memory_source_cases; - memory_source_cases = memory_source_cases->next; - free (current); - } - - if (!write_case ()) - return; + case_copy (output_case, 0, + &casefile_case, 0, + casefile_get_value_cnt (info->casefile)); + write_case (wc_data); } + casereader_destroy (reader); } -/* Writes temp_case to the memory stream. */ +/* Destroys the source's internal data. */ static void -memory_stream_write (void) +storage_source_destroy (struct case_source *source) { - struct case_list *new_case = malloc (sizeof (struct case_list) - + ((compaction_nval - 1) - * sizeof (union value))); - - /* If we've got memory to spare then add it to the linked list. */ - if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL) - { - if (compaction_necessary) - compact_case (&new_case->c, temp_case); - else - memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval); - - /* Append case to linked list. */ - if (memory_sink_cases) - memory_sink_iter = memory_sink_iter->next = new_case; - else - memory_sink_iter = memory_sink_cases = new_case; - } - else - { - /* Out of memory. Write the active file to disk. */ - struct case_list *cur, *next; - - /* Notify the user. */ - if (!new_case) - msg (MW, _("Virtual memory exhausted. Paging active file " - "to disk.")); - else - msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) " - "overflowed. 
Paging active file to disk."), - MAX_WORKSPACE / 1024, memory_sink_max_cases, - compaction_nval * sizeof (union value)); - - free (new_case); - - /* Switch to a disk sink. */ - vfm_sink = &vfm_disk_stream; - vfm_sink->init (); - paging = 1; - - /* Terminate the list. */ - if (memory_sink_iter) - memory_sink_iter->next = NULL; - - /* Write the cases to disk and destroy them. We can't call - vfm->sink->write() because of compaction. */ - for (cur = memory_sink_cases; cur; cur = next) - { - next = cur->next; - if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1, - disk_sink_file) != 1) - { - msg (ME, _("An error occurred while attempting to " - "write to a temporary file created as the " - "active file, while paging to disk: %s."), - strerror (errno)); - err_failure (); - } - free (cur); - } - - /* Write the current case to disk. */ - vfm_sink->write (); - } + destroy_storage_stream_info (source->aux); } -/* If the data is stored in memory, causes it to be written to disk. - To be called only *between* procedure()s, not within them. */ -void -page_to_disk (void) +/* Storage source. */ +const struct case_source_class storage_source_class = + { + "storage", + storage_source_count, + storage_source_read, + storage_source_destroy, + }; + +struct casefile * +storage_source_get_casefile (struct case_source *source) { - if (vfm_source == &vfm_memory_stream) - { - /* Switch to a disk sink. */ - vfm_sink = &vfm_disk_stream; - vfm_sink->init (); - paging = 1; - - /* Write the cases to disk and destroy them. We can't call - vfm->sink->write() because of compaction. 
*/ - { - struct case_list *cur, *next; - - for (cur = memory_source_cases; cur; cur = next) - { - next = cur->next; - if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1, - disk_sink_file) != 1) - { - msg (ME, _("An error occurred while attempting to " - "write to a temporary file created as the " - "active file, while paging to disk: %s."), - strerror (errno)); - err_failure (); - } - free (cur); - } - } - - vfm_source = &vfm_disk_stream; - vfm_source->mode (); + struct storage_stream_info *info = source->aux; - vfm_sink = NULL; - } + assert (source->class == &storage_source_class); + return info->casefile; } -/* Switch the memory stream from sink to source mode. */ -static void -memory_stream_mode (void) +struct case_source * +storage_source_create (struct casefile *cf) { - /* Terminate the list. */ - if (memory_sink_iter) - memory_sink_iter->next = NULL; + struct storage_stream_info *info; - /* Sink --> source variables. */ - memory_source_cases = memory_sink_cases; - memory_sink_cases = NULL; -} + info = xmalloc (sizeof *info); + info->casefile = cf; -/* Destroy all memory source data. */ -static void -memory_stream_destroy_source (void) -{ - struct case_list *cur, *next; - - for (cur = memory_source_cases; cur; cur = next) - { - next = cur->next; - free (cur); - } - memory_source_cases = NULL; + return create_case_source (&storage_source_class, info); } + +/* Null sink. Used by a few procedures that keep track of output + themselves and would throw away anything that the sink + contained anyway. */ -/* Destroy all memory sink data. */ -static void -memory_stream_destroy_sink (void) -{ - struct case_list *cur, *next; - - for (cur = memory_sink_cases; cur; cur = next) - { - next = cur->next; - free (cur); - } - memory_sink_cases = NULL; -} - -/* Memory stream. 
*/ -struct case_stream vfm_memory_stream = +const struct case_sink_class null_sink_class = { - memory_stream_init, - memory_stream_read, - memory_stream_write, - memory_stream_mode, - memory_stream_destroy_source, - memory_stream_destroy_sink, - "memory", + "null", + NULL, + NULL, + NULL, + NULL, }; -#undef DEBUGGING -#include "debug-print.h" - -/* Add temp_case to the lag queue. */ -static void -lag_case (void) -{ - if (lag_count < n_lag) - lag_count++; - memcpy (lag_queue[lag_head], temp_case, sizeof (union value) * temp_dict->nval); - if (++lag_head >= n_lag) - lag_head = 0; -} - /* Returns a pointer to the lagged case from N_BEFORE cases before the current one, or NULL if there haven't been that many cases yet. */ struct ccase * lagged_case (int n_before) { - assert (n_before <= n_lag); - if (n_before > lag_count) - return NULL; - - { - int index = lag_head - n_before; - if (index < 0) - index += n_lag; - return lag_queue[index]; - } -} - -/* Transforms temp_case and writes it to the replacement active file - if advisable. Returns nonzero if more cases can be accepted, zero - otherwise. Do not call this function again after it has returned - zero once. */ -int -procedure_write_case (void) -{ - /* Index of current transformation. */ - int cur_trns; - - /* Return value: whether it's reasonable to write any more cases. */ - int more_cases = 1; - - debug_printf ((_("transform: "))); - - cur_trns = f_trns; - for (;;) + assert (n_before >= 1 && n_before <= n_lag); + if (n_before <= lag_count) { - /* Output the case if this is temp_trns. */ - if (cur_trns == temp_trns) - { - debug_printf (("REC")); - - if (n_lag) - lag_case (); - - vfm_sink_info.ncases++; - vfm_sink->write (); - - if (default_dict.N) - more_cases = vfm_sink_info.ncases < default_dict.N; - } - - /* Are we done? */ - if (cur_trns >= n_trns) - break; - - debug_printf (("$%d", cur_trns)); - - /* Decide which transformation should come next. 
*/ - { - int code; - - code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case); - switch (code) - { - case -1: - /* Next transformation. */ - cur_trns++; - break; - case -2: - /* Delete this case. */ - goto done; - default: - /* Go to that transformation. */ - cur_trns = code; - break; - } - } + int index = lag_head - n_before; + if (index < 0) + index += n_lag; + return &lag_queue[index]; } - - /* Call the beginning of group function. */ - if (!case_count && begin_func != NULL) - begin_func (); - - /* Call the procedure if there is one and FILTER and PROCESS IF - don't prohibit it. */ - if (proc_func != NULL - && !FILTERED - && (process_if_expr == NULL || - expr_evaluate (process_if_expr, temp_case, NULL) == 1.0)) - proc_func (temp_case); - - case_count++; - -done: - debug_putc ('\n', stdout); - - { - long *lp; - - /* This case is finished. Initialize the variables for the next case. */ - for (lp = reinit_sysmis.vec; *lp != -1;) - temp_case->data[*lp++].f = SYSMIS; - for (lp = reinit_blanks.vec; *lp != -1;) - memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING); - } - - /* Return previously determined value. */ - return more_cases; + else + return NULL; } - + /* Appends TRNS to t_trns[], the list of all transformations to be performed on data as it is read from the active file. */ void @@ -1094,187 +623,284 @@ cancel_transformations (void) free (t_trns[i]); } n_trns = f_trns = 0; - if (m_trns > 32) + free (t_trns); + t_trns=NULL; + m_trns = 0; +} + +/* Creates a case source with class CLASS and auxiliary data + AUX. */ +struct case_source * +create_case_source (const struct case_source_class *class, + void *aux) +{ + struct case_source *source = xmalloc (sizeof *source); + source->class = class; + source->aux = aux; + return source; +} + +/* Destroys case source SOURCE, after first calling the source's + class destroy function, if any. 
*/ +void +free_case_source (struct case_source *source) +{ + if (source != NULL) + { + if (source->class->destroy != NULL) + source->class->destroy (source); + free (source); + } +} + +/* Returns nonzero if a case source is "complex". */ +int +case_source_is_complex (const struct case_source *source) +{ + return source != NULL && (source->class == &input_program_source_class + || source->class == &file_type_source_class); +} + +/* Returns nonzero if CLASS is the class of SOURCE. */ +int +case_source_is_class (const struct case_source *source, + const struct case_source_class *class) +{ + return source != NULL && source->class == class; +} + +/* Creates a case sink to accept cases from the given DICT with + class CLASS and auxiliary data AUX. */ +struct case_sink * +create_case_sink (const struct case_sink_class *class, + const struct dictionary *dict, + void *aux) +{ + struct case_sink *sink = xmalloc (sizeof *sink); + sink->class = class; + sink->value_cnt = dict_get_compacted_value_cnt (dict); + sink->aux = aux; + return sink; +} + +/* Destroys case sink SINK. */ +void +free_case_sink (struct case_sink *sink) +{ + if (sink != NULL) + { + if (sink->class->destroy != NULL) + sink->class->destroy (sink); + free (sink); + } +} + +/* Represents auxiliary data for handling SPLIT FILE. */ +struct split_aux_data + { + size_t case_count; /* Number of cases so far. */ + struct ccase prev_case; /* Data in previous case. */ + + /* Functions to call... */ + void (*begin_func) (void *); /* ...before data. */ + int (*proc_func) (struct ccase *, void *); /* ...with data. */ + void (*end_func) (void *); /* ...after data. */ + void *func_aux; /* Auxiliary data. */ + }; + +static int equal_splits (const struct ccase *, const struct ccase *); +static int procedure_with_splits_callback (struct ccase *, void *); +static void dump_splits (struct ccase *); + +/* Like procedure(), but it automatically breaks the case stream + into SPLIT FILE break groups. 
Before each group of cases with + identical SPLIT FILE variable values, BEGIN_FUNC is called. + Then PROC_FUNC is called with each case in the group. + END_FUNC is called when the group is finished. FUNC_AUX is + passed to each of the functions as auxiliary data. + + If the active file is empty, none of BEGIN_FUNC, PROC_FUNC, + and END_FUNC will be called at all. + + If SPLIT FILE is not in effect, then there is one break group + (if the active file is nonempty), and BEGIN_FUNC and END_FUNC + will be called once. */ +void +procedure_with_splits (void (*begin_func) (void *aux), + int (*proc_func) (struct ccase *, void *aux), + void (*end_func) (void *aux), + void *func_aux) +{ + struct split_aux_data split_aux; + + split_aux.case_count = 0; + case_nullify (&split_aux.prev_case); + split_aux.begin_func = begin_func; + split_aux.proc_func = proc_func; + split_aux.end_func = end_func; + split_aux.func_aux = func_aux; + + open_active_file (); + internal_procedure (procedure_with_splits_callback, &split_aux); + if (split_aux.case_count > 0 && end_func != NULL) + end_func (func_aux); + close_active_file (); + + case_destroy (&split_aux.prev_case); +} + +/* procedure() callback used by procedure_with_splits(). */ +static int +procedure_with_splits_callback (struct ccase *c, void *split_aux_) +{ + struct split_aux_data *split_aux = split_aux_; + + /* Start a new series if needed. 
*/ + if (split_aux->case_count == 0 + || !equal_splits (c, &split_aux->prev_case)) { - free (t_trns); - m_trns = 0; + if (split_aux->case_count > 0 && split_aux->end_func != NULL) + split_aux->end_func (split_aux->func_aux); + + dump_splits (c); + case_destroy (&split_aux->prev_case); + case_clone (&split_aux->prev_case, c); + + if (split_aux->begin_func != NULL) + split_aux->begin_func (split_aux->func_aux); } + + split_aux->case_count++; + if (split_aux->proc_func != NULL) + return split_aux->proc_func (c, split_aux->func_aux); + else + return 1; +} + +/* Compares the SPLIT FILE variables in cases A and B and returns + nonzero only if they compare equal. */ +static int +equal_splits (const struct ccase *a, const struct ccase *b) +{ + return case_compare (a, b, + dict_get_split_vars (default_dict), + dict_get_split_cnt (default_dict)) == 0; } /* Dumps out the values of all the split variables for the case C. */ static void dump_splits (struct ccase *c) { - struct variable **iter; + struct variable *const *split; struct tab_table *t; + size_t split_cnt; int i; - t = tab_create (3, default_dict.n_splits + 1, 0); + split_cnt = dict_get_split_cnt (default_dict); + if (split_cnt == 0) + return; + + t = tab_create (3, split_cnt + 1, 0); tab_dim (t, tab_natural_dimensions); - tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, default_dict.n_splits); - tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, default_dict.n_splits); + tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt); + tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt); tab_text (t, 0, 0, TAB_NONE, _("Variable")); tab_text (t, 1, 0, TAB_LEFT, _("Value")); tab_text (t, 2, 0, TAB_LEFT, _("Label")); - for (iter = default_dict.splits, i = 0; *iter; iter++, i++) + split = dict_get_split_vars (default_dict); + for (i = 0; i < split_cnt; i++) { - struct variable *v = *iter; + struct variable *v = split[i]; char temp_buf[80]; - char *val_lab; + const char *val_lab; assert (v->type == NUMERIC || v->type == ALPHA); tab_text (t, 0, i + 1, 
TAB_LEFT | TAT_PRINTF, "%s", v->name); - { - union value val = c->data[v->fv]; - if (v->type == ALPHA) - val.c = c->data[v->fv].s; - data_out (temp_buf, &v->print, &val); - } + data_out (temp_buf, &v->print, case_data (c, v->fv)); temp_buf[v->print.w] = 0; tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf); - val_lab = get_val_lab (v, c->data[v->fv], 0); + val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv)); if (val_lab) tab_text (t, 2, i + 1, TAB_LEFT, val_lab); } tab_flags (t, SOMF_NO_TITLE); tab_submit (t); } + +/* Represents auxiliary data for handling SPLIT FILE in a + multipass procedure. */ +struct multipass_split_aux_data + { + struct ccase prev_case; /* Data in previous case. */ + struct casefile *casefile; /* Accumulates data for a split. */ -/* This procfunc is substituted for the user-supplied procfunc when - SPLIT FILE is active. This function forms a wrapper around that - procfunc by dividing the input into series. */ -static int -SPLIT_FILE_procfunc (struct ccase *c) -{ - static struct ccase *prev_case; - struct variable **iter; - - /* The first case always begins a new series. We also need to - preserve the values of the case for later comparison. */ - if (case_count == 0) - { - if (prev_case) - free (prev_case); - prev_case = xmalloc (vfm_sink_info.case_size); - memcpy (prev_case, c, vfm_sink_info.case_size); + /* Function to call with the accumulated data. */ + void (*split_func) (const struct casefile *, void *); + void *func_aux; /* Auxiliary data. */ + }; - dump_splits (c); - if (virt_begin_func != NULL) - virt_begin_func (); - - return virt_proc_func (c); - } +static int multipass_split_callback (struct ccase *c, void *aux_); +static void multipass_split_output (struct multipass_split_aux_data *); - /* Compare the value of each SPLIT FILE variable to the values on - the previous case. 
*/ - for (iter = default_dict.splits; *iter; iter++) - { - struct variable *v = *iter; - - switch (v->type) - { - case NUMERIC: - if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f)) - goto not_equal; - break; - case ALPHA: - if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width)) - goto not_equal; - break; - default: - assert (0); - } - } - return virt_proc_func (c); - -not_equal: - /* The values of the SPLIT FILE variable are different from the - values on the previous case. That means that it's time to begin - a new series. */ - if (end_func != NULL) - end_func (); - dump_splits (c); - if (virt_begin_func != NULL) - virt_begin_func (); - memcpy (prev_case, c, vfm_sink_info.case_size); - return virt_proc_func (c); -} - -/* Case compaction. */ - -/* Copies case SRC to case DEST, compacting it in the process. */ void -compact_case (struct ccase *dest, const struct ccase *src) +multipass_procedure_with_splits (void (*split_func) (const struct casefile *, + void *), + void *func_aux) { - int i; - int nval = 0; - - assert (compaction_necessary); + struct multipass_split_aux_data aux; - if (temporary == 2) - { - if (dest != compaction_case) - memcpy (dest, compaction_case, sizeof (union value) * compaction_nval); - return; - } + assert (split_func != NULL); - /* Copy all the variables except the scratch variables from SRC to - DEST. 
*/ - for (i = 0; i < default_dict.nvar; i++) - { - struct variable *v = default_dict.var[i]; - - if (v->name[0] == '#') - continue; + open_active_file (); - if (v->type == NUMERIC) - dest->data[nval++] = src->data[v->fv]; - else - { - int w = DIV_RND_UP (v->width, sizeof (union value)); - - memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value)); - nval += w; - } - } + case_nullify (&aux.prev_case); + aux.casefile = NULL; + aux.split_func = split_func; + aux.func_aux = func_aux; + + internal_procedure (multipass_split_callback, &aux); + if (aux.casefile != NULL) + multipass_split_output (&aux); + case_destroy (&aux.prev_case); + + close_active_file (); } -/* Reassigns `fv' for each variable. Deletes scratch variables. */ -static void -finish_compaction (void) +/* procedure() callback used by multipass_procedure_with_splits(). */ +static int +multipass_split_callback (struct ccase *c, void *aux_) { - int copy_index = 0; - int nval = 0; - int i; + struct multipass_split_aux_data *aux = aux_; - for (i = 0; i < default_dict.nvar; i++) + /* Start a new series if needed. */ + if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case)) { - struct variable *v = default_dict.var[i]; + /* Pass any cases to split_func. */ + if (aux->casefile != NULL) + multipass_split_output (aux); - if (v->name[0] == '#') - { - clear_variable (&default_dict, v); - free (v); - continue; - } + /* Start a new casefile. */ + aux->casefile = casefile_create (dict_get_next_value_idx (default_dict)); - v->fv = nval; - if (v->type == NUMERIC) - nval++; - else - nval += DIV_RND_UP (v->width, sizeof (union value)); - - default_dict.var[copy_index++] = v; - } - if (copy_index != default_dict.nvar) - { - default_dict.var = xrealloc (default_dict.var, - sizeof *default_dict.var * copy_index); - default_dict.nvar = copy_index; + /* Record split values. 
*/ + dump_splits (c); + case_destroy (&aux->prev_case); + case_clone (&aux->prev_case, c); } + + casefile_append (aux->casefile, c); + + return 1; } - +static void +multipass_split_output (struct multipass_split_aux_data *aux) +{ + assert (aux->casefile != NULL); + aux->split_func (aux->casefile, aux->func_aux); + casefile_destroy (aux->casefile); + aux->casefile = NULL; +}