#include <unistd.h> /* Required by SunOS4. */
#endif
#include "alloc.h"
+#include "casefile.h"
#include "do-ifP.h"
#include "error.h"
#include "expr.h"
stored, zero otherwise. */
static int compaction_necessary;
-/* Nonzero means that we've overflowed our allotted workspace.
- After that happens once during a session, we always store the
- active file on disk instead of in memory. (This policy may be
- too aggressive.) */
-static int workspace_overflow = 0;
-
/* Time at which vfm was last invoked. */
time_t last_vfm_invocation;
/* Copy all the variables except scratch variables from SRC to
DEST. */
+ /* FIXME: this should be temp_dict not default_dict I guess. */
var_cnt = dict_get_var_cnt (default_dict);
for (i = 0; i < var_cnt; i++)
{
/* Information about storage sink or source. */
struct storage_stream_info
{
- size_t case_cnt; /* Number of cases. */
- size_t case_size; /* Number of bytes in case. */
- enum { DISK, MEMORY } mode; /* Where is data stored? */
-
- /* Disk storage. */
- FILE *file; /* Data file. */
-
- /* Memory storage. */
- int max_cases; /* Maximum cases before switching to disk. */
- struct case_list *head; /* First case in list. */
- struct case_list *tail; /* Last case in list. */
+ struct casefile *casefile; /* Storage. */
};
-static void open_storage_file (struct storage_stream_info *info);
-
/* Initializes a storage sink. */
static void
storage_sink_open (struct case_sink *sink)
struct storage_stream_info *info;
sink->aux = info = xmalloc (sizeof *info);
- info->case_cnt = 0;
- info->case_size = sink->value_cnt * sizeof (union value);
- info->file = NULL;
- info->max_cases = 0;
- info->head = info->tail = NULL;
- if (workspace_overflow)
- {
- info->mode = DISK;
- open_storage_file (info);
- }
- else
- {
- info->mode = MEMORY;
- info->max_cases = (get_max_workspace()
- / (sizeof (struct case_list) + info->case_size));
- }
-}
-
-/* Creates a new temporary file and puts it into INFO. */
-static void
-open_storage_file (struct storage_stream_info *info)
-{
- info->file = tmpfile ();
- if (info->file == NULL)
- {
- msg (ME, _("An error occurred creating a temporary "
- "file for use as the active file: %s."),
- strerror (errno));
- err_failure ();
- }
-}
-
-/* Writes the VALUE_CNT values in VALUES to FILE. */
-static void
-write_storage_file (FILE *file, const union value *values, size_t value_cnt)
-{
- if (fwrite (values, sizeof *values * value_cnt, 1, file) != 1)
- {
- msg (ME, _("An error occurred writing to a "
- "temporary file used as the active file: %s."),
- strerror (errno));
- err_failure ();
- }
-}
-
-/* If INFO represents records in memory, moves them to disk.
- Each comprises VALUE_CNT `union value's. */
-static void
-storage_to_disk (struct storage_stream_info *info, size_t value_cnt)
-{
- struct case_list *cur, *next;
-
- if (info->mode == MEMORY)
- {
- info->mode = DISK;
- open_storage_file (info);
- for (cur = info->head; cur; cur = next)
- {
- next = cur->next;
- write_storage_file (info->file, cur->c.data, value_cnt);
- free (cur);
- }
- info->head = info->tail = NULL;
- }
+ info->casefile = casefile_create (sink->value_cnt * sizeof (union value));
}
/* Destroys storage stream represented by INFO. */
static void
destroy_storage_stream_info (struct storage_stream_info *info)
{
- if (info->mode == DISK)
- {
- if (info->file != NULL)
- fclose (info->file);
- }
- else
- {
- struct case_list *cur, *next;
-
- for (cur = info->head; cur; cur = next)
- {
- next = cur->next;
- free (cur);
- }
- }
+ casefile_destroy (info->casefile);
free (info);
}
{
struct storage_stream_info *info = sink->aux;
- info->case_cnt++;
- if (info->mode == MEMORY)
- {
- struct case_list *new_case;
-
- /* Copy case. */
- new_case = xmalloc (sizeof (struct case_list)
- + ((sink->value_cnt - 1) * sizeof (union value)));
- memcpy (&new_case->c, c, sizeof (union value) * sink->value_cnt);
-
- /* Append case to linked list. */
- new_case->next = NULL;
- if (info->head != NULL)
- info->tail->next = new_case;
- else
- info->head = new_case;
- info->tail = new_case;
-
- /* Dump all the cases to disk if we've run out of
- workspace. */
- if (info->case_cnt > info->max_cases)
- {
- workspace_overflow = 1;
- msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
- "overflowed. Writing active file to disk."),
- get_max_workspace() / 1024, info->max_cases,
- sizeof (struct case_list) + info->case_size);
-
- storage_to_disk (info, sink->value_cnt);
- }
- }
- else
- write_storage_file (info->file, c->data, sink->value_cnt);
+ casefile_append (info->casefile, c);
}
/* Destroys internal data in SINK. */
static struct case_source *
storage_sink_make_source (struct case_sink *sink)
{
- struct storage_stream_info *info = sink->aux;
-
- if (info->mode == DISK)
- {
- /* Rewind the file. */
- assert (info->file != NULL);
- if (fseek (info->file, 0, SEEK_SET) != 0)
- {
- msg (ME, _("An error occurred while attempting to rewind a "
- "temporary file used as the active file: %s."),
- strerror (errno));
- err_failure ();
- }
- }
-
- return create_case_source (&storage_source_class, sink->dict, info);
+ return create_case_source (&storage_source_class, sink->dict, sink->aux);
}
/* Storage sink. */
{
struct storage_stream_info *info = source->aux;
- return info->case_cnt;
+ return casefile_get_case_cnt (info->casefile);
}
/* Reads all cases from the storage source and passes them one by one to
write_case(). */
static void
storage_source_read (struct case_source *source,
- struct ccase *c,
+ struct ccase *output_case,
write_case_func *write_case, write_case_data wc_data)
{
struct storage_stream_info *info = source->aux;
+ const struct ccase *casefile_case;
+ struct casereader *reader;
- if (info->mode == DISK)
+ reader = casefile_get_reader (info->casefile);
+ while (casereader_read (reader, &casefile_case))
{
- int i;
-
- for (i = 0; i < info->case_cnt; i++)
- {
- if (!fread (c, info->case_size, 1, info->file))
- {
- msg (ME, _("An error occurred while attempting to read from "
- "a temporary file created for the active file: %s."),
- strerror (errno));
- err_failure ();
- break;
- }
-
- if (!write_case (wc_data))
- break;
- }
- }
- else
- {
- while (info->head != NULL)
- {
- struct case_list *iter = info->head;
- memcpy (c, &iter->c, info->case_size);
- if (!write_case (wc_data))
- break;
-
- info->head = iter->next;
- free (iter);
- }
- info->tail = NULL;
+ memcpy (output_case, casefile_case,
+ casefile_get_case_size (info->casefile));
+ write_case (wc_data);
}
+ casereader_destroy (reader);
}
/* Destroys the source's internal data. */
storage_source_destroy,
};
-/* Returns nonzero only if SOURCE is stored on disk (instead of
- in memory). */
-int
-storage_source_on_disk (const struct case_source *source)
+struct casefile *
+storage_source_get_casefile (struct case_source *source)
{
struct storage_stream_info *info = source->aux;
- return info->mode == DISK;
-}
-
-/* Returns the list of cases in storage source SOURCE. */
-struct case_list *
-storage_source_get_cases (const struct case_source *source)
-{
- struct storage_stream_info *info = source->aux;
-
- assert (info->mode == MEMORY);
- return info->head;
-}
-
-/* Sets the list of cases in memory source SOURCE to CASES. */
-void
-storage_source_set_cases (const struct case_source *source,
- struct case_list *cases)
-{
- struct storage_stream_info *info = source->aux;
-
- assert (info->mode == MEMORY);
- info->head = cases;
-}
-
-/* If SOURCE has its cases in memory, writes them to disk. */
-void
-storage_source_to_disk (struct case_source *source)
-{
- struct storage_stream_info *info = source->aux;
-
- storage_to_disk (info, source->value_cnt);
+ assert (source->class == &storage_source_class);
+ return info->casefile;
}
\f
/* Null sink. Used by a few procedures that keep track of output
tab_flags (t, SOMF_NO_TITLE);
tab_submit (t);
}
+\f
+/* Represents auxiliary data for handling SPLIT FILE in a
+ multipass procedure. */
+struct multipass_split_aux_data
+ {
+ struct ccase *prev_case; /* Data in previous case. */
+ struct casefile *casefile; /* Accumulates data for a split. */
+
+ /* Function to call with the accumulated data. */
+ void (*split_func) (const struct casefile *, void *);
+ void *func_aux; /* Auxiliary data. */
+ };
+
+static int multipass_split_callback (struct ccase *c, void *aux_);
+static void multipass_split_output (struct multipass_split_aux_data *);
+
+void
+multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
+ void *),
+ void *func_aux)
+{
+ struct multipass_split_aux_data aux;
+
+ assert (split_func != NULL);
+
+ aux.prev_case = xmalloc (dict_get_case_size (default_dict));
+ aux.casefile = NULL;
+ aux.split_func = split_func;
+ aux.func_aux = func_aux;
+
+ procedure (multipass_split_callback, &aux);
+
+ if (aux.casefile != NULL)
+ multipass_split_output (&aux);
+ free (aux.prev_case);
+}
+
+/* procedure() callback used by multipass_procedure_with_splits(). */
+static int
+multipass_split_callback (struct ccase *c, void *aux_)
+{
+ struct multipass_split_aux_data *aux = aux_;
+
+ /* Start a new series if needed. */
+ if (aux->casefile == NULL || !equal_splits (c, aux->prev_case))
+ {
+ /* Pass any cases to split_func. */
+ if (aux->casefile != NULL)
+ multipass_split_output (aux);
+
+ /* Start a new casefile. */
+ aux->casefile = casefile_create (dict_get_case_size (default_dict));
+
+ /* Record split values. */
+ dump_splits (c);
+ memcpy (aux->prev_case, c, dict_get_case_size (default_dict));
+ }
+
+ casefile_append (aux->casefile, c);
+
+ return 1;
+}
+
+static void
+multipass_split_output (struct multipass_split_aux_data *aux)
+{
+ assert (aux->casefile != NULL);
+ aux->split_func (aux->casefile, aux->func_aux);
+ casefile_destroy (aux->casefile);
+ aux->casefile = NULL;
+}