X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fvfm.c;h=82af200015d6aa40dc19f4fb6215d8a7c0480201;hb=b70dc1a6b7b15967ceb111f80dff65c44f0fac57;hp=84efc3e0004ea4062735ba13dd24f123abed3e0e;hpb=6bc566408707e018674d1776d835c78368b6b5a3;p=pspp diff --git a/src/vfm.c b/src/vfm.c index 84efc3e000..82af200015 100644 --- a/src/vfm.c +++ b/src/vfm.c @@ -28,12 +28,12 @@ #include /* Required by SunOS4. */ #endif #include "alloc.h" -#include "approx.h" #include "do-ifP.h" #include "error.h" #include "expr.h" #include "misc.h" #include "random.h" +#include "settings.h" #include "som.h" #include "str.h" #include "tab.h" @@ -43,24 +43,28 @@ /* Virtual File Manager (vfm): - vfm is used to process data files. It uses the model that data is - read from one stream (the data source), then written to another - (the data sink). The data source is then deleted and the data sink - becomes the data source for the next procedure. */ + vfm is used to process data files. It uses the model that + data is read from one stream (the data source), processed, + then written to another (the data sink). The data source is + then deleted and the data sink becomes the data source for the + next procedure. */ #include "debug-print.h" -/* This is used to read from the active file. */ -struct case_stream *vfm_source; +/* Procedure execution data. */ +struct write_case_data + { + void (*beginfunc) (void *); + int (*procfunc) (struct ccase *, void *); + void (*endfunc) (void *); + void *aux; + }; -/* `value' indexes to initialize to particular values for certain cases. */ -struct long_vec reinit_sysmis; /* SYSMIS for every case. */ -struct long_vec reinit_blanks; /* Blanks for every case. */ -struct long_vec init_zero; /* Zero for first case only. */ -struct long_vec init_blanks; /* Blanks for first case only. */ +/* The current active file, from which cases are read. */ +struct case_source *vfm_source; -/* This is used to write to the replacement active file. */ -struct case_stream *vfm_sink; +/* The replacement active file, to which cases are written. */ +struct case_sink *vfm_sink; /* Information about the data source. */ struct stream_info vfm_source_info; @@ -68,17 +72,6 @@ struct stream_info vfm_source_info; /* Information about the data sink. */ struct stream_info vfm_sink_info; -/* Filter variable and `value' index. */ -static struct variable *filter_var; -static int filter_index; - -#define FILTERED \ - (filter_index != -1 \ - && (temp_case->data[filter_index].f == 0.0 \ - || temp_case->data[filter_index].f == SYSMIS \ - || is_num_user_missing (temp_case->data[filter_index].f, \ - filter_var))) - /* Nonzero if the case needs to have values deleted before being stored, zero otherwise. */ int compaction_necessary; @@ -91,21 +84,15 @@ int compaction_nval; `value's. */ struct ccase *compaction_case; -/* Within a session, when paging is turned on, it is never turned back - off. This policy might be too aggressive. */ -static int paging = 0; +/* Nonzero means that we've overflowed our allotted workspace. + After that happens once during a session, we always store the + active file on disk instead of in memory. (This policy may be + too aggressive.) */ +static int workspace_overflow = 0; /* Time at which vfm was last invoked. */ time_t last_vfm_invocation; -/* Functions called during procedure processing. */ -static int (*proc_func) (struct ccase *); /* Called for each case. */ -static int (*virt_proc_func) (struct ccase *); /* From SPLIT_FILE_procfunc. */ -static void (*begin_func) (void); /* Called at beginning of a series. */ -static void (*virt_begin_func) (void); /* Called by SPLIT_FILE_procfunc. */ -static void (*end_func) (void); /* Called after end of a series. */ -int (*write_case) (void); - /* Number of cases passed to proc_func(). */ static int case_count; @@ -116,53 +103,76 @@ static int lag_head; /* Index where next case will be added. */ static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */ static void open_active_file (void); -static void close_active_file (void); -static int SPLIT_FILE_procfunc (struct ccase *); +static void close_active_file (struct write_case_data *); +static int SPLIT_FILE_procfunc (struct ccase *, void *); static void finish_compaction (void); static void lag_case (void); -static int procedure_write_case (void); +static int procedure_write_case (struct write_case_data *); +static void clear_temp_case (void); +static int exclude_this_case (void); /* Public functions. */ -/* Reads all the cases from the active file, transforms them by the - active set of transformations, calls PROCFUNC with CURCASE set to - the case and CASENUM set to the case number, and writes them to a - new active file. +/* Reads all the cases from the active file, transforms them by + the active set of transformations, calls PROCFUNC with CURCASE + set to the case, and writes them to a new active file. Divides the active file into zero or more series of one or more cases each. BEGINFUNC is called before each series. ENDFUNC is - called after each series. */ + called after each series. + + Arbitrary user-specified data AUX is passed to BEGINFUNC, + PROCFUNC, and ENDFUNC as auxiliary data. */ void -procedure (void (*beginfunc) (void), - int (*procfunc) (struct ccase *curcase), - void (*endfunc) (void)) +procedure (void (*beginfunc) (void *), + int (*procfunc) (struct ccase *curcase, void *), + void (*endfunc) (void *), + void *aux) { - end_func = endfunc; - write_case = procedure_write_case; + static int recursive_call; - if (dict_get_split_cnt (default_dict) != 0 && procfunc != NULL) + struct write_case_data procedure_write_data; + struct write_case_data split_file_data; + + assert (++recursive_call == 1); + + if (dict_get_split_cnt (default_dict) == 0) { - virt_proc_func = procfunc; - proc_func = SPLIT_FILE_procfunc; - - virt_begin_func = beginfunc; - begin_func = NULL; - } else { - begin_func = beginfunc; - proc_func = procfunc; + /* Normally we just use the data passed by the user. */ + procedure_write_data.beginfunc = beginfunc; + procedure_write_data.procfunc = procfunc; + procedure_write_data.endfunc = endfunc; + procedure_write_data.aux = aux; + } + else + { + /* Under SPLIT FILE, we add a layer of indirection. */ + procedure_write_data.beginfunc = NULL; + procedure_write_data.procfunc = SPLIT_FILE_procfunc; + procedure_write_data.endfunc = endfunc; + procedure_write_data.aux = &split_file_data; + + split_file_data.beginfunc = beginfunc; + split_file_data.procfunc = procfunc; + split_file_data.endfunc = endfunc; + split_file_data.aux = aux; } last_vfm_invocation = time (NULL); open_active_file (); - vfm_source->read (); - close_active_file (); + if (vfm_source != NULL) + vfm_source->class->read (vfm_source, + procedure_write_case, &procedure_write_data); + close_active_file (&procedure_write_data); + + assert (--recursive_call == 0); } /* Active file processing support. Subtly different semantics from procedure(). */ -static int process_active_file_write_case (void); +static int process_active_file_write_case (struct write_case_data *data); /* The casefunc might want us to stop calling it. */ static int not_canceled; @@ -175,28 +185,35 @@ static int not_canceled; process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */ void -process_active_file (void (*beginfunc) (void), - int (*casefunc) (struct ccase *curcase), - void (*endfunc) (void)) +process_active_file (void (*beginfunc) (void *), + int (*casefunc) (struct ccase *curcase, void *), + void (*endfunc) (void *), + void *aux) { - proc_func = casefunc; - write_case = process_active_file_write_case; + struct write_case_data process_active_write_data; + + process_active_write_data.beginfunc = beginfunc; + process_active_write_data.procfunc = casefunc; + process_active_write_data.endfunc = endfunc; + process_active_write_data.aux = aux; + not_canceled = 1; open_active_file (); - beginfunc (); + beginfunc (aux); /* There doesn't necessarily need to be an active file. */ - if (vfm_source) - vfm_source->read (); + if (vfm_source != NULL) + vfm_source->class->read (vfm_source, process_active_file_write_case, + &process_active_write_data); - endfunc (); - close_active_file (); + endfunc (aux); + close_active_file (&process_active_write_data); } /* Pass the current case to casefunc. */ static int -process_active_file_write_case (void) +process_active_file_write_case (struct write_case_data *data) { /* Index of current transformation. */ int cur_trns; @@ -226,25 +243,14 @@ process_active_file_write_case (void) lag_case (); /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */ - if (not_canceled - && !FILTERED - && (process_if_expr == NULL || - expr_evaluate (process_if_expr, temp_case, NULL) == 1.0)) - not_canceled = proc_func (temp_case); + if (not_canceled && !exclude_this_case ()) + not_canceled = data->procfunc (temp_case, data->aux); case_count++; done: - { - long *lp; + clear_temp_case (); - /* This case is finished. Initialize the variables for the next case. */ - for (lp = reinit_sysmis.vec; *lp != -1;) - temp_case->data[*lp++].f = SYSMIS; - for (lp = reinit_blanks.vec; *lp != -1;) - memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING); - } - return 1; } @@ -253,7 +259,7 @@ void process_active_file_output_case (void) { vfm_sink_info.ncases++; - vfm_sink->write (); + vfm_sink->class->write (vfm_sink, temp_case); } /* Opening the active file. */ @@ -280,26 +286,27 @@ prepare_for_writing (void) file--it's just a waste of time and space. */ vfm_sink_info.ncases = 0; - vfm_sink_info.nval = dict_get_value_cnt (default_dict); - vfm_sink_info.case_size = (sizeof (struct ccase) - + ((dict_get_value_cnt (default_dict) - 1) - * sizeof (union value))); + vfm_sink_info.nval = dict_get_next_value_idx (default_dict); + vfm_sink_info.case_size = dict_get_case_size (default_dict); if (vfm_sink == NULL) { - if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE - && !paging) + if (vfm_sink_info.case_size * vfm_source_info.ncases > set_max_workspace + && !workspace_overflow) { msg (MW, _("Workspace overflow predicted. Max workspace is " "currently set to %d KB (%d cases at %d bytes each). " - "Paging active file to disk."), - MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size, + "Writing active file to disk."), + set_max_workspace / 1024, set_max_workspace / vfm_sink_info.case_size, vfm_sink_info.case_size); - paging = 1; + workspace_overflow = 1; } - - vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream; + + if (workspace_overflow) + vfm_sink = create_case_sink (&disk_sink_class, NULL); + else + vfm_sink = create_case_sink (&memory_sink_class, NULL); } } @@ -323,17 +330,20 @@ arrange_compaction (void) count_values += v->nv; } } - assert (temporary == 2 || count_values <= dict_get_value_cnt (temp_dict)); + assert (temporary == 2 + || count_values <= dict_get_next_value_idx (temp_dict)); } /* Compaction is only necessary if the number of `value's to output differs from the number already present. */ compaction_nval = count_values; - compaction_necessary = (temporary == 2 - || count_values != dict_get_value_cnt (temp_dict)); + if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict)) + compaction_necessary = 1; + else + compaction_necessary = 0; - if (vfm_sink->init) - vfm_sink->init (); + if (vfm_sink->class->open != NULL) + vfm_sink->class->open (vfm_sink); } /* Prepares the temporary case and compaction case. */ @@ -366,62 +376,28 @@ index_to_varname (int ccase_index) } #endif -/* Initializes temp_case from the vectors that say which `value's need - to be initialized just once, and which ones need to be +/* Initializes temp_case from the vectors that say which `value's + need to be initialized just once, and which ones need to be re-initialized before every case. */ static void vector_initialization (void) { - int i; - long *lp; - - /* Just once. */ - for (i = 0; i < init_zero.n; i++) - temp_case->data[init_zero.vec[i]].f = 0.0; - for (i = 0; i < init_blanks.n; i++) - memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING); - - /* These vectors need to be repeatedly accessed, so we add a - sentinel to (hopefully) improve speed. */ - vec_insert (&reinit_sysmis, -1); - vec_insert (&reinit_blanks, -1); - - for (lp = reinit_sysmis.vec; *lp != -1;) - temp_case->data[*lp++].f = SYSMIS; - for (lp = reinit_blanks.vec; *lp != -1;) - memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING); - -#if DEBUGGING - printf ("vfm: init_zero="); - for (i = 0; i < init_zero.n; i++) - printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i])); - printf (" init_blanks="); - for (i = 0; i < init_blanks.n; i++) - printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i])); - printf (" reinit_sysmis="); - for (lp = reinit_sysmis.vec; *lp != -1; lp++) - printf ("%s%s", lp != reinit_sysmis.vec ? "," : "", - index_to_varname (*lp)); - printf (" reinit_blanks="); - for (lp = reinit_blanks.vec; *lp != -1; lp++) - printf ("%s%s", lp != reinit_blanks.vec ? "," : "", - index_to_varname (*lp)); - printf ("\n"); -#endif -} - -/* Sets filter_index to an appropriate value. */ -static void -setup_filter (void) -{ - filter_var = dict_get_filter (default_dict); + size_t var_cnt = dict_get_var_cnt (default_dict); + size_t i; - if (filter_var != NULL) + for (i = 0; i < var_cnt; i++) { - assert (filter_var->type == NUMERIC); - filter_index = filter_var->index; - } else { - filter_index = -1; + struct variable *v = dict_get_var (default_dict, i); + + if (v->type == NUMERIC) + { + if (v->reinit) + temp_case->data[v->fv].f = 0.0; + else + temp_case->data[v->fv].f = SYSMIS; + } + else + memset (temp_case->data[v->fv].s, ' ', v->width); } } @@ -438,8 +414,7 @@ setup_lag (void) lag_head = 0; lag_queue = xmalloc (n_lag * sizeof *lag_queue); for (i = 0; i < n_lag; i++) - lag_queue[i] = xmalloc (dict_get_value_cnt (temp_dict) - * sizeof **lag_queue); + lag_queue[i] = xmalloc (dict_get_case_size (temp_dict)); } /* There is a lot of potential confusion in the vfm and related @@ -500,7 +475,6 @@ open_active_file (void) make_temp_case (); vector_initialization (); discard_ctl_stack (); - setup_filter (); setup_lag (); /* Debug output. */ @@ -515,11 +489,11 @@ open_active_file (void) /* Closes the active file. */ static void -close_active_file (void) +close_active_file (struct write_case_data *data) { /* Close the current case group. */ - if (case_count && end_func != NULL) - end_func (); + if (case_count && data->endfunc != NULL) + data->endfunc (data->aux); /* Stop lagging (catch up?). */ if (n_lag) @@ -546,18 +520,21 @@ close_active_file (void) finish_compaction (); /* Old data sink --> New data source. */ - if (vfm_source && vfm_source->destroy_source) - vfm_source->destroy_source (); - - vfm_source = vfm_sink; + if (vfm_source != NULL) + { + if (vfm_source->class->destroy != NULL) + vfm_source->class->destroy (vfm_source); + free (vfm_source); + } + + vfm_source = vfm_sink->class->make_source (vfm_sink); vfm_source_info.ncases = vfm_sink_info.ncases; vfm_source_info.nval = compaction_nval; vfm_source_info.case_size = (sizeof (struct ccase) + (compaction_nval - 1) * sizeof (union value)); - if (vfm_source->mode) - vfm_source->mode (); /* Old data sink is gone now. */ + free (vfm_sink); vfm_sink = NULL; /* Cancel TEMPORARY. */ @@ -575,18 +552,12 @@ close_active_file (void) process_if_expr = NULL; /* Cancel FILTER if temporary. */ - if (filter_var != NULL && !FILTER_before_TEMPORARY) + if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY) dict_set_filter (default_dict, NULL); /* Cancel transformations. */ cancel_transformations (); - /* Clear value-initialization vectors. */ - vec_clear (&init_zero); - vec_clear (&init_blanks); - vec_clear (&reinit_sysmis); - vec_clear (&reinit_blanks); - /* Turn off case limiter. */ dict_set_case_limit (default_dict, 0); @@ -598,16 +569,12 @@ close_active_file (void) /* Disk case stream. */ -/* Associated files. */ -FILE *disk_source_file; -FILE *disk_sink_file; - /* Initializes the disk sink. */ static void -disk_stream_init (void) +disk_sink_create (struct case_sink *sink) { - disk_sink_file = tmpfile (); - if (!disk_sink_file) + sink->aux = tmpfile (); + if (!sink->aux) { msg (ME, _("An error occurred attempting to create a temporary " "file for use as the active file: %s."), @@ -616,44 +583,21 @@ disk_stream_init (void) } } -/* Reads all cases from the disk source and passes them one by one to - write_case(). */ -static void -disk_stream_read (void) -{ - int i; - - for (i = 0; i < vfm_source_info.ncases; i++) - { - if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file)) - { - msg (ME, _("An error occurred while attempting to read from " - "a temporary file created for the active file: %s."), - strerror (errno)); - err_failure (); - return; - } - - if (!write_case ()) - return; - } -} - /* Writes temp_case to the disk sink. */ static void -disk_stream_write (void) +disk_sink_write (struct case_sink *sink, struct ccase *c) { + FILE *file = sink->aux; union value *src_case; if (compaction_necessary) { - compact_case (compaction_case, temp_case); - src_case = (union value *) compaction_case; + compact_case (compaction_case, c); + src_case = compaction_case->data; } - else src_case = (union value *) temp_case; + else src_case = c->data; - if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, - disk_sink_file) != 1) + if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, file) != 1) { msg (ME, _("An error occurred while attempting to write to a " "temporary file used as the active file: %s."), @@ -662,12 +606,25 @@ disk_stream_write (void) } } -/* Switches the stream from a sink to a source. */ +/* Destroys the sink's internal data. */ static void -disk_stream_mode (void) +disk_sink_destroy (struct case_sink *sink) +{ + FILE *file = sink->aux; + if (file != NULL) + fclose (file); +} + +/* Closes and destroys the sink and returns a disk source to read + back the written data. */ +static struct case_source * +disk_sink_make_source (struct case_sink *sink) { - /* Rewind the sink. */ - if (fseek (disk_sink_file, 0, SEEK_SET) != 0) + FILE *file = sink->aux; + + /* Rewind the file. */ + assert (file != NULL); + if (fseek (file, 0, SEEK_SET) != 0) { msg (ME, _("An error occurred while attempting to rewind a " "temporary file used as the active file: %s."), @@ -675,107 +632,118 @@ disk_stream_mode (void) err_failure (); } - /* Sink --> source variables. */ - disk_source_file = disk_sink_file; + return create_case_source (&disk_source_class, file); } -/* Destroys the source's internal data. */ +/* Disk sink. */ +const struct case_sink_class disk_sink_class = + { + "disk", + disk_sink_create, + disk_sink_write, + disk_sink_destroy, + disk_sink_make_source, + }; + +/* Disk source. */ + +/* Reads all cases from the disk source and passes them one by one to + write_case(). */ static void -disk_stream_destroy_source (void) +disk_source_read (struct case_source *source, + write_case_func *write_case, write_case_data wc_data) { - if (disk_source_file) + FILE *file = source->aux; + int i; + + for (i = 0; i < vfm_source_info.ncases; i++) { - fclose (disk_source_file); - disk_source_file = NULL; + if (!fread (temp_case, vfm_source_info.case_size, 1, file)) + { + msg (ME, _("An error occurred while attempting to read from " + "a temporary file created for the active file: %s."), + strerror (errno)); + err_failure (); + return; + } + + if (!write_case (wc_data)) + return; } } -/* Destroys the sink's internal data. */ +/* Destroys the source's internal data. */ static void -disk_stream_destroy_sink (void) +disk_source_destroy (struct case_source *source) { - if (disk_sink_file) - { - fclose (disk_sink_file); - disk_sink_file = NULL; - } + FILE *file = source->aux; + if (file != NULL) + fclose (file); } -/* Disk stream. */ -struct case_stream vfm_disk_stream = +/* Disk source. */ +const struct case_source_class disk_source_class = { - disk_stream_init, - disk_stream_read, - disk_stream_write, - disk_stream_mode, - disk_stream_destroy_source, - disk_stream_destroy_sink, "disk", + disk_source_read, + disk_source_destroy, }; /* Memory case stream. */ -/* List of cases stored in the stream. */ -struct case_list *memory_source_cases; -struct case_list *memory_sink_cases; - -/* Current case. */ -struct case_list *memory_sink_iter; +/* Memory sink data. */ +struct memory_sink_info + { + int max_cases; /* Maximum cases before switching to disk. */ + struct case_list *head; /* First case in list. */ + struct case_list *tail; /* Last case in list. */ + }; -/* Maximum number of cases. */ -int memory_sink_max_cases; +/* Memory source data. */ +struct memory_source_info + { + struct case_list *cases; /* List of cases. */ + }; -/* Initializes the memory stream variables for writing. */ static void -memory_stream_init (void) +memory_sink_create (struct case_sink *sink) { - memory_sink_cases = NULL; - memory_sink_iter = NULL; + struct memory_sink_info *info; - assert (compaction_nval); - memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval); -} + sink->aux = info = xmalloc (sizeof *info); -/* Reads the case stream from memory and passes it to write_case(). */ -static void -memory_stream_read (void) -{ - while (memory_source_cases != NULL) - { - memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size); - - { - struct case_list *current = memory_source_cases; - memory_source_cases = memory_source_cases->next; - free (current); - } - - if (!write_case ()) - return; - } + assert (compaction_nval > 0); + info->max_cases = set_max_workspace / (sizeof (union value) * compaction_nval); + info->head = info->tail = NULL; } -/* Writes temp_case to the memory stream. */ static void -memory_stream_write (void) +memory_sink_write (struct case_sink *sink, struct ccase *c) { - struct case_list *new_case = malloc (sizeof (struct case_list) - + ((compaction_nval - 1) - * sizeof (union value))); + struct memory_sink_info *info = sink->aux; + size_t case_size; + struct case_list *new_case; + + case_size = sizeof (struct case_list) + + ((compaction_nval - 1) * sizeof (union value)); + new_case = malloc (case_size); /* If we've got memory to spare then add it to the linked list. */ - if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL) + if (vfm_sink_info.ncases <= info->max_cases && new_case != NULL) { - if (compaction_necessary) - compact_case (&new_case->c, temp_case); + /* Append case to linked list. */ + new_case->next = NULL; + if (info->head != NULL) + info->tail->next = new_case; else - memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval); + info->head = new_case; + info->tail = new_case; - /* Append case to linked list. */ - if (memory_sink_cases) - memory_sink_iter = memory_sink_iter->next = new_case; + /* Copy data into case. */ + if (compaction_necessary) + compact_case (&new_case->c, c); else - memory_sink_iter = memory_sink_cases = new_case; + memcpy (&new_case->c, c, sizeof (union value) * compaction_nval); } else { @@ -784,36 +752,32 @@ memory_stream_write (void) /* Notify the user. */ if (!new_case) - msg (MW, _("Virtual memory exhausted. Paging active file " + msg (MW, _("Virtual memory exhausted. Writing active file " "to disk.")); else msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) " - "overflowed. Paging active file to disk."), - MAX_WORKSPACE / 1024, memory_sink_max_cases, + "overflowed. Writing active file to disk."), + set_max_workspace / 1024, info->max_cases, compaction_nval * sizeof (union value)); free (new_case); /* Switch to a disk sink. */ - vfm_sink = &vfm_disk_stream; - vfm_sink->init (); - paging = 1; - - /* Terminate the list. */ - if (memory_sink_iter) - memory_sink_iter->next = NULL; + vfm_sink = create_case_sink (&disk_sink_class, NULL); + vfm_sink->class->open (vfm_sink); + workspace_overflow = 1; /* Write the cases to disk and destroy them. We can't call vfm->sink->write() because of compaction. */ - for (cur = memory_sink_cases; cur; cur = next) + for (cur = info->head; cur; cur = next) { next = cur->next; if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1, - disk_sink_file) != 1) + vfm_sink->aux) != 1) { msg (ME, _("An error occurred while attempting to " "write to a temporary file created as the " - "active file, while paging to disk: %s."), + "active file: %s."), strerror (errno)); err_failure (); } @@ -821,36 +785,38 @@ memory_stream_write (void) } /* Write the current case to disk. */ - vfm_sink->write (); + vfm_sink->class->write (vfm_sink, c); } } /* If the data is stored in memory, causes it to be written to disk. To be called only *between* procedure()s, not within them. */ void -page_to_disk (void) +write_active_file_to_disk (void) { - if (vfm_source == &vfm_memory_stream) + if (case_source_is_class (vfm_source, &memory_source_class)) { + struct memory_source_info *info = vfm_source->aux; + /* Switch to a disk sink. */ - vfm_sink = &vfm_disk_stream; - vfm_sink->init (); - paging = 1; + vfm_sink = create_case_sink (&disk_sink_class, NULL); + vfm_sink->class->open (vfm_sink); + workspace_overflow = 1; /* Write the cases to disk and destroy them. We can't call vfm->sink->write() because of compaction. */ { struct case_list *cur, *next; - for (cur = memory_source_cases; cur; cur = next) + for (cur = info->cases; cur; cur = next) { next = cur->next; if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1, - disk_sink_file) != 1) + vfm_sink->aux) != 1) { msg (ME, _("An error occurred while attempting to " "write to a temporary file created as the " - "active file, while paging to disk: %s."), + "active file: %s."), strerror (errno)); err_failure (); } @@ -858,64 +824,107 @@ page_to_disk (void) } } - vfm_source = &vfm_disk_stream; - vfm_source->mode (); - + vfm_source = vfm_sink->class->make_source (vfm_sink); vfm_sink = NULL; } } -/* Switch the memory stream from sink to source mode. */ +/* Destroy all memory sink data. */ static void -memory_stream_mode (void) +memory_sink_destroy (struct case_sink *sink) +{ + struct memory_sink_info *info = sink->aux; + struct case_list *cur, *next; + + for (cur = info->head; cur; cur = next) + { + next = cur->next; + free (cur); + } + free (info); +} + +/* Switch the memory stream from sink to source mode. */ +static struct case_source * +memory_sink_make_source (struct case_sink *sink) { - /* Terminate the list. */ - if (memory_sink_iter) - memory_sink_iter->next = NULL; + struct memory_sink_info *sink_info = sink->aux; + struct memory_source_info *source_info; - /* Sink --> source variables. */ - memory_source_cases = memory_sink_cases; - memory_sink_cases = NULL; + source_info = xmalloc (sizeof *source_info); + source_info->cases = sink_info->head; + + free (sink_info); + + return create_case_source (&memory_source_class, source_info); } -/* Destroy all memory source data. */ +const struct case_sink_class memory_sink_class = + { + "memory", + memory_sink_create, + memory_sink_write, + memory_sink_destroy, + memory_sink_make_source, + }; + +/* Reads the case stream from memory and passes it to write_case(). */ static void -memory_stream_destroy_source (void) +memory_source_read (struct case_source *source, + write_case_func *write_case, write_case_data wc_data) { - struct case_list *cur, *next; - - for (cur = memory_source_cases; cur; cur = next) + struct memory_source_info *info = source->aux; + + while (info->cases != NULL) { - next = cur->next; - free (cur); + struct case_list *iter = info->cases; + info->cases = iter->next; + memcpy (temp_case, &iter->c, vfm_source_info.case_size); + free (iter); + + if (!write_case (wc_data)) + return; } - memory_source_cases = NULL; } -/* Destroy all memory sink data. */ +/* Destroy all memory source data. */ static void -memory_stream_destroy_sink (void) +memory_source_destroy (struct case_source *source) { + struct memory_source_info *info = source->aux; struct case_list *cur, *next; - for (cur = memory_sink_cases; cur; cur = next) + for (cur = info->cases; cur; cur = next) { next = cur->next; free (cur); } - memory_sink_cases = NULL; + free (info); } - + +struct case_list * +memory_source_get_cases (const struct case_source *source) +{ + struct memory_source_info *info = source->aux; + + return info->cases; +} + +void +memory_source_set_cases (const struct case_source *source, + struct case_list *cases) +{ + struct memory_source_info *info = source->aux; + + info->cases = cases; +} + /* Memory stream. */ -struct case_stream vfm_memory_stream = +const struct case_source_class memory_source_class = { - memory_stream_init, - memory_stream_read, - memory_stream_write, - memory_stream_mode, - memory_stream_destroy_source, - memory_stream_destroy_sink, "memory", + memory_source_read, + memory_source_destroy, }; #include "debug-print.h" @@ -927,7 +936,7 @@ lag_case (void) if (lag_count < n_lag) lag_count++; memcpy (lag_queue[lag_head], temp_case, - sizeof (union value) * dict_get_value_cnt (temp_dict)); + dict_get_case_size (temp_dict)); if (++lag_head >= n_lag) lag_head = 0; } @@ -954,7 +963,7 @@ lagged_case (int n_before) otherwise. Do not call this function again after it has returned zero once. */ int -procedure_write_case (void) +procedure_write_case (write_case_data wc_data) { /* Index of current transformation. */ int cur_trns; @@ -976,7 +985,7 @@ procedure_write_case (void) lag_case (); vfm_sink_info.ncases++; - vfm_sink->write (); + vfm_sink->class->write (vfm_sink, temp_case); if (dict_get_case_limit (default_dict)) more_cases = (vfm_sink_info.ncases @@ -1012,36 +1021,70 @@ procedure_write_case (void) } /* Call the beginning of group function. */ - if (!case_count && begin_func != NULL) - begin_func (); + if (!case_count && wc_data->beginfunc != NULL) + wc_data->beginfunc (wc_data->aux); /* Call the procedure if there is one and FILTER and PROCESS IF don't prohibit it. */ - if (proc_func != NULL - && !FILTERED - && (process_if_expr == NULL || - expr_evaluate (process_if_expr, temp_case, NULL) == 1.0)) - proc_func (temp_case); + if (wc_data->procfunc != NULL && !exclude_this_case ()) + wc_data->procfunc (temp_case, wc_data->aux); case_count++; done: debug_putc ('\n', stdout); - - { - long *lp; - /* This case is finished. Initialize the variables for the next case. */ - for (lp = reinit_sysmis.vec; *lp != -1;) - temp_case->data[*lp++].f = SYSMIS; - for (lp = reinit_blanks.vec; *lp != -1;) - memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING); - } + clear_temp_case (); /* Return previously determined value. */ return more_cases; } +/* Clears the variables in the temporary case that need to be + cleared between processing cases. */ +static void +clear_temp_case (void) +{ + /* FIXME? This is linear in the number of variables, but + doesn't need to be, so it's an easy optimization target. */ + size_t var_cnt = dict_get_var_cnt (default_dict); + size_t i; + + for (i = 0; i < var_cnt; i++) + { + struct variable *v = dict_get_var (default_dict, i); + if (v->init && v->reinit) + { + if (v->type == NUMERIC) + temp_case->data[v->fv].f = SYSMIS; + else + memset (temp_case->data[v->fv].s, ' ', v->width); + } + } +} + +/* Returns nonzero if this case should be exclude as specified on + FILTER or PROCESS IF, otherwise zero. */ +static int +exclude_this_case (void) +{ + /* FILTER. */ + struct variable *filter_var = dict_get_filter (default_dict); + if (filter_var != NULL) + { + double f = temp_case->data[filter_var->fv].f; + if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var)) + return 1; + } + + /* PROCESS IF. */ + if (process_if_expr != NULL + && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0) + return 1; + + return 0; +} + /* Appends TRNS to t_trns[], the list of all transformations to be performed on data as it is read from the active file. */ void @@ -1103,12 +1146,7 @@ dump_splits (struct ccase *c) assert (v->type == NUMERIC || v->type == ALPHA); tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name); - { - union value val = c->data[v->fv]; - if (v->type == ALPHA) - val.c = c->data[v->fv].s; - data_out (temp_buf, &v->print, &val); - } + data_out (temp_buf, &v->print, &c->data[v->fv]); temp_buf[v->print.w] = 0; tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf); @@ -1125,8 +1163,9 @@ dump_splits (struct ccase *c) SPLIT FILE is active. This function forms a wrapper around that procfunc by dividing the input into series. */ static int -SPLIT_FILE_procfunc (struct ccase *c) +SPLIT_FILE_procfunc (struct ccase *c, void *data_) { + struct write_case_data *data = data_; static struct ccase *prev_case; struct variable *const *split; size_t split_cnt; @@ -1142,10 +1181,10 @@ SPLIT_FILE_procfunc (struct ccase *c) memcpy (prev_case, c, vfm_sink_info.case_size); dump_splits (c); - if (virt_begin_func != NULL) - virt_begin_func (); + if (data->beginfunc != NULL) + data->beginfunc (data->aux); - return virt_proc_func (c); + return data->procfunc (c, data->aux); } /* Compare the value of each SPLIT FILE variable to the values on @@ -1159,7 +1198,7 @@ SPLIT_FILE_procfunc (struct ccase *c) switch (v->type) { case NUMERIC: - if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f)) + if (c->data[v->fv].f != prev_case->data[v->fv].f) goto not_equal; break; case ALPHA: @@ -1170,19 +1209,19 @@ SPLIT_FILE_procfunc (struct ccase *c) assert (0); } } - return virt_proc_func (c); + return data->procfunc (c, data->aux); not_equal: /* The values of the SPLIT FILE variable are different from the values on the previous case. That means that it's time to begin a new series. */ - if (end_func != NULL) - end_func (); + if (data->endfunc != NULL) + data->endfunc (data->aux); dump_splits (c); - if (virt_begin_func != NULL) - virt_begin_func (); + if (data->beginfunc != NULL) + data->beginfunc (data->aux); memcpy (prev_case, c, vfm_sink_info.case_size); - return virt_proc_func (c); + return data->procfunc (c, data->aux); } /* Case compaction. */ @@ -1244,4 +1283,36 @@ finish_compaction (void) dict_compact_values (default_dict); } - +struct case_source * +create_case_source (const struct case_source_class *class, void *aux) +{ + struct case_source *source = xmalloc (sizeof *source); + source->class = class; + source->aux = aux; + return source; +} + +int +case_source_is_complex (const struct case_source *source) +{ + return source != NULL && (source->class == &input_program_source_class + || source->class == &file_type_source_class); +} + +int +case_source_is_class (const struct case_source *source, + const struct case_source_class *class) +{ + return source != NULL && source->class == class; + +} + +struct case_sink * +create_case_sink (const struct case_sink_class *class, void *aux) +{ + struct case_sink *sink = xmalloc (sizeof *sink); + sink->class = class; + sink->aux = aux; + return sink; +} +