Beginning of VFM cleanup.
[pspp-builds.git] / src / vfm.c
index 42e6b7ccd29dfef5e2b5792cbe16c705fdceaec9..82af200015d6aa40dc19f4fb6215d8a7c0480201 100644 (file)
--- a/src/vfm.c
+++ b/src/vfm.c
@@ -33,6 +33,7 @@
 #include "expr.h"
 #include "misc.h"
 #include "random.h"
+#include "settings.h"
 #include "som.h"
 #include "str.h"
 #include "tab.h"
 /*
    Virtual File Manager (vfm):
 
-   vfm is used to process data files.  It uses the model that data is
-   read from one stream (the data source), then written to another
-   (the data sink).  The data source is then deleted and the data sink
-   becomes the data source for the next procedure. */
+   vfm is used to process data files.  It uses the model that
+   data is read from one stream (the data source), processed,
+   then written to another (the data sink).  The data source is
+   then deleted and the data sink becomes the data source for the
+   next procedure. */
 
 #include "debug-print.h"
 
@@ -58,11 +60,11 @@ struct write_case_data
     void *aux;
   };
 
-/* This is used to read from the active file. */
-struct case_stream *vfm_source;
+/* The current active file, from which cases are read. */
+struct case_source *vfm_source;
 
-/* This is used to write to the replacement active file. */
-struct case_stream *vfm_sink;
+/* The replacement active file, to which cases are written. */
+struct case_sink *vfm_sink;
 
 /* Information about the data source. */
 struct stream_info vfm_source_info;
@@ -82,9 +84,11 @@ int compaction_nval;
    `value's. */
 struct ccase *compaction_case;
 
-/* Within a session, when paging is turned on, it is never turned back
-   off.  This policy might be too aggressive. */
-static int paging = 0;
+/* Nonzero means that we've overflowed our allotted workspace.
+   After that happens once during a session, we always store the
+   active file on disk instead of in memory.  (This policy may be
+   too aggressive.) */
+static int workspace_overflow = 0;
 
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
@@ -157,7 +161,9 @@ procedure (void (*beginfunc) (void *),
   last_vfm_invocation = time (NULL);
 
   open_active_file ();
-  vfm_source->read (procedure_write_case, &procedure_write_data);
+  if (vfm_source != NULL) 
+    vfm_source->class->read (vfm_source,
+                             procedure_write_case, &procedure_write_data);
   close_active_file (&procedure_write_data);
 
   assert (--recursive_call == 0);
@@ -197,9 +203,9 @@ process_active_file (void (*beginfunc) (void *),
   beginfunc (aux);
   
   /* There doesn't necessarily need to be an active file. */
-  if (vfm_source)
-    vfm_source->read (process_active_file_write_case,
-                      &process_active_write_data);
+  if (vfm_source != NULL)
+    vfm_source->class->read (vfm_source, process_active_file_write_case,
+                             &process_active_write_data);
   
   endfunc (aux);
   close_active_file (&process_active_write_data);
@@ -253,7 +259,7 @@ void
 process_active_file_output_case (void)
 {
   vfm_sink_info.ncases++;
-  vfm_sink->write ();
+  vfm_sink->class->write (vfm_sink, temp_case);
 }
 \f
 /* Opening the active file. */
@@ -285,19 +291,22 @@ prepare_for_writing (void)
   
   if (vfm_sink == NULL)
     {
-      if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
-         && !paging)
+      if (vfm_sink_info.case_size * vfm_source_info.ncases > set_max_workspace
+         && !workspace_overflow)
        {
          msg (MW, _("Workspace overflow predicted.  Max workspace is "
                     "currently set to %d KB (%d cases at %d bytes each).  "
-                    "Paging active file to disk."),
-              MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
+                    "Writing active file to disk."),
+              set_max_workspace / 1024, set_max_workspace / vfm_sink_info.case_size,
               vfm_sink_info.case_size);
          
-         paging = 1;
+         workspace_overflow = 1;
        }
-      
-      vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
+
+      if (workspace_overflow)
+        vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      else
+        vfm_sink = create_case_sink (&memory_sink_class, NULL);
     }
 }
 
@@ -333,8 +342,8 @@ arrange_compaction (void)
   else
     compaction_necessary = 0;
   
-  if (vfm_sink->init)
-    vfm_sink->init ();
+  if (vfm_sink->class->open != NULL)
+    vfm_sink->class->open (vfm_sink);
 }
 
 /* Prepares the temporary case and compaction case. */
@@ -511,18 +520,21 @@ close_active_file (struct write_case_data *data)
     finish_compaction ();
     
   /* Old data sink --> New data source. */
-  if (vfm_source && vfm_source->destroy_source)
-    vfm_source->destroy_source ();
-  
-  vfm_source = vfm_sink;
+  if (vfm_source != NULL) 
+    {
+      if (vfm_source->class->destroy != NULL)
+        vfm_source->class->destroy (vfm_source);
+      free (vfm_source);
+    }
+
+  vfm_source = vfm_sink->class->make_source (vfm_sink);
   vfm_source_info.ncases = vfm_sink_info.ncases;
   vfm_source_info.nval = compaction_nval;
   vfm_source_info.case_size = (sizeof (struct ccase)
                               + (compaction_nval - 1) * sizeof (union value));
-  if (vfm_source->mode)
-    vfm_source->mode ();
 
   /* Old data sink is gone now. */
+  free (vfm_sink);
   vfm_sink = NULL;
 
   /* Cancel TEMPORARY. */
@@ -557,16 +569,12 @@ close_active_file (struct write_case_data *data)
 \f
 /* Disk case stream. */
 
-/* Associated files. */
-FILE *disk_source_file;
-FILE *disk_sink_file;
-
 /* Initializes the disk sink. */
 static void
-disk_stream_init (void)
+disk_sink_create (struct case_sink *sink)
 {
-  disk_sink_file = tmpfile ();
-  if (!disk_sink_file)
+  sink->aux = tmpfile ();
+  if (!sink->aux)
     {
       msg (ME, _("An error occurred attempting to create a temporary "
                 "file for use as the active file: %s."),
@@ -575,44 +583,21 @@ disk_stream_init (void)
     }
 }
 
-/* Reads all cases from the disk source and passes them one by one to
-   write_case(). */
-static void
-disk_stream_read (write_case_func *write_case, write_case_data wc_data)
-{
-  int i;
-
-  for (i = 0; i < vfm_source_info.ncases; i++)
-    {
-      if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
-       {
-         msg (ME, _("An error occurred while attempting to read from "
-              "a temporary file created for the active file: %s."),
-              strerror (errno));
-         err_failure ();
-         return;
-       }
-
-      if (!write_case (wc_data))
-       return;
-    }
-}
-
 /* Writes temp_case to the disk sink. */
 static void
-disk_stream_write (void)
+disk_sink_write (struct case_sink *sink, struct ccase *c)
 {
+  FILE *file = sink->aux;
   union value *src_case;
 
   if (compaction_necessary)
     {
-      compact_case (compaction_case, temp_case);
-      src_case = (union value *) compaction_case;
+      compact_case (compaction_case, c);
+      src_case = compaction_case->data;
     }
-  else src_case = (union value *) temp_case;
+  else src_case = c->data;
 
-  if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
-             disk_sink_file) != 1)
+  if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, file) != 1)
     {
       msg (ME, _("An error occurred while attempting to write to a "
                 "temporary file used as the active file: %s."),
@@ -621,12 +606,25 @@ disk_stream_write (void)
     }
 }
 
-/* Switches the stream from a sink to a source. */
+/* Destroys the sink's internal data. */
 static void
-disk_stream_mode (void)
+disk_sink_destroy (struct case_sink *sink)
 {
-  /* Rewind the sink. */
-  if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
+  FILE *file = sink->aux;
+  if (file != NULL)
+    fclose (file);
+}
+
+/* Closes and destroys the sink and returns a disk source to read
+   back the written data. */
+static struct case_source *
+disk_sink_make_source (struct case_sink *sink) 
+{
+  FILE *file = sink->aux;
+  
+  /* Rewind the file. */
+  assert (file != NULL);
+  if (fseek (file, 0, SEEK_SET) != 0)
     {
       msg (ME, _("An error occurred while attempting to rewind a "
                 "temporary file used as the active file: %s."),
@@ -634,107 +632,118 @@ disk_stream_mode (void)
       err_failure ();
     }
   
-  /* Sink --> source variables. */
-  disk_source_file = disk_sink_file;
+  return create_case_source (&disk_source_class, file);
 }
 
-/* Destroys the source's internal data. */
+/* Disk sink. */
+const struct case_sink_class disk_sink_class = 
+  {
+    "disk",
+    disk_sink_create,
+    disk_sink_write,
+    disk_sink_destroy,
+    disk_sink_make_source,
+  };
+\f
+/* Disk source. */
+
+/* Reads all cases from the disk source and passes them one by one to
+   write_case(). */
 static void
-disk_stream_destroy_source (void)
+disk_source_read (struct case_source *source,
+                  write_case_func *write_case, write_case_data wc_data)
 {
-  if (disk_source_file)
+  FILE *file = source->aux;
+  int i;
+
+  for (i = 0; i < vfm_source_info.ncases; i++)
     {
-      fclose (disk_source_file);
-      disk_source_file = NULL;
+      if (!fread (temp_case, vfm_source_info.case_size, 1, file))
+       {
+         msg (ME, _("An error occurred while attempting to read from "
+              "a temporary file created for the active file: %s."),
+              strerror (errno));
+         err_failure ();
+         return;
+       }
+
+      if (!write_case (wc_data))
+       return;
     }
 }
 
-/* Destroys the sink's internal data. */
+/* Destroys the source's internal data. */
 static void
-disk_stream_destroy_sink (void)
+disk_source_destroy (struct case_source *source)
 {
-  if (disk_sink_file)
-    {
-      fclose (disk_sink_file);
-      disk_sink_file = NULL;
-    }
+  FILE *file = source->aux;
+  if (file != NULL)
+    fclose (file);
 }
 
-/* Disk stream. */
-struct case_stream vfm_disk_stream = 
+/* Disk source. */
+const struct case_source_class disk_source_class = 
   {
-    disk_stream_init,
-    disk_stream_read,
-    disk_stream_write,
-    disk_stream_mode,
-    disk_stream_destroy_source,
-    disk_stream_destroy_sink,
     "disk",
+    disk_source_read,
+    disk_source_destroy,
   };
 \f
 /* Memory case stream. */
 
-/* List of cases stored in the stream. */
-struct case_list *memory_source_cases;
-struct case_list *memory_sink_cases;
-
-/* Current case. */
-struct case_list *memory_sink_iter;
+/* Memory sink data. */
+struct memory_sink_info
+  {
+    int max_cases;              /* Maximum cases before switching to disk. */
+    struct case_list *head;     /* First case in list. */
+    struct case_list *tail;     /* Last case in list. */
+  };
 
-/* Maximum number of cases. */
-int memory_sink_max_cases;
+/* Memory source data. */
+struct memory_source_info 
+  {
+    struct case_list *cases;    /* List of cases. */
+  };
 
-/* Initializes the memory stream variables for writing. */
 static void
-memory_stream_init (void)
+memory_sink_create (struct case_sink *sink) 
 {
-  memory_sink_cases = NULL;
-  memory_sink_iter = NULL;
+  struct memory_sink_info *info;
   
-  assert (compaction_nval);
-  memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
-}
+  sink->aux = info = xmalloc (sizeof *info);
 
-/* Reads the case stream from memory and passes it to write_case(). */
-static void
-memory_stream_read (write_case_func *write_case, write_case_data wc_data)
-{
-  while (memory_source_cases != NULL)
-    {
-      memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
-      
-      {
-       struct case_list *current = memory_source_cases;
-       memory_source_cases = memory_source_cases->next;
-       free (current);
-      }
-      
-      if (!write_case (wc_data))
-       return;
-    }
+  assert (compaction_nval > 0);
+  info->max_cases = set_max_workspace / (sizeof (union value) * compaction_nval);
+  info->head = info->tail = NULL;
 }
 
-/* Writes temp_case to the memory stream. */
 static void
-memory_stream_write (void)
+memory_sink_write (struct case_sink *sink, struct ccase *c) 
 {
-  struct case_list *new_case = malloc (sizeof (struct case_list)
-                                      + ((compaction_nval - 1)
-                                         * sizeof (union value)));
+  struct memory_sink_info *info = sink->aux;
+  size_t case_size;
+  struct case_list *new_case;
+
+  case_size = sizeof (struct case_list)
+                      + ((compaction_nval - 1) * sizeof (union value));
+  new_case = malloc (case_size);
 
   /* If we've got memory to spare then add it to the linked list. */
-  if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
+  if (vfm_sink_info.ncases <= info->max_cases && new_case != NULL)
     {
-      if (compaction_necessary)
-       compact_case (&new_case->c, temp_case);
+      /* Append case to linked list. */
+      new_case->next = NULL;
+      if (info->head != NULL)
+        info->tail->next = new_case;
       else
-       memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
+        info->head = new_case;
+      info->tail = new_case;
 
-      /* Append case to linked list. */
-      if (memory_sink_cases)
-       memory_sink_iter = memory_sink_iter->next = new_case;
+      /* Copy data into case. */
+      if (compaction_necessary)
+       compact_case (&new_case->c, c);
       else
-       memory_sink_iter = memory_sink_cases = new_case;
+       memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
     }
   else
     {
@@ -743,36 +752,32 @@ memory_stream_write (void)
 
       /* Notify the user. */
       if (!new_case)
-       msg (MW, _("Virtual memory exhausted.  Paging active file "
+       msg (MW, _("Virtual memory exhausted.  Writing active file "
                   "to disk."));
       else
        msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
-                  "overflowed.  Paging active file to disk."),
-            MAX_WORKSPACE / 1024, memory_sink_max_cases,
+                  "overflowed.  Writing active file to disk."),
+            set_max_workspace / 1024, info->max_cases,
             compaction_nval * sizeof (union value));
 
       free (new_case);
 
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
-
-      /* Terminate the list. */
-      if (memory_sink_iter)
-       memory_sink_iter->next = NULL;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
 
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
-      for (cur = memory_sink_cases; cur; cur = next)
+      for (cur = info->head; cur; cur = next)
        {
          next = cur->next;
          if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
-                     disk_sink_file) != 1)
+                     vfm_sink->aux) != 1)
            {
              msg (ME, _("An error occurred while attempting to "
                         "write to a temporary file created as the "
-                        "active file, while paging to disk: %s."),
+                        "active file: %s."),
                   strerror (errno));
              err_failure ();
            }
@@ -780,36 +785,38 @@ memory_stream_write (void)
        }
 
       /* Write the current case to disk. */
-      vfm_sink->write ();
+      vfm_sink->class->write (vfm_sink, c);
     }
 }
 
 /* If the data is stored in memory, causes it to be written to disk.
    To be called only *between* procedure()s, not within them. */
 void
-page_to_disk (void)
+write_active_file_to_disk (void)
 {
-  if (vfm_source == &vfm_memory_stream)
+  if (case_source_is_class (vfm_source, &memory_source_class))
     {
+      struct memory_source_info *info = vfm_source->aux;
+
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
       
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
       {
        struct case_list *cur, *next;
        
-       for (cur = memory_source_cases; cur; cur = next)
+       for (cur = info->cases; cur; cur = next)
          {
            next = cur->next;
            if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
-                       disk_sink_file) != 1)
+                       vfm_sink->aux) != 1)
              {
                msg (ME, _("An error occurred while attempting to "
                           "write to a temporary file created as the "
-                          "active file, while paging to disk: %s."),
+                          "active file: %s."),
                     strerror (errno));
                err_failure ();
              }
@@ -817,64 +824,107 @@ page_to_disk (void)
          }
       }
       
-      vfm_source = &vfm_disk_stream;
-      vfm_source->mode ();
-
+      vfm_source = vfm_sink->class->make_source (vfm_sink);
       vfm_sink = NULL;
     }
 }
 
-/* Switch the memory stream from sink to source mode. */
+/* Destroy all memory sink data. */
 static void
-memory_stream_mode (void)
+memory_sink_destroy (struct case_sink *sink)
 {
-  /* Terminate the list. */
-  if (memory_sink_iter)
-    memory_sink_iter->next = NULL;
+  struct memory_sink_info *info = sink->aux;
+  struct case_list *cur, *next;
+  
+  for (cur = info->head; cur; cur = next)
+    {
+      next = cur->next;
+      free (cur);
+    }
+  free (info);
+}
+
+/* Switch the memory stream from sink to source mode. */
+static struct case_source *
+memory_sink_make_source (struct case_sink *sink)
+{
+  struct memory_sink_info *sink_info = sink->aux;
+  struct memory_source_info *source_info;
+
+  source_info = xmalloc (sizeof *source_info);
+  source_info->cases = sink_info->head;
+
+  free (sink_info);
 
-  /* Sink --> source variables. */
-  memory_source_cases = memory_sink_cases;
-  memory_sink_cases = NULL;
+  return create_case_source (&memory_source_class, source_info);
 }
 
-/* Destroy all memory source data. */
+const struct case_sink_class memory_sink_class = 
+  {
+    "memory",
+    memory_sink_create,
+    memory_sink_write,
+    memory_sink_destroy,
+    memory_sink_make_source,
+  };
+
+/* Reads the case stream from memory and passes it to write_case(). */
 static void
-memory_stream_destroy_source (void)
+memory_source_read (struct case_source *source,
+                    write_case_func *write_case, write_case_data wc_data)
 {
-  struct case_list *cur, *next;
-  
-  for (cur = memory_source_cases; cur; cur = next)
+  struct memory_source_info *info = source->aux;
+
+  while (info->cases != NULL) 
     {
-      next = cur->next;
-      free (cur);
+      struct case_list *iter = info->cases;
+      info->cases = iter->next;
+      memcpy (temp_case, &iter->c, vfm_source_info.case_size);
+      free (iter);
+      
+      if (!write_case (wc_data))
+       return;
     }
-  memory_source_cases = NULL;
 }
 
-/* Destroy all memory sink data. */
+/* Destroy all memory source data. */
 static void
-memory_stream_destroy_sink (void)
+memory_source_destroy (struct case_source *source)
 {
+  struct memory_source_info *info = source->aux;
   struct case_list *cur, *next;
   
-  for (cur = memory_sink_cases; cur; cur = next)
+  for (cur = info->cases; cur; cur = next)
     {
       next = cur->next;
       free (cur);
     }
-  memory_sink_cases = NULL;
+  free (info);
 }
-  
+
+struct case_list *
+memory_source_get_cases (const struct case_source *source) 
+{
+  struct memory_source_info *info = source->aux;
+
+  return info->cases;
+}
+
+void
+memory_source_set_cases (const struct case_source *source,
+                         struct case_list *cases) 
+{
+  struct memory_source_info *info = source->aux;
+
+  info->cases = cases;
+}
+
 /* Memory stream. */
-struct case_stream vfm_memory_stream = 
+const struct case_source_class memory_source_class = 
   {
-    memory_stream_init,
-    memory_stream_read,
-    memory_stream_write,
-    memory_stream_mode,
-    memory_stream_destroy_source,
-    memory_stream_destroy_sink,
     "memory",
+    memory_source_read,
+    memory_source_destroy,
   };
 \f
 #include "debug-print.h"
@@ -935,7 +985,7 @@ procedure_write_case (write_case_data wc_data)
            lag_case ();
          
          vfm_sink_info.ncases++;
-         vfm_sink->write ();
+         vfm_sink->class->write (vfm_sink, temp_case);
 
          if (dict_get_case_limit (default_dict))
            more_cases = (vfm_sink_info.ncases
@@ -1233,4 +1283,36 @@ finish_compaction (void)
   dict_compact_values (default_dict);
 }
 
-  
+struct case_source *
+create_case_source (const struct case_source_class *class, void *aux) 
+{
+  struct case_source *source = xmalloc (sizeof *source);
+  source->class = class;
+  source->aux = aux;
+  return source;
+}
+
+int
+case_source_is_complex (const struct case_source *source) 
+{
+  return source != NULL && (source->class == &input_program_source_class
+                            || source->class == &file_type_source_class);
+}
+
+int
+case_source_is_class (const struct case_source *source,
+                      const struct case_source_class *class) 
+{
+  return source != NULL && source->class == class;
+
+}
+
+struct case_sink *
+create_case_sink (const struct case_sink_class *class, void *aux) 
+{
+  struct case_sink *sink = xmalloc (sizeof *sink);
+  sink->class = class;
+  sink->aux = aux;
+  return sink;
+}
+