X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fvfm.c;h=82af200015d6aa40dc19f4fb6215d8a7c0480201;hb=b70dc1a6b7b15967ceb111f80dff65c44f0fac57;hp=3d40f6c029a55429bebfbd3bad27b15ba4913ffa;hpb=312ad23d59e643e0c72e7ad684b00f3432461dee;p=pspp diff --git a/src/vfm.c b/src/vfm.c index 3d40f6c029..82af200015 100644 --- a/src/vfm.c +++ b/src/vfm.c @@ -28,12 +28,12 @@ #include /* Required by SunOS4. */ #endif #include "alloc.h" -#include "approx.h" #include "do-ifP.h" #include "error.h" #include "expr.h" #include "misc.h" #include "random.h" +#include "settings.h" #include "som.h" #include "str.h" #include "tab.h" @@ -43,10 +43,11 @@ /* Virtual File Manager (vfm): - vfm is used to process data files. It uses the model that data is - read from one stream (the data source), then written to another - (the data sink). The data source is then deleted and the data sink - becomes the data source for the next procedure. */ + vfm is used to process data files. It uses the model that + data is read from one stream (the data source), processed, + then written to another (the data sink). The data source is + then deleted and the data sink becomes the data source for the + next procedure. */ #include "debug-print.h" @@ -59,11 +60,11 @@ struct write_case_data void *aux; }; -/* This is used to read from the active file. */ -struct case_stream *vfm_source; +/* The current active file, from which cases are read. */ +struct case_source *vfm_source; -/* This is used to write to the replacement active file. */ -struct case_stream *vfm_sink; +/* The replacement active file, to which cases are written. */ +struct case_sink *vfm_sink; /* Information about the data source. */ struct stream_info vfm_source_info; @@ -83,9 +84,11 @@ int compaction_nval; `value's. */ struct ccase *compaction_case; -/* Within a session, when paging is turned on, it is never turned back - off. This policy might be too aggressive. */ -static int paging = 0; +/* Nonzero means that we've overflowed our allotted workspace. + After that happens once during a session, we always store the + active file on disk instead of in memory. (This policy may be + too aggressive.) */ +static int workspace_overflow = 0; /* Time at which vfm was last invoked. */ time_t last_vfm_invocation; @@ -126,9 +129,13 @@ procedure (void (*beginfunc) (void *), void (*endfunc) (void *), void *aux) { + static int recursive_call; + struct write_case_data procedure_write_data; struct write_case_data split_file_data; + assert (++recursive_call == 1); + if (dict_get_split_cnt (default_dict) == 0) { /* Normally we just use the data passed by the user. */ @@ -154,8 +161,12 @@ procedure (void (*beginfunc) (void *), last_vfm_invocation = time (NULL); open_active_file (); - vfm_source->read (procedure_write_case, &procedure_write_data); + if (vfm_source != NULL) + vfm_source->class->read (vfm_source, + procedure_write_case, &procedure_write_data); close_active_file (&procedure_write_data); + + assert (--recursive_call == 0); } /* Active file processing support. Subtly different semantics from @@ -192,9 +203,9 @@ process_active_file (void (*beginfunc) (void *), beginfunc (aux); /* There doesn't necessarily need to be an active file. */ - if (vfm_source) - vfm_source->read (process_active_file_write_case, - &process_active_write_data); + if (vfm_source != NULL) + vfm_source->class->read (vfm_source, process_active_file_write_case, + &process_active_write_data); endfunc (aux); close_active_file (&process_active_write_data); @@ -248,7 +259,7 @@ void process_active_file_output_case (void) { vfm_sink_info.ncases++; - vfm_sink->write (); + vfm_sink->class->write (vfm_sink, temp_case); } /* Opening the active file. */ @@ -280,19 +291,22 @@ prepare_for_writing (void) if (vfm_sink == NULL) { - if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE - && !paging) + if (vfm_sink_info.case_size * vfm_source_info.ncases > set_max_workspace + && !workspace_overflow) { msg (MW, _("Workspace overflow predicted. Max workspace is " "currently set to %d KB (%d cases at %d bytes each). " - "Paging active file to disk."), - MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size, + "Writing active file to disk."), + set_max_workspace / 1024, set_max_workspace / vfm_sink_info.case_size, vfm_sink_info.case_size); - paging = 1; + workspace_overflow = 1; } - - vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream; + + if (workspace_overflow) + vfm_sink = create_case_sink (&disk_sink_class, NULL); + else + vfm_sink = create_case_sink (&memory_sink_class, NULL); } } @@ -328,8 +342,8 @@ arrange_compaction (void) else compaction_necessary = 0; - if (vfm_sink->init) - vfm_sink->init (); + if (vfm_sink->class->open != NULL) + vfm_sink->class->open (vfm_sink); } /* Prepares the temporary case and compaction case. */ @@ -506,18 +520,21 @@ close_active_file (struct write_case_data *data) finish_compaction (); /* Old data sink --> New data source. */ - if (vfm_source && vfm_source->destroy_source) - vfm_source->destroy_source (); - - vfm_source = vfm_sink; + if (vfm_source != NULL) + { + if (vfm_source->class->destroy != NULL) + vfm_source->class->destroy (vfm_source); + free (vfm_source); + } + + vfm_source = vfm_sink->class->make_source (vfm_sink); vfm_source_info.ncases = vfm_sink_info.ncases; vfm_source_info.nval = compaction_nval; vfm_source_info.case_size = (sizeof (struct ccase) + (compaction_nval - 1) * sizeof (union value)); - if (vfm_source->mode) - vfm_source->mode (); /* Old data sink is gone now. */ + free (vfm_sink); vfm_sink = NULL; /* Cancel TEMPORARY. */ @@ -552,16 +569,12 @@ close_active_file (struct write_case_data *data) /* Disk case stream. */ -/* Associated files. */ -FILE *disk_source_file; -FILE *disk_sink_file; - /* Initializes the disk sink. */ static void -disk_stream_init (void) +disk_sink_create (struct case_sink *sink) { - disk_sink_file = tmpfile (); - if (!disk_sink_file) + sink->aux = tmpfile (); + if (!sink->aux) { msg (ME, _("An error occurred attempting to create a temporary " "file for use as the active file: %s."), @@ -570,44 +583,21 @@ disk_stream_init (void) } } -/* Reads all cases from the disk source and passes them one by one to - write_case(). */ -static void -disk_stream_read (write_case_func *write_case, write_case_data wc_data) -{ - int i; - - for (i = 0; i < vfm_source_info.ncases; i++) - { - if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file)) - { - msg (ME, _("An error occurred while attempting to read from " - "a temporary file created for the active file: %s."), - strerror (errno)); - err_failure (); - return; - } - - if (!write_case (wc_data)) - return; - } -} - /* Writes temp_case to the disk sink. */ static void -disk_stream_write (void) +disk_sink_write (struct case_sink *sink, struct ccase *c) { + FILE *file = sink->aux; union value *src_case; if (compaction_necessary) { - compact_case (compaction_case, temp_case); - src_case = (union value *) compaction_case; + compact_case (compaction_case, c); + src_case = compaction_case->data; } - else src_case = (union value *) temp_case; + else src_case = c->data; - if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, - disk_sink_file) != 1) + if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, file) != 1) { msg (ME, _("An error occurred while attempting to write to a " "temporary file used as the active file: %s."), @@ -616,12 +606,25 @@ disk_stream_write (void) } } -/* Switches the stream from a sink to a source. */ +/* Destroys the sink's internal data. */ static void -disk_stream_mode (void) +disk_sink_destroy (struct case_sink *sink) { - /* Rewind the sink. */ - if (fseek (disk_sink_file, 0, SEEK_SET) != 0) + FILE *file = sink->aux; + if (file != NULL) + fclose (file); +} + +/* Closes and destroys the sink and returns a disk source to read + back the written data. */ +static struct case_source * +disk_sink_make_source (struct case_sink *sink) +{ + FILE *file = sink->aux; + + /* Rewind the file. */ + assert (file != NULL); + if (fseek (file, 0, SEEK_SET) != 0) { msg (ME, _("An error occurred while attempting to rewind a " "temporary file used as the active file: %s."), @@ -629,107 +632,118 @@ disk_stream_mode (void) err_failure (); } - /* Sink --> source variables. */ - disk_source_file = disk_sink_file; + return create_case_source (&disk_source_class, file); } -/* Destroys the source's internal data. */ +/* Disk sink. */ +const struct case_sink_class disk_sink_class = + { + "disk", + disk_sink_create, + disk_sink_write, + disk_sink_destroy, + disk_sink_make_source, + }; + +/* Disk source. */ + +/* Reads all cases from the disk source and passes them one by one to + write_case(). */ static void -disk_stream_destroy_source (void) +disk_source_read (struct case_source *source, + write_case_func *write_case, write_case_data wc_data) { - if (disk_source_file) + FILE *file = source->aux; + int i; + + for (i = 0; i < vfm_source_info.ncases; i++) { - fclose (disk_source_file); - disk_source_file = NULL; + if (!fread (temp_case, vfm_source_info.case_size, 1, file)) + { + msg (ME, _("An error occurred while attempting to read from " + "a temporary file created for the active file: %s."), + strerror (errno)); + err_failure (); + return; + } + + if (!write_case (wc_data)) + return; } } -/* Destroys the sink's internal data. */ +/* Destroys the source's internal data. */ static void -disk_stream_destroy_sink (void) +disk_source_destroy (struct case_source *source) { - if (disk_sink_file) - { - fclose (disk_sink_file); - disk_sink_file = NULL; - } + FILE *file = source->aux; + if (file != NULL) + fclose (file); } -/* Disk stream. */ -struct case_stream vfm_disk_stream = +/* Disk source. */ +const struct case_source_class disk_source_class = { - disk_stream_init, - disk_stream_read, - disk_stream_write, - disk_stream_mode, - disk_stream_destroy_source, - disk_stream_destroy_sink, "disk", + disk_source_read, + disk_source_destroy, }; /* Memory case stream. */ -/* List of cases stored in the stream. */ -struct case_list *memory_source_cases; -struct case_list *memory_sink_cases; - -/* Current case. */ -struct case_list *memory_sink_iter; +/* Memory sink data. */ +struct memory_sink_info + { + int max_cases; /* Maximum cases before switching to disk. */ + struct case_list *head; /* First case in list. */ + struct case_list *tail; /* Last case in list. */ + }; -/* Maximum number of cases. */ -int memory_sink_max_cases; +/* Memory source data. */ +struct memory_source_info + { + struct case_list *cases; /* List of cases. */ + }; -/* Initializes the memory stream variables for writing. */ static void -memory_stream_init (void) +memory_sink_create (struct case_sink *sink) { - memory_sink_cases = NULL; - memory_sink_iter = NULL; + struct memory_sink_info *info; - assert (compaction_nval); - memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval); -} + sink->aux = info = xmalloc (sizeof *info); -/* Reads the case stream from memory and passes it to write_case(). */ -static void -memory_stream_read (write_case_func *write_case, write_case_data wc_data) -{ - while (memory_source_cases != NULL) - { - memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size); - - { - struct case_list *current = memory_source_cases; - memory_source_cases = memory_source_cases->next; - free (current); - } - - if (!write_case (wc_data)) - return; - } + assert (compaction_nval > 0); + info->max_cases = set_max_workspace / (sizeof (union value) * compaction_nval); + info->head = info->tail = NULL; } -/* Writes temp_case to the memory stream. */ static void -memory_stream_write (void) +memory_sink_write (struct case_sink *sink, struct ccase *c) { - struct case_list *new_case = malloc (sizeof (struct case_list) - + ((compaction_nval - 1) - * sizeof (union value))); + struct memory_sink_info *info = sink->aux; + size_t case_size; + struct case_list *new_case; + + case_size = sizeof (struct case_list) + + ((compaction_nval - 1) * sizeof (union value)); + new_case = malloc (case_size); /* If we've got memory to spare then add it to the linked list. */ - if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL) + if (vfm_sink_info.ncases <= info->max_cases && new_case != NULL) { - if (compaction_necessary) - compact_case (&new_case->c, temp_case); + /* Append case to linked list. */ + new_case->next = NULL; + if (info->head != NULL) + info->tail->next = new_case; else - memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval); + info->head = new_case; + info->tail = new_case; - /* Append case to linked list. */ - if (memory_sink_cases) - memory_sink_iter = memory_sink_iter->next = new_case; + /* Copy data into case. */ + if (compaction_necessary) + compact_case (&new_case->c, c); else - memory_sink_iter = memory_sink_cases = new_case; + memcpy (&new_case->c, c, sizeof (union value) * compaction_nval); } else { @@ -738,36 +752,32 @@ memory_stream_write (void) /* Notify the user. */ if (!new_case) - msg (MW, _("Virtual memory exhausted. Paging active file " + msg (MW, _("Virtual memory exhausted. Writing active file " "to disk.")); else msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) " - "overflowed. Paging active file to disk."), - MAX_WORKSPACE / 1024, memory_sink_max_cases, + "overflowed. Writing active file to disk."), + set_max_workspace / 1024, info->max_cases, compaction_nval * sizeof (union value)); free (new_case); /* Switch to a disk sink. */ - vfm_sink = &vfm_disk_stream; - vfm_sink->init (); - paging = 1; - - /* Terminate the list. */ - if (memory_sink_iter) - memory_sink_iter->next = NULL; + vfm_sink = create_case_sink (&disk_sink_class, NULL); + vfm_sink->class->open (vfm_sink); + workspace_overflow = 1; /* Write the cases to disk and destroy them. We can't call vfm->sink->write() because of compaction. */ - for (cur = memory_sink_cases; cur; cur = next) + for (cur = info->head; cur; cur = next) { next = cur->next; if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1, - disk_sink_file) != 1) + vfm_sink->aux) != 1) { msg (ME, _("An error occurred while attempting to " "write to a temporary file created as the " - "active file, while paging to disk: %s."), + "active file: %s."), strerror (errno)); err_failure (); } @@ -775,36 +785,38 @@ memory_stream_write (void) } /* Write the current case to disk. */ - vfm_sink->write (); + vfm_sink->class->write (vfm_sink, c); } } /* If the data is stored in memory, causes it to be written to disk. To be called only *between* procedure()s, not within them. */ void -page_to_disk (void) +write_active_file_to_disk (void) { - if (vfm_source == &vfm_memory_stream) + if (case_source_is_class (vfm_source, &memory_source_class)) { + struct memory_source_info *info = vfm_source->aux; + /* Switch to a disk sink. */ - vfm_sink = &vfm_disk_stream; - vfm_sink->init (); - paging = 1; + vfm_sink = create_case_sink (&disk_sink_class, NULL); + vfm_sink->class->open (vfm_sink); + workspace_overflow = 1; /* Write the cases to disk and destroy them. We can't call vfm->sink->write() because of compaction. */ { struct case_list *cur, *next; - for (cur = memory_source_cases; cur; cur = next) + for (cur = info->cases; cur; cur = next) { next = cur->next; if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1, - disk_sink_file) != 1) + vfm_sink->aux) != 1) { msg (ME, _("An error occurred while attempting to " "write to a temporary file created as the " - "active file, while paging to disk: %s."), + "active file: %s."), strerror (errno)); err_failure (); } @@ -812,64 +824,107 @@ page_to_disk (void) } } - vfm_source = &vfm_disk_stream; - vfm_source->mode (); - + vfm_source = vfm_sink->class->make_source (vfm_sink); vfm_sink = NULL; } } -/* Switch the memory stream from sink to source mode. */ +/* Destroy all memory sink data. */ static void -memory_stream_mode (void) +memory_sink_destroy (struct case_sink *sink) { - /* Terminate the list. */ - if (memory_sink_iter) - memory_sink_iter->next = NULL; + struct memory_sink_info *info = sink->aux; + struct case_list *cur, *next; + + for (cur = info->head; cur; cur = next) + { + next = cur->next; + free (cur); + } + free (info); +} + +/* Switch the memory stream from sink to source mode. */ +static struct case_source * +memory_sink_make_source (struct case_sink *sink) +{ + struct memory_sink_info *sink_info = sink->aux; + struct memory_source_info *source_info; + + source_info = xmalloc (sizeof *source_info); + source_info->cases = sink_info->head; + + free (sink_info); - /* Sink --> source variables. */ - memory_source_cases = memory_sink_cases; - memory_sink_cases = NULL; + return create_case_source (&memory_source_class, source_info); } -/* Destroy all memory source data. */ +const struct case_sink_class memory_sink_class = + { + "memory", + memory_sink_create, + memory_sink_write, + memory_sink_destroy, + memory_sink_make_source, + }; + +/* Reads the case stream from memory and passes it to write_case(). */ static void -memory_stream_destroy_source (void) +memory_source_read (struct case_source *source, + write_case_func *write_case, write_case_data wc_data) { - struct case_list *cur, *next; - - for (cur = memory_source_cases; cur; cur = next) + struct memory_source_info *info = source->aux; + + while (info->cases != NULL) { - next = cur->next; - free (cur); + struct case_list *iter = info->cases; + info->cases = iter->next; + memcpy (temp_case, &iter->c, vfm_source_info.case_size); + free (iter); + + if (!write_case (wc_data)) + return; } - memory_source_cases = NULL; } -/* Destroy all memory sink data. */ +/* Destroy all memory source data. */ static void -memory_stream_destroy_sink (void) +memory_source_destroy (struct case_source *source) { + struct memory_source_info *info = source->aux; struct case_list *cur, *next; - for (cur = memory_sink_cases; cur; cur = next) + for (cur = info->cases; cur; cur = next) { next = cur->next; free (cur); } - memory_sink_cases = NULL; + free (info); } - + +struct case_list * +memory_source_get_cases (const struct case_source *source) +{ + struct memory_source_info *info = source->aux; + + return info->cases; +} + +void +memory_source_set_cases (const struct case_source *source, + struct case_list *cases) +{ + struct memory_source_info *info = source->aux; + + info->cases = cases; +} + /* Memory stream. */ -struct case_stream vfm_memory_stream = +const struct case_source_class memory_source_class = { - memory_stream_init, - memory_stream_read, - memory_stream_write, - memory_stream_mode, - memory_stream_destroy_source, - memory_stream_destroy_sink, "memory", + memory_source_read, + memory_source_destroy, }; #include "debug-print.h" @@ -930,7 +985,7 @@ procedure_write_case (write_case_data wc_data) lag_case (); vfm_sink_info.ncases++; - vfm_sink->write (); + vfm_sink->class->write (vfm_sink, temp_case); if (dict_get_case_limit (default_dict)) more_cases = (vfm_sink_info.ncases @@ -1091,12 +1146,7 @@ dump_splits (struct ccase *c) assert (v->type == NUMERIC || v->type == ALPHA); tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name); - { - union value val = c->data[v->fv]; - if (v->type == ALPHA) - val.c = c->data[v->fv].s; - data_out (temp_buf, &v->print, &val); - } + data_out (temp_buf, &v->print, &c->data[v->fv]); temp_buf[v->print.w] = 0; tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf); @@ -1148,7 +1198,7 @@ SPLIT_FILE_procfunc (struct ccase *c, void *data_) switch (v->type) { case NUMERIC: - if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f)) + if (c->data[v->fv].f != prev_case->data[v->fv].f) goto not_equal; break; case ALPHA: @@ -1233,4 +1283,36 @@ finish_compaction (void) dict_compact_values (default_dict); } - +struct case_source * +create_case_source (const struct case_source_class *class, void *aux) +{ + struct case_source *source = xmalloc (sizeof *source); + source->class = class; + source->aux = aux; + return source; +} + +int +case_source_is_complex (const struct case_source *source) +{ + return source != NULL && (source->class == &input_program_source_class + || source->class == &file_type_source_class); +} + +int +case_source_is_class (const struct case_source *source, + const struct case_source_class *class) +{ + return source != NULL && source->class == class; + +} + +struct case_sink * +create_case_sink (const struct case_sink_class *class, void *aux) +{ + struct case_sink *sink = xmalloc (sizeof *sink); + sink->class = class; + sink->aux = aux; + return sink; +} +