Add multipass procedures. Add two-pass moments calculation. Rewrite
[pspp-builds.git] / src / vfm.c
index b5d2a8d3564e76641e692bace61df3fe3befcf76..bfd1ef017a2950ee85993d4930bc48942a9f4d61 100644 (file)
--- a/src/vfm.c
+++ b/src/vfm.c
@@ -20,7 +20,7 @@
 #include <config.h>
 #include "vfm.h"
 #include "vfmP.h"
-#include <assert.h>
+#include "error.h"
 #include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -28,6 +28,7 @@
 #include <unistd.h>    /* Required by SunOS4. */
 #endif
 #include "alloc.h"
+#include "casefile.h"
 #include "do-ifP.h"
 #include "error.h"
 #include "expr.h"
@@ -73,12 +74,6 @@ struct case_sink *vfm_sink;
    stored, zero otherwise. */
 static int compaction_necessary;
 
-/* Nonzero means that we've overflowed our allotted workspace.
-   After that happens once during a session, we always store the
-   active file on disk instead of in memory.  (This policy may be
-   too aggressive.) */
-static int workspace_overflow = 0;
-
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
 
@@ -356,6 +351,7 @@ compact_case (struct ccase *dest, const struct ccase *src)
 
   /* Copy all the variables except scratch variables from SRC to
      DEST. */
+  /* FIXME: this should be temp_dict not default_dict I guess. */
   var_cnt = dict_get_var_cnt (default_dict);
   for (i = 0; i < var_cnt; i++)
     {
@@ -461,21 +457,9 @@ close_active_file (void)
 /* Information about storage sink or source. */
 struct storage_stream_info 
   {
-    size_t case_cnt;            /* Number of cases. */
-    size_t case_size;           /* Number of bytes in case. */
-    enum { DISK, MEMORY } mode; /* Where is data stored? */
-
-    /* Disk storage.  */
-    FILE *file;                 /* Data file. */
-
-    /* Memory storage. */
-    int max_cases;              /* Maximum cases before switching to disk. */
-    struct case_list *head;     /* First case in list. */
-    struct case_list *tail;     /* Last case in list. */
+    struct casefile *casefile;  /* Storage. */
   };
 
-static void open_storage_file (struct storage_stream_info *info);
-
 /* Initializes a storage sink. */
 static void
 storage_sink_open (struct case_sink *sink)
@@ -483,91 +467,14 @@ storage_sink_open (struct case_sink *sink)
   struct storage_stream_info *info;
 
   sink->aux = info = xmalloc (sizeof *info);
-  info->case_cnt = 0;
-  info->case_size = sink->value_cnt * sizeof (union value);
-  info->file = NULL;
-  info->max_cases = 0;
-  info->head = info->tail = NULL;
-  if (workspace_overflow) 
-    {
-      info->mode = DISK;
-      open_storage_file (info);
-    }
-  else 
-    {
-      info->mode = MEMORY; 
-      info->max_cases = (get_max_workspace()
-                         / (sizeof (struct case_list) + info->case_size));
-    }
-}
-
-/* Creates a new temporary file and puts it into INFO. */
-static void
-open_storage_file (struct storage_stream_info *info) 
-{
-  info->file = tmpfile ();
-  if (info->file == NULL)
-    {
-      msg (ME, _("An error occurred creating a temporary "
-                 "file for use as the active file: %s."),
-           strerror (errno));
-      err_failure ();
-    }
-}
-
-/* Writes the VALUE_CNT values in VALUES to FILE. */
-static void
-write_storage_file (FILE *file, const union value *values, size_t value_cnt) 
-{
-  if (fwrite (values, sizeof *values * value_cnt, 1, file) != 1)
-    {
-      msg (ME, _("An error occurred writing to a "
-                "temporary file used as the active file: %s."),
-          strerror (errno));
-      err_failure ();
-    }
-}
-
-/* If INFO represents records in memory, moves them to disk.
-   Each comprises VALUE_CNT `union value's. */
-static void
-storage_to_disk (struct storage_stream_info *info, size_t value_cnt) 
-{
-  struct case_list *cur, *next;
-
-  if (info->mode == MEMORY) 
-    {
-      info->mode = DISK;
-      open_storage_file (info);
-      for (cur = info->head; cur; cur = next)
-        {
-          next = cur->next;
-          write_storage_file (info->file, cur->c.data, value_cnt);
-          free (cur);
-        }
-      info->head = info->tail = NULL; 
-    }
+  info->casefile = casefile_create (sink->value_cnt * sizeof (union value));
 }
 
 /* Destroys storage stream represented by INFO. */
 static void
 destroy_storage_stream_info (struct storage_stream_info *info) 
 {
-  if (info->mode == DISK) 
-    {
-      if (info->file != NULL)
-        fclose (info->file); 
-    }
-  else 
-    {
-      struct case_list *cur, *next;
-  
-      for (cur = info->head; cur; cur = next)
-        {
-          next = cur->next;
-          free (cur);
-        }
-    }
+  casefile_destroy (info->casefile);
   free (info); 
 }
 
@@ -577,39 +484,7 @@ storage_sink_write (struct case_sink *sink, const struct ccase *c)
 {
   struct storage_stream_info *info = sink->aux;
 
-  info->case_cnt++;
-  if (info->mode == MEMORY) 
-    {
-      struct case_list *new_case;
-
-      /* Copy case. */
-      new_case = xmalloc (sizeof (struct case_list)
-                          + ((sink->value_cnt - 1) * sizeof (union value)));
-      memcpy (&new_case->c, c, sizeof (union value) * sink->value_cnt);
-
-      /* Append case to linked list. */
-      new_case->next = NULL;
-      if (info->head != NULL)
-        info->tail->next = new_case;
-      else
-        info->head = new_case;
-      info->tail = new_case;
-
-      /* Dump all the cases to disk if we've run out of
-         workspace. */
-      if (info->case_cnt > info->max_cases) 
-        {
-          workspace_overflow = 1;
-          msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
-                     "overflowed.  Writing active file to disk."),
-               get_max_workspace() / 1024, info->max_cases,
-               sizeof (struct case_list) + info->case_size);
-
-          storage_to_disk (info, sink->value_cnt);
-        }
-    }
-  else 
-    write_storage_file (info->file, c->data, sink->value_cnt);
+  casefile_append (info->casefile, c);
 }
 
 /* Destroys internal data in SINK. */
@@ -624,22 +499,7 @@ storage_sink_destroy (struct case_sink *sink)
 static struct case_source *
 storage_sink_make_source (struct case_sink *sink) 
 {
-  struct storage_stream_info *info = sink->aux;
-
-  if (info->mode == DISK) 
-    {
-      /* Rewind the file. */
-      assert (info->file != NULL);
-      if (fseek (info->file, 0, SEEK_SET) != 0)
-        {
-          msg (ME, _("An error occurred while attempting to rewind a "
-                     "temporary file used as the active file: %s."),
-               strerror (errno));
-          err_failure ();
-        }
-    }
-
-  return create_case_source (&storage_source_class, sink->dict, info); 
+  return create_case_source (&storage_source_class, sink->dict, sink->aux);
 }
 
 /* Storage sink. */
@@ -661,51 +521,28 @@ storage_source_count (const struct case_source *source)
 {
   struct storage_stream_info *info = source->aux;
 
-  return info->case_cnt;
+  return casefile_get_case_cnt (info->casefile);
 }
 
 /* Reads all cases from the storage source and passes them one by one to
    write_case(). */
 static void
 storage_source_read (struct case_source *source,
-                     struct ccase *c,
+                     struct ccase *output_case,
                      write_case_func *write_case, write_case_data wc_data)
 {
   struct storage_stream_info *info = source->aux;
+  const struct ccase *casefile_case;
+  struct casereader *reader;
 
-  if (info->mode == DISK) 
-    {
-      int i;
-
-      for (i = 0; i < info->case_cnt; i++)
-        {
-          if (!fread (c, info->case_size, 1, info->file))
-            {
-              msg (ME, _("An error occurred while attempting to read from "
-                         "a temporary file created for the active file: %s."),
-                   strerror (errno));
-              err_failure ();
-              break;
-            }
-
-          if (!write_case (wc_data))
-            break;
-        }
-    }
-  else 
+  reader = casefile_get_reader (info->casefile);
+  while (casereader_read (reader, &casefile_case))
     {
-      while (info->head != NULL) 
-        {
-          struct case_list *iter = info->head;
-          memcpy (c, &iter->c, info->case_size);
-          if (!write_case (wc_data)) 
-            break;
-            
-          info->head = iter->next;
-          free (iter);
-        }
-      info->tail = NULL;
+      memcpy (output_case, casefile_case,
+              casefile_get_case_size (info->casefile));
+      write_case (wc_data);
     }
+  casereader_destroy (reader);
 }
 
 /* Destroys the source's internal data. */
@@ -724,44 +561,13 @@ const struct case_source_class storage_source_class =
     storage_source_destroy,
   };
 
-/* Returns nonzero only if SOURCE is stored on disk (instead of
-   in memory). */
-int
-storage_source_on_disk (const struct case_source *source) 
+struct casefile *
+storage_source_get_casefile (struct case_source *source) 
 {
   struct storage_stream_info *info = source->aux;
 
-  return info->mode == DISK;
-}
-
-/* Returns the list of cases in storage source SOURCE. */
-struct case_list *
-storage_source_get_cases (const struct case_source *source) 
-{
-  struct storage_stream_info *info = source->aux;
-
-  assert (info->mode == MEMORY);
-  return info->head;
-}
-
-/* Sets the list of cases in memory source SOURCE to CASES. */
-void
-storage_source_set_cases (const struct case_source *source,
-                          struct case_list *cases) 
-{
-  struct storage_stream_info *info = source->aux;
-
-  assert (info->mode == MEMORY);
-  info->head = cases;
-}
-
-/* If SOURCE has its cases in memory, writes them to disk. */
-void
-storage_source_to_disk (struct case_source *source) 
-{
-  struct storage_stream_info *info = source->aux;
-
-  storage_to_disk (info, source->value_cnt);
+  assert (source->class == &storage_source_class);
+  return info->casefile;
 }
 \f
 /* Null sink.  Used by a few procedures that keep track of output
@@ -1037,3 +843,74 @@ dump_splits (struct ccase *c)
   tab_flags (t, SOMF_NO_TITLE);
   tab_submit (t);
 }
+\f
+/* Represents auxiliary data for handling SPLIT FILE in a
+   multipass procedure. */
+struct multipass_split_aux_data 
+  {
+    struct ccase *prev_case;    /* Data in previous case. */
+    struct casefile *casefile;  /* Accumulates data for a split. */
+
+    /* Function to call with the accumulated data. */
+    void (*split_func) (const struct casefile *, void *);
+    void *func_aux;                            /* Auxiliary data. */ 
+  };
+
+static int multipass_split_callback (struct ccase *c, void *aux_);
+static void multipass_split_output (struct multipass_split_aux_data *);
+
+void
+multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
+                                                     void *),
+                                 void *func_aux) 
+{
+  struct multipass_split_aux_data aux;
+
+  assert (split_func != NULL);
+
+  aux.prev_case = xmalloc (dict_get_case_size (default_dict));
+  aux.casefile = NULL;
+  aux.split_func = split_func;
+  aux.func_aux = func_aux;
+  
+  procedure (multipass_split_callback, &aux);
+
+  if (aux.casefile != NULL)
+    multipass_split_output (&aux);
+  free (aux.prev_case);
+}
+
+/* procedure() callback used by multipass_procedure_with_splits(). */
+static int
+multipass_split_callback (struct ccase *c, void *aux_)
+{
+  struct multipass_split_aux_data *aux = aux_;
+
+  /* Start a new series if needed. */
+  if (aux->casefile == NULL || !equal_splits (c, aux->prev_case))
+    {
+      /* Pass any cases to split_func. */
+      if (aux->casefile != NULL)
+        multipass_split_output (aux);
+
+      /* Start a new casefile. */
+      aux->casefile = casefile_create (dict_get_case_size (default_dict));
+
+      /* Record split values. */
+      dump_splits (c);
+      memcpy (aux->prev_case, c, dict_get_case_size (default_dict));
+    }
+
+  casefile_append (aux->casefile, c);
+
+  return 1;
+}
+
+static void
+multipass_split_output (struct multipass_split_aux_data *aux)
+{
+  assert (aux->casefile != NULL);
+  aux->split_func (aux->casefile, aux->func_aux);
+  casefile_destroy (aux->casefile);
+  aux->casefile = NULL;
+}