#include "expr.h"
#include "misc.h"
#include "random.h"
+#include "settings.h"
#include "som.h"
#include "str.h"
#include "tab.h"
/*
Virtual File Manager (vfm):
- vfm is used to process data files. It uses the model that data is
- read from one stream (the data source), then written to another
- (the data sink). The data source is then deleted and the data sink
- becomes the data source for the next procedure. */
+ vfm is used to process data files. It uses the model that
+ data is read from one stream (the data source), processed,
+ then written to another (the data sink). The data source is
+ then deleted and the data sink becomes the data source for the
+ next procedure. */
#include "debug-print.h"
void *aux;
};
-/* This is used to read from the active file. */
-struct case_stream *vfm_source;
+/* The current active file, from which cases are read. */
+struct case_source *vfm_source;
-/* This is used to write to the replacement active file. */
-struct case_stream *vfm_sink;
+/* The replacement active file, to which cases are written. */
+struct case_sink *vfm_sink;
/* Information about the data source. */
struct stream_info vfm_source_info;
`value's. */
struct ccase *compaction_case;
-/* Within a session, when paging is turned on, it is never turned back
- off. This policy might be too aggressive. */
-static int paging = 0;
+/* Nonzero means that we've overflowed our allotted workspace.
+ After that happens once during a session, we always store the
+ active file on disk instead of in memory. (This policy may be
+ too aggressive.) */
+static int workspace_overflow = 0;
/* Time at which vfm was last invoked. */
time_t last_vfm_invocation;
last_vfm_invocation = time (NULL);
open_active_file ();
- vfm_source->read (procedure_write_case, &procedure_write_data);
+ if (vfm_source != NULL)
+ vfm_source->class->read (vfm_source,
+ procedure_write_case, &procedure_write_data);
close_active_file (&procedure_write_data);
assert (--recursive_call == 0);
beginfunc (aux);
/* There doesn't necessarily need to be an active file. */
- if (vfm_source)
- vfm_source->read (process_active_file_write_case,
- &process_active_write_data);
+ if (vfm_source != NULL)
+ vfm_source->class->read (vfm_source, process_active_file_write_case,
+ &process_active_write_data);
endfunc (aux);
close_active_file (&process_active_write_data);
process_active_file_output_case (void)
{
vfm_sink_info.ncases++;
- vfm_sink->write ();
+ vfm_sink->class->write (vfm_sink, temp_case);
}
\f
/* Opening the active file. */
if (vfm_sink == NULL)
{
- if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
- && !paging)
+ if (vfm_sink_info.case_size * vfm_source_info.ncases > set_max_workspace
+ && !workspace_overflow)
{
msg (MW, _("Workspace overflow predicted. Max workspace is "
"currently set to %d KB (%d cases at %d bytes each). "
- "Paging active file to disk."),
- MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
+ "Writing active file to disk."),
+ set_max_workspace / 1024, set_max_workspace / vfm_sink_info.case_size,
vfm_sink_info.case_size);
- paging = 1;
+ workspace_overflow = 1;
}
-
- vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
+
+ if (workspace_overflow)
+ vfm_sink = create_case_sink (&disk_sink_class, NULL);
+ else
+ vfm_sink = create_case_sink (&memory_sink_class, NULL);
}
}
else
compaction_necessary = 0;
- if (vfm_sink->init)
- vfm_sink->init ();
+ if (vfm_sink->class->open != NULL)
+ vfm_sink->class->open (vfm_sink);
}
/* Prepares the temporary case and compaction case. */
finish_compaction ();
/* Old data sink --> New data source. */
- if (vfm_source && vfm_source->destroy_source)
- vfm_source->destroy_source ();
-
- vfm_source = vfm_sink;
+ if (vfm_source != NULL)
+ {
+ if (vfm_source->class->destroy != NULL)
+ vfm_source->class->destroy (vfm_source);
+ free (vfm_source);
+ }
+
+ vfm_source = vfm_sink->class->make_source (vfm_sink);
vfm_source_info.ncases = vfm_sink_info.ncases;
vfm_source_info.nval = compaction_nval;
vfm_source_info.case_size = (sizeof (struct ccase)
+ (compaction_nval - 1) * sizeof (union value));
- if (vfm_source->mode)
- vfm_source->mode ();
/* Old data sink is gone now. */
+ free (vfm_sink);
vfm_sink = NULL;
/* Cancel TEMPORARY. */
\f
/* Disk case stream. */
-/* Associated files. */
-FILE *disk_source_file;
-FILE *disk_sink_file;
-
/* Initializes the disk sink. */
static void
-disk_stream_init (void)
+disk_sink_create (struct case_sink *sink)
{
- disk_sink_file = tmpfile ();
- if (!disk_sink_file)
+ sink->aux = tmpfile ();
+ if (!sink->aux)
{
msg (ME, _("An error occurred attempting to create a temporary "
"file for use as the active file: %s."),
}
}
-/* Reads all cases from the disk source and passes them one by one to
- write_case(). */
-static void
-disk_stream_read (write_case_func *write_case, write_case_data wc_data)
-{
- int i;
-
- for (i = 0; i < vfm_source_info.ncases; i++)
- {
- if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
- {
- msg (ME, _("An error occurred while attempting to read from "
- "a temporary file created for the active file: %s."),
- strerror (errno));
- err_failure ();
- return;
- }
-
- if (!write_case (wc_data))
- return;
- }
-}
-
/* Writes temp_case to the disk sink. */
static void
-disk_stream_write (void)
+disk_sink_write (struct case_sink *sink, struct ccase *c)
{
+ FILE *file = sink->aux;
union value *src_case;
if (compaction_necessary)
{
- compact_case (compaction_case, temp_case);
- src_case = (union value *) compaction_case;
+ compact_case (compaction_case, c);
+ src_case = compaction_case->data;
}
- else src_case = (union value *) temp_case;
+ else src_case = c->data;
- if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
- disk_sink_file) != 1)
+ if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, file) != 1)
{
msg (ME, _("An error occurred while attempting to write to a "
"temporary file used as the active file: %s."),
}
}
-/* Switches the stream from a sink to a source. */
+/* Destroys the sink's internal data. */
static void
-disk_stream_mode (void)
+disk_sink_destroy (struct case_sink *sink)
{
- /* Rewind the sink. */
- if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
+ FILE *file = sink->aux;
+ if (file != NULL)
+ fclose (file);
+}
+
+/* Closes and destroys the sink and returns a disk source to read
+ back the written data. */
+static struct case_source *
+disk_sink_make_source (struct case_sink *sink)
+{
+ FILE *file = sink->aux;
+
+ /* Rewind the file. */
+ assert (file != NULL);
+ if (fseek (file, 0, SEEK_SET) != 0)
{
msg (ME, _("An error occurred while attempting to rewind a "
"temporary file used as the active file: %s."),
err_failure ();
}
- /* Sink --> source variables. */
- disk_source_file = disk_sink_file;
+ return create_case_source (&disk_source_class, file);
}
-/* Destroys the source's internal data. */
+/* Disk sink. */
+const struct case_sink_class disk_sink_class =
+ {
+ "disk",
+ disk_sink_create,
+ disk_sink_write,
+ disk_sink_destroy,
+ disk_sink_make_source,
+ };
+\f
+/* Disk source. */
+
+/* Reads all cases from the disk source and passes them one by one to
+ write_case(). */
static void
-disk_stream_destroy_source (void)
+disk_source_read (struct case_source *source,
+ write_case_func *write_case, write_case_data wc_data)
{
- if (disk_source_file)
+ FILE *file = source->aux;
+ int i;
+
+ for (i = 0; i < vfm_source_info.ncases; i++)
{
- fclose (disk_source_file);
- disk_source_file = NULL;
+ if (!fread (temp_case, vfm_source_info.case_size, 1, file))
+ {
+ msg (ME, _("An error occurred while attempting to read from "
+ "a temporary file created for the active file: %s."),
+ strerror (errno));
+ err_failure ();
+ return;
+ }
+
+ if (!write_case (wc_data))
+ return;
}
}
-/* Destroys the sink's internal data. */
+/* Destroys the source's internal data. */
static void
-disk_stream_destroy_sink (void)
+disk_source_destroy (struct case_source *source)
{
- if (disk_sink_file)
- {
- fclose (disk_sink_file);
- disk_sink_file = NULL;
- }
+ FILE *file = source->aux;
+ if (file != NULL)
+ fclose (file);
}
-/* Disk stream. */
-struct case_stream vfm_disk_stream =
+/* Disk source. */
+const struct case_source_class disk_source_class =
{
- disk_stream_init,
- disk_stream_read,
- disk_stream_write,
- disk_stream_mode,
- disk_stream_destroy_source,
- disk_stream_destroy_sink,
"disk",
+ disk_source_read,
+ disk_source_destroy,
};
\f
/* Memory case stream. */
-/* List of cases stored in the stream. */
-struct case_list *memory_source_cases;
-struct case_list *memory_sink_cases;
-
-/* Current case. */
-struct case_list *memory_sink_iter;
+/* Memory sink data. */
+struct memory_sink_info
+ {
+ int max_cases; /* Maximum cases before switching to disk. */
+ struct case_list *head; /* First case in list. */
+ struct case_list *tail; /* Last case in list. */
+ };
-/* Maximum number of cases. */
-int memory_sink_max_cases;
+/* Memory source data. */
+struct memory_source_info
+ {
+ struct case_list *cases; /* List of cases. */
+ };
-/* Initializes the memory stream variables for writing. */
static void
-memory_stream_init (void)
+memory_sink_create (struct case_sink *sink)
{
- memory_sink_cases = NULL;
- memory_sink_iter = NULL;
+ struct memory_sink_info *info;
- assert (compaction_nval);
- memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
-}
+ sink->aux = info = xmalloc (sizeof *info);
-/* Reads the case stream from memory and passes it to write_case(). */
-static void
-memory_stream_read (write_case_func *write_case, write_case_data wc_data)
-{
- while (memory_source_cases != NULL)
- {
- memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
-
- {
- struct case_list *current = memory_source_cases;
- memory_source_cases = memory_source_cases->next;
- free (current);
- }
-
- if (!write_case (wc_data))
- return;
- }
+ assert (compaction_nval > 0);
+ info->max_cases = set_max_workspace / (sizeof (union value) * compaction_nval);
+ info->head = info->tail = NULL;
}
-/* Writes temp_case to the memory stream. */
static void
-memory_stream_write (void)
+memory_sink_write (struct case_sink *sink, struct ccase *c)
{
- struct case_list *new_case = malloc (sizeof (struct case_list)
- + ((compaction_nval - 1)
- * sizeof (union value)));
+ struct memory_sink_info *info = sink->aux;
+ size_t case_size;
+ struct case_list *new_case;
+
+ case_size = sizeof (struct case_list)
+ + ((compaction_nval - 1) * sizeof (union value));
+ new_case = malloc (case_size);
/* If we've got memory to spare then add it to the linked list. */
- if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
+ if (vfm_sink_info.ncases <= info->max_cases && new_case != NULL)
{
- if (compaction_necessary)
- compact_case (&new_case->c, temp_case);
+ /* Append case to linked list. */
+ new_case->next = NULL;
+ if (info->head != NULL)
+ info->tail->next = new_case;
else
- memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
+ info->head = new_case;
+ info->tail = new_case;
- /* Append case to linked list. */
- if (memory_sink_cases)
- memory_sink_iter = memory_sink_iter->next = new_case;
+ /* Copy data into case. */
+ if (compaction_necessary)
+ compact_case (&new_case->c, c);
else
- memory_sink_iter = memory_sink_cases = new_case;
+ memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
}
else
{
/* Notify the user. */
if (!new_case)
- msg (MW, _("Virtual memory exhausted. Paging active file "
+ msg (MW, _("Virtual memory exhausted. Writing active file "
"to disk."));
else
msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
- "overflowed. Paging active file to disk."),
- MAX_WORKSPACE / 1024, memory_sink_max_cases,
+ "overflowed. Writing active file to disk."),
+ set_max_workspace / 1024, info->max_cases,
compaction_nval * sizeof (union value));
free (new_case);
/* Switch to a disk sink. */
- vfm_sink = &vfm_disk_stream;
- vfm_sink->init ();
- paging = 1;
-
- /* Terminate the list. */
- if (memory_sink_iter)
- memory_sink_iter->next = NULL;
+ vfm_sink = create_case_sink (&disk_sink_class, NULL);
+ vfm_sink->class->open (vfm_sink);
+ workspace_overflow = 1;
/* Write the cases to disk and destroy them. We can't call
vfm->sink->write() because of compaction. */
- for (cur = memory_sink_cases; cur; cur = next)
+ for (cur = info->head; cur; cur = next)
{
next = cur->next;
if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
- disk_sink_file) != 1)
+ vfm_sink->aux) != 1)
{
msg (ME, _("An error occurred while attempting to "
"write to a temporary file created as the "
- "active file, while paging to disk: %s."),
+ "active file: %s."),
strerror (errno));
err_failure ();
}
}
/* Write the current case to disk. */
- vfm_sink->write ();
+ vfm_sink->class->write (vfm_sink, c);
}
}
/* If the data is stored in memory, causes it to be written to disk.
To be called only *between* procedure()s, not within them. */
void
-page_to_disk (void)
+write_active_file_to_disk (void)
{
- if (vfm_source == &vfm_memory_stream)
+ if (case_source_is_class (vfm_source, &memory_source_class))
{
+ struct memory_source_info *info = vfm_source->aux;
+
/* Switch to a disk sink. */
- vfm_sink = &vfm_disk_stream;
- vfm_sink->init ();
- paging = 1;
+ vfm_sink = create_case_sink (&disk_sink_class, NULL);
+ vfm_sink->class->open (vfm_sink);
+ workspace_overflow = 1;
/* Write the cases to disk and destroy them. We can't call
vfm->sink->write() because of compaction. */
{
struct case_list *cur, *next;
- for (cur = memory_source_cases; cur; cur = next)
+ for (cur = info->cases; cur; cur = next)
{
next = cur->next;
if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
- disk_sink_file) != 1)
+ vfm_sink->aux) != 1)
{
msg (ME, _("An error occurred while attempting to "
"write to a temporary file created as the "
- "active file, while paging to disk: %s."),
+ "active file: %s."),
strerror (errno));
err_failure ();
}
}
}
- vfm_source = &vfm_disk_stream;
- vfm_source->mode ();
-
+ vfm_source = vfm_sink->class->make_source (vfm_sink);
vfm_sink = NULL;
}
}
-/* Switch the memory stream from sink to source mode. */
+/* Destroy all memory sink data. */
static void
-memory_stream_mode (void)
+memory_sink_destroy (struct case_sink *sink)
{
- /* Terminate the list. */
- if (memory_sink_iter)
- memory_sink_iter->next = NULL;
+ struct memory_sink_info *info = sink->aux;
+ struct case_list *cur, *next;
+
+ for (cur = info->head; cur; cur = next)
+ {
+ next = cur->next;
+ free (cur);
+ }
+ free (info);
+}
+
+/* Switch the memory stream from sink to source mode. */
+static struct case_source *
+memory_sink_make_source (struct case_sink *sink)
+{
+ struct memory_sink_info *sink_info = sink->aux;
+ struct memory_source_info *source_info;
+
+ source_info = xmalloc (sizeof *source_info);
+ source_info->cases = sink_info->head;
+
+ free (sink_info);
- /* Sink --> source variables. */
- memory_source_cases = memory_sink_cases;
- memory_sink_cases = NULL;
+ return create_case_source (&memory_source_class, source_info);
}
-/* Destroy all memory source data. */
+const struct case_sink_class memory_sink_class =
+ {
+ "memory",
+ memory_sink_create,
+ memory_sink_write,
+ memory_sink_destroy,
+ memory_sink_make_source,
+ };
+
+/* Reads the case stream from memory and passes it to write_case(). */
static void
-memory_stream_destroy_source (void)
+memory_source_read (struct case_source *source,
+ write_case_func *write_case, write_case_data wc_data)
{
- struct case_list *cur, *next;
-
- for (cur = memory_source_cases; cur; cur = next)
+ struct memory_source_info *info = source->aux;
+
+ while (info->cases != NULL)
{
- next = cur->next;
- free (cur);
+ struct case_list *iter = info->cases;
+ info->cases = iter->next;
+ memcpy (temp_case, &iter->c, vfm_source_info.case_size);
+ free (iter);
+
+ if (!write_case (wc_data))
+ return;
}
- memory_source_cases = NULL;
}
-/* Destroy all memory sink data. */
+/* Destroy all memory source data. */
static void
-memory_stream_destroy_sink (void)
+memory_source_destroy (struct case_source *source)
{
+ struct memory_source_info *info = source->aux;
struct case_list *cur, *next;
- for (cur = memory_sink_cases; cur; cur = next)
+ for (cur = info->cases; cur; cur = next)
{
next = cur->next;
free (cur);
}
- memory_sink_cases = NULL;
+ free (info);
}
-
+
+struct case_list *
+memory_source_get_cases (const struct case_source *source)
+{
+ struct memory_source_info *info = source->aux;
+
+ return info->cases;
+}
+
+void
+memory_source_set_cases (const struct case_source *source,
+ struct case_list *cases)
+{
+ struct memory_source_info *info = source->aux;
+
+ info->cases = cases;
+}
+
/* Memory stream. */
-struct case_stream vfm_memory_stream =
+const struct case_source_class memory_source_class =
{
- memory_stream_init,
- memory_stream_read,
- memory_stream_write,
- memory_stream_mode,
- memory_stream_destroy_source,
- memory_stream_destroy_sink,
"memory",
+ memory_source_read,
+ memory_source_destroy,
};
\f
#include "debug-print.h"
lag_case ();
vfm_sink_info.ncases++;
- vfm_sink->write ();
+ vfm_sink->class->write (vfm_sink, temp_case);
if (dict_get_case_limit (default_dict))
more_cases = (vfm_sink_info.ncases
dict_compact_values (default_dict);
}
-
+struct case_source *
+create_case_source (const struct case_source_class *class, void *aux)
+{
+ struct case_source *source = xmalloc (sizeof *source);
+ source->class = class;
+ source->aux = aux;
+ return source;
+}
+
+int
+case_source_is_complex (const struct case_source *source)
+{
+ return source != NULL && (source->class == &input_program_source_class
+ || source->class == &file_type_source_class);
+}
+
+int
+case_source_is_class (const struct case_source *source,
+ const struct case_source_class *class)
+{
+ return source != NULL && source->class == class;
+
+}
+
+struct case_sink *
+create_case_sink (const struct case_sink_class *class, void *aux)
+{
+ struct case_sink *sink = xmalloc (sizeof *sink);
+ sink->class = class;
+ sink->aux = aux;
+ return sink;
+}
+