X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;ds=sidebyside;f=src%2Fvfm.c;h=82af200015d6aa40dc19f4fb6215d8a7c0480201;hb=b70dc1a6b7b15967ceb111f80dff65c44f0fac57;hp=45d9c28e8b436284bc795bd033b4479cf8a62433;hpb=18febf84744dc7ab4248542c2f88d91c01ef3fa1;p=pspp diff --git a/src/vfm.c b/src/vfm.c index 45d9c28e8b..82af200015 100644 --- a/src/vfm.c +++ b/src/vfm.c @@ -28,12 +28,12 @@ #include /* Required by SunOS4. */ #endif #include "alloc.h" -#include "approx.h" #include "do-ifP.h" #include "error.h" #include "expr.h" #include "misc.h" #include "random.h" +#include "settings.h" #include "som.h" #include "str.h" #include "tab.h" @@ -43,10 +43,11 @@ /* Virtual File Manager (vfm): - vfm is used to process data files. It uses the model that data is - read from one stream (the data source), then written to another - (the data sink). The data source is then deleted and the data sink - becomes the data source for the next procedure. */ + vfm is used to process data files. It uses the model that + data is read from one stream (the data source), processed, + then written to another (the data sink). The data source is + then deleted and the data sink becomes the data source for the + next procedure. */ #include "debug-print.h" @@ -59,11 +60,11 @@ struct write_case_data void *aux; }; -/* This is used to read from the active file. */ -struct case_stream *vfm_source; +/* The current active file, from which cases are read. */ +struct case_source *vfm_source; -/* This is used to write to the replacement active file. */ -struct case_stream *vfm_sink; +/* The replacement active file, to which cases are written. */ +struct case_sink *vfm_sink; /* Information about the data source. */ struct stream_info vfm_source_info; @@ -71,17 +72,6 @@ struct stream_info vfm_source_info; /* Information about the data sink. */ struct stream_info vfm_sink_info; -/* Filter variable and `value' index. */ -static struct variable *filter_var; -static int filter_index; - -#define FILTERED \ - (filter_index != -1 \ - && (temp_case->data[filter_index].f == 0.0 \ - || temp_case->data[filter_index].f == SYSMIS \ - || is_num_user_missing (temp_case->data[filter_index].f, \ - filter_var))) - /* Nonzero if the case needs to have values deleted before being stored, zero otherwise. */ int compaction_necessary; @@ -94,9 +84,11 @@ int compaction_nval; `value's. */ struct ccase *compaction_case; -/* Within a session, when paging is turned on, it is never turned back - off. This policy might be too aggressive. */ -static int paging = 0; +/* Nonzero means that we've overflowed our allotted workspace. + After that happens once during a session, we always store the + active file on disk instead of in memory. (This policy may be + too aggressive.) */ +static int workspace_overflow = 0; /* Time at which vfm was last invoked. */ time_t last_vfm_invocation; @@ -117,12 +109,13 @@ static void finish_compaction (void); static void lag_case (void); static int procedure_write_case (struct write_case_data *); static void clear_temp_case (void); +static int exclude_this_case (void); /* Public functions. */ /* Reads all the cases from the active file, transforms them by the active set of transformations, calls PROCFUNC with CURCASE - set to the case , and writes them to a new active file. + set to the case, and writes them to a new active file. Divides the active file into zero or more series of one or more cases each. BEGINFUNC is called before each series. ENDFUNC is @@ -136,9 +129,13 @@ procedure (void (*beginfunc) (void *), void (*endfunc) (void *), void *aux) { + static int recursive_call; + struct write_case_data procedure_write_data; struct write_case_data split_file_data; + assert (++recursive_call == 1); + if (dict_get_split_cnt (default_dict) == 0) { /* Normally we just use the data passed by the user. */ @@ -164,8 +161,12 @@ procedure (void (*beginfunc) (void *), last_vfm_invocation = time (NULL); open_active_file (); - vfm_source->read (procedure_write_case, &procedure_write_data); + if (vfm_source != NULL) + vfm_source->class->read (vfm_source, + procedure_write_case, &procedure_write_data); close_active_file (&procedure_write_data); + + assert (--recursive_call == 0); } /* Active file processing support. Subtly different semantics from @@ -202,9 +203,9 @@ process_active_file (void (*beginfunc) (void *), beginfunc (aux); /* There doesn't necessarily need to be an active file. */ - if (vfm_source) - vfm_source->read (process_active_file_write_case, - &process_active_write_data); + if (vfm_source != NULL) + vfm_source->class->read (vfm_source, process_active_file_write_case, + &process_active_write_data); endfunc (aux); close_active_file (&process_active_write_data); @@ -242,10 +243,7 @@ process_active_file_write_case (struct write_case_data *data) lag_case (); /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */ - if (not_canceled - && !FILTERED - && (process_if_expr == NULL || - expr_evaluate (process_if_expr, temp_case, NULL) == 1.0)) + if (not_canceled && !exclude_this_case ()) not_canceled = data->procfunc (temp_case, data->aux); case_count++; @@ -261,7 +259,7 @@ void process_active_file_output_case (void) { vfm_sink_info.ncases++; - vfm_sink->write (); + vfm_sink->class->write (vfm_sink, temp_case); } /* Opening the active file. */ @@ -288,26 +286,27 @@ prepare_for_writing (void) file--it's just a waste of time and space. */ vfm_sink_info.ncases = 0; - vfm_sink_info.nval = dict_get_value_cnt (default_dict); - vfm_sink_info.case_size = (sizeof (struct ccase) - + ((dict_get_value_cnt (default_dict) - 1) - * sizeof (union value))); + vfm_sink_info.nval = dict_get_next_value_idx (default_dict); + vfm_sink_info.case_size = dict_get_case_size (default_dict); if (vfm_sink == NULL) { - if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE - && !paging) + if (vfm_sink_info.case_size * vfm_source_info.ncases > set_max_workspace + && !workspace_overflow) { msg (MW, _("Workspace overflow predicted. Max workspace is " "currently set to %d KB (%d cases at %d bytes each). " - "Paging active file to disk."), - MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size, + "Writing active file to disk."), + set_max_workspace / 1024, set_max_workspace / vfm_sink_info.case_size, vfm_sink_info.case_size); - paging = 1; + workspace_overflow = 1; } - - vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream; + + if (workspace_overflow) + vfm_sink = create_case_sink (&disk_sink_class, NULL); + else + vfm_sink = create_case_sink (&memory_sink_class, NULL); } } @@ -331,17 +330,20 @@ arrange_compaction (void) count_values += v->nv; } } - assert (temporary == 2 || count_values <= dict_get_value_cnt (temp_dict)); + assert (temporary == 2 + || count_values <= dict_get_next_value_idx (temp_dict)); } /* Compaction is only necessary if the number of `value's to output differs from the number already present. */ compaction_nval = count_values; - compaction_necessary = (temporary == 2 - || count_values != dict_get_value_cnt (temp_dict)); + if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict)) + compaction_necessary = 1; + else + compaction_necessary = 0; - if (vfm_sink->init) - vfm_sink->init (); + if (vfm_sink->class->open != NULL) + vfm_sink->class->open (vfm_sink); } /* Prepares the temporary case and compaction case. */ @@ -399,21 +401,6 @@ vector_initialization (void) } } -/* Sets filter_index to an appropriate value. */ -static void -setup_filter (void) -{ - filter_var = dict_get_filter (default_dict); - - if (filter_var != NULL) - { - assert (filter_var->type == NUMERIC); - filter_index = filter_var->index; - } else { - filter_index = -1; - } -} - /* Sets all the lag-related variables based on value of n_lag. */ static void setup_lag (void) @@ -427,8 +414,7 @@ setup_lag (void) lag_head = 0; lag_queue = xmalloc (n_lag * sizeof *lag_queue); for (i = 0; i < n_lag; i++) - lag_queue[i] = xmalloc (dict_get_value_cnt (temp_dict) - * sizeof **lag_queue); + lag_queue[i] = xmalloc (dict_get_case_size (temp_dict)); } /* There is a lot of potential confusion in the vfm and related @@ -489,7 +475,6 @@ open_active_file (void) make_temp_case (); vector_initialization (); discard_ctl_stack (); - setup_filter (); setup_lag (); /* Debug output. */ @@ -535,18 +520,21 @@ close_active_file (struct write_case_data *data) finish_compaction (); /* Old data sink --> New data source. */ - if (vfm_source && vfm_source->destroy_source) - vfm_source->destroy_source (); - - vfm_source = vfm_sink; + if (vfm_source != NULL) + { + if (vfm_source->class->destroy != NULL) + vfm_source->class->destroy (vfm_source); + free (vfm_source); + } + + vfm_source = vfm_sink->class->make_source (vfm_sink); vfm_source_info.ncases = vfm_sink_info.ncases; vfm_source_info.nval = compaction_nval; vfm_source_info.case_size = (sizeof (struct ccase) + (compaction_nval - 1) * sizeof (union value)); - if (vfm_source->mode) - vfm_source->mode (); /* Old data sink is gone now. */ + free (vfm_sink); vfm_sink = NULL; /* Cancel TEMPORARY. */ @@ -564,7 +552,7 @@ close_active_file (struct write_case_data *data) process_if_expr = NULL; /* Cancel FILTER if temporary. */ - if (filter_var != NULL && !FILTER_before_TEMPORARY) + if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY) dict_set_filter (default_dict, NULL); /* Cancel transformations. */ @@ -581,16 +569,12 @@ close_active_file (struct write_case_data *data) /* Disk case stream. */ -/* Associated files. */ -FILE *disk_source_file; -FILE *disk_sink_file; - /* Initializes the disk sink. */ static void -disk_stream_init (void) +disk_sink_create (struct case_sink *sink) { - disk_sink_file = tmpfile (); - if (!disk_sink_file) + sink->aux = tmpfile (); + if (!sink->aux) { msg (ME, _("An error occurred attempting to create a temporary " "file for use as the active file: %s."), @@ -599,44 +583,21 @@ disk_stream_init (void) } } -/* Reads all cases from the disk source and passes them one by one to - write_case(). */ -static void -disk_stream_read (write_case_func *write_case, write_case_data wc_data) -{ - int i; - - for (i = 0; i < vfm_source_info.ncases; i++) - { - if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file)) - { - msg (ME, _("An error occurred while attempting to read from " - "a temporary file created for the active file: %s."), - strerror (errno)); - err_failure (); - return; - } - - if (!write_case (wc_data)) - return; - } -} - /* Writes temp_case to the disk sink. */ static void -disk_stream_write (void) +disk_sink_write (struct case_sink *sink, struct ccase *c) { + FILE *file = sink->aux; union value *src_case; if (compaction_necessary) { - compact_case (compaction_case, temp_case); - src_case = (union value *) compaction_case; + compact_case (compaction_case, c); + src_case = compaction_case->data; } - else src_case = (union value *) temp_case; + else src_case = c->data; - if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, - disk_sink_file) != 1) + if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, file) != 1) { msg (ME, _("An error occurred while attempting to write to a " "temporary file used as the active file: %s."), @@ -645,12 +606,25 @@ disk_stream_write (void) } } -/* Switches the stream from a sink to a source. */ +/* Destroys the sink's internal data. */ static void -disk_stream_mode (void) +disk_sink_destroy (struct case_sink *sink) +{ + FILE *file = sink->aux; + if (file != NULL) + fclose (file); +} + +/* Closes and destroys the sink and returns a disk source to read + back the written data. */ +static struct case_source * +disk_sink_make_source (struct case_sink *sink) { - /* Rewind the sink. */ - if (fseek (disk_sink_file, 0, SEEK_SET) != 0) + FILE *file = sink->aux; + + /* Rewind the file. */ + assert (file != NULL); + if (fseek (file, 0, SEEK_SET) != 0) { msg (ME, _("An error occurred while attempting to rewind a " "temporary file used as the active file: %s."), @@ -658,107 +632,118 @@ disk_stream_mode (void) err_failure (); } - /* Sink --> source variables. */ - disk_source_file = disk_sink_file; + return create_case_source (&disk_source_class, file); } -/* Destroys the source's internal data. */ +/* Disk sink. */ +const struct case_sink_class disk_sink_class = + { + "disk", + disk_sink_create, + disk_sink_write, + disk_sink_destroy, + disk_sink_make_source, + }; + +/* Disk source. */ + +/* Reads all cases from the disk source and passes them one by one to + write_case(). */ static void -disk_stream_destroy_source (void) +disk_source_read (struct case_source *source, + write_case_func *write_case, write_case_data wc_data) { - if (disk_source_file) + FILE *file = source->aux; + int i; + + for (i = 0; i < vfm_source_info.ncases; i++) { - fclose (disk_source_file); - disk_source_file = NULL; + if (!fread (temp_case, vfm_source_info.case_size, 1, file)) + { + msg (ME, _("An error occurred while attempting to read from " + "a temporary file created for the active file: %s."), + strerror (errno)); + err_failure (); + return; + } + + if (!write_case (wc_data)) + return; } } -/* Destroys the sink's internal data. */ +/* Destroys the source's internal data. */ static void -disk_stream_destroy_sink (void) +disk_source_destroy (struct case_source *source) { - if (disk_sink_file) - { - fclose (disk_sink_file); - disk_sink_file = NULL; - } + FILE *file = source->aux; + if (file != NULL) + fclose (file); } -/* Disk stream. */ -struct case_stream vfm_disk_stream = +/* Disk source. */ +const struct case_source_class disk_source_class = { - disk_stream_init, - disk_stream_read, - disk_stream_write, - disk_stream_mode, - disk_stream_destroy_source, - disk_stream_destroy_sink, "disk", + disk_source_read, + disk_source_destroy, }; /* Memory case stream. */ -/* List of cases stored in the stream. */ -struct case_list *memory_source_cases; -struct case_list *memory_sink_cases; - -/* Current case. */ -struct case_list *memory_sink_iter; +/* Memory sink data. */ +struct memory_sink_info + { + int max_cases; /* Maximum cases before switching to disk. */ + struct case_list *head; /* First case in list. */ + struct case_list *tail; /* Last case in list. */ + }; -/* Maximum number of cases. */ -int memory_sink_max_cases; +/* Memory source data. */ +struct memory_source_info + { + struct case_list *cases; /* List of cases. */ + }; -/* Initializes the memory stream variables for writing. */ static void -memory_stream_init (void) +memory_sink_create (struct case_sink *sink) { - memory_sink_cases = NULL; - memory_sink_iter = NULL; + struct memory_sink_info *info; - assert (compaction_nval); - memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval); -} + sink->aux = info = xmalloc (sizeof *info); -/* Reads the case stream from memory and passes it to write_case(). */ -static void -memory_stream_read (write_case_func *write_case, write_case_data wc_data) -{ - while (memory_source_cases != NULL) - { - memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size); - - { - struct case_list *current = memory_source_cases; - memory_source_cases = memory_source_cases->next; - free (current); - } - - if (!write_case (wc_data)) - return; - } + assert (compaction_nval > 0); + info->max_cases = set_max_workspace / (sizeof (union value) * compaction_nval); + info->head = info->tail = NULL; } -/* Writes temp_case to the memory stream. */ static void -memory_stream_write (void) +memory_sink_write (struct case_sink *sink, struct ccase *c) { - struct case_list *new_case = malloc (sizeof (struct case_list) - + ((compaction_nval - 1) - * sizeof (union value))); + struct memory_sink_info *info = sink->aux; + size_t case_size; + struct case_list *new_case; + + case_size = sizeof (struct case_list) + + ((compaction_nval - 1) * sizeof (union value)); + new_case = malloc (case_size); /* If we've got memory to spare then add it to the linked list. */ - if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL) + if (vfm_sink_info.ncases <= info->max_cases && new_case != NULL) { - if (compaction_necessary) - compact_case (&new_case->c, temp_case); + /* Append case to linked list. */ + new_case->next = NULL; + if (info->head != NULL) + info->tail->next = new_case; else - memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval); + info->head = new_case; + info->tail = new_case; - /* Append case to linked list. */ - if (memory_sink_cases) - memory_sink_iter = memory_sink_iter->next = new_case; + /* Copy data into case. */ + if (compaction_necessary) + compact_case (&new_case->c, c); else - memory_sink_iter = memory_sink_cases = new_case; + memcpy (&new_case->c, c, sizeof (union value) * compaction_nval); } else { @@ -767,36 +752,32 @@ memory_stream_write (void) /* Notify the user. */ if (!new_case) - msg (MW, _("Virtual memory exhausted. Paging active file " + msg (MW, _("Virtual memory exhausted. Writing active file " "to disk.")); else msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) " - "overflowed. Paging active file to disk."), - MAX_WORKSPACE / 1024, memory_sink_max_cases, + "overflowed. Writing active file to disk."), + set_max_workspace / 1024, info->max_cases, compaction_nval * sizeof (union value)); free (new_case); /* Switch to a disk sink. */ - vfm_sink = &vfm_disk_stream; - vfm_sink->init (); - paging = 1; - - /* Terminate the list. */ - if (memory_sink_iter) - memory_sink_iter->next = NULL; + vfm_sink = create_case_sink (&disk_sink_class, NULL); + vfm_sink->class->open (vfm_sink); + workspace_overflow = 1; /* Write the cases to disk and destroy them. We can't call vfm->sink->write() because of compaction. */ - for (cur = memory_sink_cases; cur; cur = next) + for (cur = info->head; cur; cur = next) { next = cur->next; if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1, - disk_sink_file) != 1) + vfm_sink->aux) != 1) { msg (ME, _("An error occurred while attempting to " "write to a temporary file created as the " - "active file, while paging to disk: %s."), + "active file: %s."), strerror (errno)); err_failure (); } @@ -804,36 +785,38 @@ memory_stream_write (void) } /* Write the current case to disk. */ - vfm_sink->write (); + vfm_sink->class->write (vfm_sink, c); } } /* If the data is stored in memory, causes it to be written to disk. To be called only *between* procedure()s, not within them. */ void -page_to_disk (void) +write_active_file_to_disk (void) { - if (vfm_source == &vfm_memory_stream) + if (case_source_is_class (vfm_source, &memory_source_class)) { + struct memory_source_info *info = vfm_source->aux; + /* Switch to a disk sink. */ - vfm_sink = &vfm_disk_stream; - vfm_sink->init (); - paging = 1; + vfm_sink = create_case_sink (&disk_sink_class, NULL); + vfm_sink->class->open (vfm_sink); + workspace_overflow = 1; /* Write the cases to disk and destroy them. We can't call vfm->sink->write() because of compaction. */ { struct case_list *cur, *next; - for (cur = memory_source_cases; cur; cur = next) + for (cur = info->cases; cur; cur = next) { next = cur->next; if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1, - disk_sink_file) != 1) + vfm_sink->aux) != 1) { msg (ME, _("An error occurred while attempting to " "write to a temporary file created as the " - "active file, while paging to disk: %s."), + "active file: %s."), strerror (errno)); err_failure (); } @@ -841,64 +824,107 @@ page_to_disk (void) } } - vfm_source = &vfm_disk_stream; - vfm_source->mode (); - + vfm_source = vfm_sink->class->make_source (vfm_sink); vfm_sink = NULL; } } -/* Switch the memory stream from sink to source mode. */ +/* Destroy all memory sink data. */ static void -memory_stream_mode (void) +memory_sink_destroy (struct case_sink *sink) { - /* Terminate the list. */ - if (memory_sink_iter) - memory_sink_iter->next = NULL; + struct memory_sink_info *info = sink->aux; + struct case_list *cur, *next; + + for (cur = info->head; cur; cur = next) + { + next = cur->next; + free (cur); + } + free (info); +} + +/* Switch the memory stream from sink to source mode. */ +static struct case_source * +memory_sink_make_source (struct case_sink *sink) +{ + struct memory_sink_info *sink_info = sink->aux; + struct memory_source_info *source_info; + + source_info = xmalloc (sizeof *source_info); + source_info->cases = sink_info->head; + + free (sink_info); - /* Sink --> source variables. */ - memory_source_cases = memory_sink_cases; - memory_sink_cases = NULL; + return create_case_source (&memory_source_class, source_info); } -/* Destroy all memory source data. */ +const struct case_sink_class memory_sink_class = + { + "memory", + memory_sink_create, + memory_sink_write, + memory_sink_destroy, + memory_sink_make_source, + }; + +/* Reads the case stream from memory and passes it to write_case(). */ static void -memory_stream_destroy_source (void) +memory_source_read (struct case_source *source, + write_case_func *write_case, write_case_data wc_data) { - struct case_list *cur, *next; - - for (cur = memory_source_cases; cur; cur = next) + struct memory_source_info *info = source->aux; + + while (info->cases != NULL) { - next = cur->next; - free (cur); + struct case_list *iter = info->cases; + info->cases = iter->next; + memcpy (temp_case, &iter->c, vfm_source_info.case_size); + free (iter); + + if (!write_case (wc_data)) + return; } - memory_source_cases = NULL; } -/* Destroy all memory sink data. */ +/* Destroy all memory source data. */ static void -memory_stream_destroy_sink (void) +memory_source_destroy (struct case_source *source) { + struct memory_source_info *info = source->aux; struct case_list *cur, *next; - for (cur = memory_sink_cases; cur; cur = next) + for (cur = info->cases; cur; cur = next) { next = cur->next; free (cur); } - memory_sink_cases = NULL; + free (info); } - + +struct case_list * +memory_source_get_cases (const struct case_source *source) +{ + struct memory_source_info *info = source->aux; + + return info->cases; +} + +void +memory_source_set_cases (const struct case_source *source, + struct case_list *cases) +{ + struct memory_source_info *info = source->aux; + + info->cases = cases; +} + /* Memory stream. */ -struct case_stream vfm_memory_stream = +const struct case_source_class memory_source_class = { - memory_stream_init, - memory_stream_read, - memory_stream_write, - memory_stream_mode, - memory_stream_destroy_source, - memory_stream_destroy_sink, "memory", + memory_source_read, + memory_source_destroy, }; #include "debug-print.h" @@ -910,7 +936,7 @@ lag_case (void) if (lag_count < n_lag) lag_count++; memcpy (lag_queue[lag_head], temp_case, - sizeof (union value) * dict_get_value_cnt (temp_dict)); + dict_get_case_size (temp_dict)); if (++lag_head >= n_lag) lag_head = 0; } @@ -959,7 +985,7 @@ procedure_write_case (write_case_data wc_data) lag_case (); vfm_sink_info.ncases++; - vfm_sink->write (); + vfm_sink->class->write (vfm_sink, temp_case); if (dict_get_case_limit (default_dict)) more_cases = (vfm_sink_info.ncases @@ -1000,10 +1026,7 @@ procedure_write_case (write_case_data wc_data) /* Call the procedure if there is one and FILTER and PROCESS IF don't prohibit it. */ - if (wc_data->procfunc != NULL - && !FILTERED - && (process_if_expr == NULL || - expr_evaluate (process_if_expr, temp_case, NULL) == 1.0)) + if (wc_data->procfunc != NULL && !exclude_this_case ()) wc_data->procfunc (temp_case, wc_data->aux); case_count++; @@ -1040,6 +1063,28 @@ clear_temp_case (void) } } +/* Returns nonzero if this case should be exclude as specified on + FILTER or PROCESS IF, otherwise zero. */ +static int +exclude_this_case (void) +{ + /* FILTER. */ + struct variable *filter_var = dict_get_filter (default_dict); + if (filter_var != NULL) + { + double f = temp_case->data[filter_var->fv].f; + if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var)) + return 1; + } + + /* PROCESS IF. */ + if (process_if_expr != NULL + && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0) + return 1; + + return 0; +} + /* Appends TRNS to t_trns[], the list of all transformations to be performed on data as it is read from the active file. */ void @@ -1101,12 +1146,7 @@ dump_splits (struct ccase *c) assert (v->type == NUMERIC || v->type == ALPHA); tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name); - { - union value val = c->data[v->fv]; - if (v->type == ALPHA) - val.c = c->data[v->fv].s; - data_out (temp_buf, &v->print, &val); - } + data_out (temp_buf, &v->print, &c->data[v->fv]); temp_buf[v->print.w] = 0; tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf); @@ -1158,7 +1198,7 @@ SPLIT_FILE_procfunc (struct ccase *c, void *data_) switch (v->type) { case NUMERIC: - if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f)) + if (c->data[v->fv].f != prev_case->data[v->fv].f) goto not_equal; break; case ALPHA: @@ -1243,4 +1283,36 @@ finish_compaction (void) dict_compact_values (default_dict); } - +struct case_source * +create_case_source (const struct case_source_class *class, void *aux) +{ + struct case_source *source = xmalloc (sizeof *source); + source->class = class; + source->aux = aux; + return source; +} + +int +case_source_is_complex (const struct case_source *source) +{ + return source != NULL && (source->class == &input_program_source_class + || source->class == &file_type_source_class); +} + +int +case_source_is_class (const struct case_source *source, + const struct case_source_class *class) +{ + return source != NULL && source->class == class; + +} + +struct case_sink * +create_case_sink (const struct case_sink_class *class, void *aux) +{ + struct case_sink *sink = xmalloc (sizeof *sink); + sink->class = class; + sink->aux = aux; + return sink; +} +