Actually implement the new procedure code and adapt all of its clients
[pspp] / src / language / data-io / get.c
index 1cd255194f07fc8ae8eaa205c0ea7cb5bebd499d..d06a8e6594fcfeffec9df1cd96eab3c93c4a054e 100644 (file)
 
 #include <data/any-reader.h>
 #include <data/any-writer.h>
-#include <data/case-sink.h>
-#include <data/case-source.h>
 #include <data/case.h>
-#include <data/casefile.h>
-#include <data/fastfile.h>
+#include <data/casereader.h>
+#include <data/casewriter.h>
 #include <data/format.h>
 #include <data/dictionary.h>
 #include <data/por-file-writer.h>
 #include <data/procedure.h>
 #include <data/settings.h>
-#include <data/storage-stream.h>
 #include <data/sys-file-writer.h>
 #include <data/transformations.h>
 #include <data/value-labels.h>
@@ -46,9 +43,9 @@
 #include <libpspp/compiler.h>
 #include <libpspp/hash.h>
 #include <libpspp/message.h>
-#include <libpspp/message.h>
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
+#include <libpspp/taint.h>
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
@@ -71,25 +68,18 @@ enum reader_command
     IMPORT_CMD
   };
 
-/* Case reader input program. */
-struct case_reader_pgm 
-  {
-    struct any_reader *reader;  /* File reader. */
-    struct case_map *map;       /* Map from file dict to active file dict. */
-    struct ccase bounce;        /* Bounce buffer. */
-  };
-
-static const struct case_source_class case_reader_source_class;
-
-static void case_reader_pgm_free (struct case_reader_pgm *);
+static void get_translate_case (const struct ccase *, struct ccase *,
+                                void *map_);
+static bool get_destroy_case_map (void *map_);
 
 /* Parses a GET or IMPORT command. */
 static int
 parse_read_command (struct lexer *lexer, struct dataset *ds, enum reader_command type)
 {
-  struct case_reader_pgm *pgm = NULL;
+  struct casereader *reader = NULL;
   struct file_handle *fh = NULL;
   struct dictionary *dict = NULL;
+  struct case_map *map = NULL;
 
   for (;;)
     {
@@ -127,17 +117,10 @@ parse_read_command (struct lexer *lexer, struct dataset *ds, enum reader_command
       goto error;
     }
               
-  discard_variables (ds);
-
-  pgm = xmalloc (sizeof *pgm);
-  pgm->reader = any_reader_open (fh, &dict);
-  pgm->map = NULL;
-  case_nullify (&pgm->bounce);
-  if (pgm->reader == NULL)
+  reader = any_reader_open (fh, &dict);
+  if (reader == NULL)
     goto error;
 
-  case_create (&pgm->bounce, dict_get_next_value_idx (dict));
-  
   start_case_map (dict);
 
   while (lex_token (lexer) != '.')
@@ -147,83 +130,40 @@ parse_read_command (struct lexer *lexer, struct dataset *ds, enum reader_command
         goto error;
     }
 
-  pgm->map = finish_case_map (dict);
+  map = finish_case_map (dict);
+  if (map != NULL)
+    reader = casereader_create_translator (reader,
+                                           dict_get_next_value_idx (dict),
+                                           get_translate_case,
+                                           get_destroy_case_map,
+                                           map);
   
-  dict_destroy (dataset_dict (ds));
-  dataset_set_dict (ds, dict);
-
-  proc_set_source (ds, 
-                  create_case_source (&case_reader_source_class, pgm));
+  proc_set_active_file (ds, reader, dict);
 
   return CMD_SUCCESS;
 
  error:
-  case_reader_pgm_free (pgm);
+  casereader_destroy (reader);
   if (dict != NULL)
     dict_destroy (dict);
   return CMD_CASCADING_FAILURE;
 }
 
-/* Frees a struct case_reader_pgm. */
 static void
-case_reader_pgm_free (struct case_reader_pgm *pgm) 
+get_translate_case (const struct ccase *input, struct ccase *output,
+                    void *map_) 
 {
-  if (pgm != NULL) 
-    {
-      any_reader_close (pgm->reader);
-      destroy_case_map (pgm->map);
-      case_destroy (&pgm->bounce);
-      free (pgm);
-    }
+  struct case_map *map = map_;
+  map_case (map, input, output);
 }
 
-/* Clears internal state related to case reader input procedure. */
-static void
-case_reader_source_destroy (struct case_source *source)
-{
-  struct case_reader_pgm *pgm = source->aux;
-  case_reader_pgm_free (pgm);
-}
-
-/* Reads all the cases from the data file into C and passes them
-   to WRITE_CASE one by one, passing WC_DATA.
-   Returns true if successful, false if an I/O error occurred. */
 static bool
-case_reader_source_read (struct case_source *source,
-                         struct ccase *c,
-                         write_case_func *write_case, write_case_data wc_data)
+get_destroy_case_map (void *map_) 
 {
-  struct case_reader_pgm *pgm = source->aux;
-  bool ok = true;
-
-  do
-    {
-      bool got_case;
-      if (pgm->map == NULL)
-        got_case = any_reader_read (pgm->reader, c);
-      else
-        {
-          got_case = any_reader_read (pgm->reader, &pgm->bounce);
-          if (got_case)
-            map_case (pgm->map, &pgm->bounce, c);
-        }
-      if (!got_case)
-        break;
-
-      ok = write_case (wc_data);
-    }
-  while (ok);
-
-  return ok && !any_reader_error (pgm->reader);
+  struct case_map *map = map_;
+  destroy_case_map (map);
+  return true;
 }
-
-static const struct case_source_class case_reader_source_class =
-  {
-    "case reader",
-    NULL,
-    case_reader_source_read,
-    case_reader_source_destroy,
-  };
 \f
 /* GET. */
 int
@@ -255,30 +195,6 @@ enum command_type
     PROC_CMD            /* Procedure. */
   };
 
-/* File writer plus a case map. */
-struct case_writer
-  {
-    struct any_writer *writer;  /* File writer. */
-    struct case_map *map;       /* Map to output file dictionary
-                                   (null pointer for identity mapping). */
-    struct ccase bounce;        /* Bounce buffer for mapping (if needed). */
-  };
-
-/* Destroys AW. */
-static bool
-case_writer_destroy (struct case_writer *aw)
-{
-  bool ok = true;
-  if (aw != NULL) 
-    {
-      ok = any_writer_close (aw->writer);
-      destroy_case_map (aw->map);
-      case_destroy (&aw->bounce);
-      free (aw);
-    }
-  return ok;
-}
-
 /* Parses SAVE or XSAVE or EXPORT or XEXPORT command.
    WRITER_TYPE identifies the type of file to write,
    and COMMAND_TYPE identifies the type of command.
@@ -289,7 +205,7 @@ case_writer_destroy (struct case_writer *aw)
    included.
 
    On failure, returns a null pointer. */
-static struct case_writer *
+static struct casewriter *
 parse_write_command (struct lexer *lexer, struct dataset *ds, 
                     enum writer_type writer_type,
                      enum command_type command_type,
@@ -298,7 +214,8 @@ parse_write_command (struct lexer *lexer, struct dataset *ds,
   /* Common data. */
   struct file_handle *handle; /* Output file. */
   struct dictionary *dict;    /* Dictionary for output file. */
-  struct case_writer *aw;      /* Writer. */  
+  struct casewriter *writer;  /* Writer. */
+  struct case_map *map;       /* Map from input data to data for writer. */
 
   /* Common options. */
   bool print_map;             /* Print map?  TODO. */
@@ -315,10 +232,8 @@ parse_write_command (struct lexer *lexer, struct dataset *ds,
 
   handle = NULL;
   dict = dict_clone (dataset_dict (ds));
-  aw = xmalloc (sizeof *aw);
-  aw->writer = NULL;
-  aw->map = NULL;
-  case_nullify (&aw->bounce);
+  writer = NULL;
+  map = NULL;
   print_map = false;
   print_short_names = false;
   sysfile_opts = sfm_writer_default_options ();
@@ -424,85 +339,70 @@ parse_write_command (struct lexer *lexer, struct dataset *ds,
     }
 
   dict_compact_values (dict);
-  aw->map = finish_case_map (dict);
-  if (aw->map != NULL)
-    case_create (&aw->bounce, dict_get_next_value_idx (dict));
 
   if (fh_get_referent (handle) == FH_REF_FILE) 
     {
       switch (writer_type) 
         {
         case SYSFILE_WRITER:
-          aw->writer = any_writer_from_sfm_writer (
-            sfm_open_writer (handle, dict, sysfile_opts));
+          writer = sfm_open_writer (handle, dict, sysfile_opts);
           break;
         case PORFILE_WRITER:
-          aw->writer = any_writer_from_pfm_writer (
-            pfm_open_writer (handle, dict, porfile_opts));
+          writer = pfm_open_writer (handle, dict, porfile_opts);
           break;
         }
     }
   else
-    aw->writer = any_writer_open (handle, dict);
-  if (aw->writer == NULL)
+    writer = any_writer_open (handle, dict);
+  if (writer == NULL)
     goto error;
+
+  map = finish_case_map (dict);
+  if (map != NULL)
+    writer = casewriter_create_translator (writer,
+                                           get_translate_case,
+                                           get_destroy_case_map,
+                                           map);
   dict_destroy (dict);
   
-  return aw;
+  return writer;
 
  error:
-  case_writer_destroy (aw);
+  casewriter_destroy (writer);
   dict_destroy (dict);
+  destroy_case_map (map);
   return NULL;
 }
-
-/* Writes case C to writer AW. */
-static bool
-case_writer_write_case (struct case_writer *aw, const struct ccase *c) 
-{
-  if (aw->map != NULL) 
-    {
-      map_case (aw->map, c, &aw->bounce);
-      c = &aw->bounce; 
-    }
-  return any_writer_write (aw->writer, c);
-}
 \f
 /* SAVE and EXPORT. */
 
-static bool output_proc (const struct ccase *, void *, const struct dataset *);
-
 /* Parses and performs the SAVE or EXPORT procedure. */
 static int
 parse_output_proc (struct lexer *lexer, struct dataset *ds, enum writer_type writer_type)
 {
   bool retain_unselected;
   struct variable *saved_filter_variable;
-  struct case_writer *aw;
+  struct casewriter *output;
   bool ok;
 
-  aw = parse_write_command (lexer, ds, writer_type, PROC_CMD, &retain_unselected);
-  if (aw == NULL) 
+  output = parse_write_command (lexer, ds, writer_type, PROC_CMD,
+                                &retain_unselected);
+  if (output == NULL) 
     return CMD_CASCADING_FAILURE;
 
   saved_filter_variable = dict_get_filter (dataset_dict (ds));
   if (retain_unselected) 
     dict_set_filter (dataset_dict (ds), NULL);
-  ok = procedure (ds, output_proc, aw);
+
+  casereader_transfer (proc_open (ds), output);
+  ok = casewriter_destroy (output);
+  ok = proc_commit (ds) && ok;
+
   dict_set_filter (dataset_dict (ds), saved_filter_variable);
 
-  case_writer_destroy (aw);
   return ok ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
 }
 
-/* Writes case C to file. */
-static bool
-output_proc (const struct ccase *c, void *aw_, const struct dataset *ds UNUSED) 
-{
-  struct case_writer *aw = aw_;
-  return case_writer_write_case (aw, c);
-}
-
 int
 cmd_save (struct lexer *lexer, struct dataset *ds) 
 {
@@ -520,7 +420,7 @@ cmd_export (struct lexer *lexer, struct dataset *ds)
 /* Transformation. */
 struct output_trns 
   {
-    struct case_writer *aw;      /* Writer. */
+    struct casewriter *writer;          /* Writer. */
   };
 
 static trns_proc_func output_trns_proc;
@@ -531,8 +431,8 @@ static int
 parse_output_trns (struct lexer *lexer, struct dataset *ds, enum writer_type writer_type) 
 {
   struct output_trns *t = xmalloc (sizeof *t);
-  t->aw = parse_write_command (lexer, ds, writer_type, XFORM_CMD, NULL);
-  if (t->aw == NULL) 
+  t->writer = parse_write_command (lexer, ds, writer_type, XFORM_CMD, NULL);
+  if (t->writer == NULL) 
     {
       free (t);
       return CMD_CASCADING_FAILURE;
@@ -547,7 +447,9 @@ static int
 output_trns_proc (void *trns_, struct ccase *c, casenumber case_num UNUSED)
 {
   struct output_trns *t = trns_;
-  case_writer_write_case (t->aw, c);
+  struct ccase tmp;
+  case_clone (&tmp, c);
+  casewriter_write (t->writer, &tmp);
   return TRNS_CONTINUE;
 }
 
@@ -557,13 +459,8 @@ static bool
 output_trns_free (void *trns_)
 {
   struct output_trns *t = trns_;
-  bool ok = true;
-
-  if (t != NULL)
-    {
-      ok = case_writer_destroy (t->aw);
-      free (t);
-    }
+  bool ok = casewriter_destroy (t->writer);
+  free (t);
   return ok;
 }
 
@@ -762,10 +659,11 @@ struct mtf_file
     struct mtf_file *next_min; /* Next in the chain of minimums. */
     
     int type;                  /* One of MTF_*. */
-    struct variable **by;      /* List of BY variables for this file. */
+    const struct variable **by;        /* List of BY variables for this file. */
     struct file_handle *handle; /* File handle. */
-    struct any_reader *reader;  /* File reader. */
+    struct casereader *reader;  /* File reader. */
     struct dictionary *dict;   /* Dictionary from system file. */
+    bool active_file;           /* Active file? */
 
     /* IN subcommand. */
     char *in_name;              /* Variable name. */
@@ -788,7 +686,7 @@ struct mtf_proc
     char first[LONG_NAME_LEN + 1], last[LONG_NAME_LEN + 1];
     
     struct dictionary *dict;    /* Dictionary of output file. */
-    struct casefile *output;    /* MATCH FILES output. */
+    struct casewriter *output;  /* MATCH FILES output. */
     struct ccase mtf_case;      /* Case used for output. */
 
     unsigned seq_num;           /* Have we initialized this variable? */
@@ -797,12 +695,12 @@ struct mtf_proc
 
 static bool mtf_free (struct mtf_proc *);
 static bool mtf_close_file (struct mtf_file *);
+static bool mtf_close_all_files (struct mtf_proc *);
 static int mtf_merge_dictionary (struct dictionary *const, struct mtf_file *);
+static bool mtf_read_records (struct mtf_proc *);
 static bool mtf_delete_file_in_place (struct mtf_proc *, struct mtf_file **);
 
-static bool mtf_read_nonactive_records (void *);
-static bool mtf_processing_finish (void *, const struct dataset *);
-static bool mtf_processing (const struct ccase *, void *, const struct dataset *);
+static bool mtf_processing (struct mtf_proc *);
 
 static char *var_type_description (struct variable *);
 
@@ -820,9 +718,8 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
   bool used_active_file = false;
   bool saw_table = false;
   bool saw_in = false;
+  bool open_active_file = false;
 
-  bool ok;
-  
   mtf.head = mtf.tail = NULL;
   mtf.by_cnt = 0;
   mtf.first[0] = '\0';
@@ -858,6 +755,7 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
       file->dict = NULL;
       file->in_name = NULL;
       file->in_var = NULL;
+      file->active_file = false;
       case_nullify (&file->input);
 
       /* FILEs go first, then TABLEs. */
@@ -898,7 +796,7 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
             }
           used_active_file = true;
 
-          if (!proc_has_source (ds))
+          if (!proc_has_active_file (ds))
             {
               msg (SE, _("Cannot specify the active file since no active "
                          "file has been defined."));
@@ -912,6 +810,7 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
                    "Temporary transformations will be made permanent."));
 
           file->dict = dataset_dict (ds);
+          file->active_file = true;
         }
       else
         {
@@ -922,8 +821,6 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
           file->reader = any_reader_open (file->handle, &file->dict);
           if (file->reader == NULL)
             goto error;
-
-          case_create (&file->input, dict_get_next_value_idx (file->dict));
         }
 
       while (lex_match (lexer, '/'))
@@ -959,7 +856,7 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
     {
       if (lex_match (lexer, T_BY))
        {
-          struct variable **by;
+          const struct variable **by;
           
          if (mtf.by_cnt)
            {
@@ -968,7 +865,7 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
            }
              
          lex_match (lexer, '=');
-         if (!parse_variables (lexer, mtf.dict, &by, &mtf.by_cnt,
+         if (!parse_variables_const (lexer, mtf.dict, &by, &mtf.by_cnt,
                                PV_NO_DUPLICATE | PV_NO_SCRATCH))
            goto error;
 
@@ -1119,95 +1016,56 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
 
      7. Repeat from step 2.
 
-     Unfortunately, this algorithm can't be implemented in a
-     straightforward way because there's no function to read a
-     record from the active file.  Instead, it has to be written
-     as a state machine.
-
      FIXME: For merging large numbers of files (more than 10?) a
      better algorithm would use a heap for finding minimum
      values. */
 
-  if (!used_active_file)
-    discard_variables (ds);
+  if (used_active_file) 
+    {
+      proc_discard_output (ds);
+      for (iter = mtf.head; iter != NULL; iter = iter->next)
+        if (iter->reader == NULL) 
+          iter->reader = proc_open (ds);
+      open_active_file = true;
+    }
 
   dict_compact_values (mtf.dict);
-  mtf.output = fastfile_create (dict_get_next_value_idx (mtf.dict));
+  mtf.output = autopaging_writer_create (dict_get_next_value_idx (mtf.dict));
   mtf.seq_nums = xcalloc (dict_get_var_cnt (mtf.dict), sizeof *mtf.seq_nums);
   case_create (&mtf.mtf_case, dict_get_next_value_idx (mtf.dict));
 
-  if (!mtf_read_nonactive_records (&mtf))
+  if (!mtf_read_records (&mtf)) 
+    goto error; 
+  while (mtf.head && mtf.head->type == MTF_FILE)
+    if (!mtf_processing (&mtf))
+      goto error; 
+  if (!mtf_close_all_files (&mtf))
     goto error;
+  if (open_active_file)
+    proc_commit (ds);
 
-  if (used_active_file) 
-    {
-      proc_set_sink (ds, 
-                    create_case_sink (&null_sink_class, 
-                                      dataset_dict (ds), NULL));
-      ok = 
-       ( procedure (ds, mtf_processing, &mtf) && 
-         mtf_processing_finish (&mtf, ds) ); 
-    }
-  else
-    ok = mtf_processing_finish (&mtf, ds);
-
-  discard_variables (ds);
-
-  dict_destroy (dataset_dict (ds));
-  dataset_set_dict (ds, mtf.dict);
+  proc_set_active_file (ds, casewriter_make_reader (mtf.output), mtf.dict);
   mtf.dict = NULL;
-  proc_set_source (ds, storage_source_create (mtf.output));
   mtf.output = NULL;
-  
-  if (!mtf_free (&mtf))
-    ok = false;
-  return ok ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
-  
+
+  return mtf_free (&mtf) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
+
  error:
+  if (open_active_file)
+    proc_commit (ds);
   mtf_free (&mtf);
   return CMD_CASCADING_FAILURE;
 }
 
-/* Repeats 2...7 an arbitrary number of times. */
-static bool
-mtf_processing_finish (void *mtf_, const struct dataset *ds)
-{
-  struct mtf_proc *mtf = mtf_;
-  struct mtf_file *iter;
-
-  /* Find the active file and delete it. */
-  for (iter = mtf->head; iter; iter = iter->next)
-    if (iter->handle == NULL)
-      {
-        if (!mtf_delete_file_in_place (mtf, &iter))
-          NOT_REACHED ();
-        break;
-      }
-  
-  while (mtf->head && mtf->head->type == MTF_FILE)
-    if (!mtf_processing (NULL, mtf, ds))
-      return false;
-
-  return true;
-}
-
-/* Return a string in a static buffer describing V's variable type and
-   width. */
+/* Return a string in an allocated buffer describing V's variable
+   type and width. */
 static char *
 var_type_description (struct variable *v)
 {
-  static char buf[2][32];
-  static int x = 0;
-  char *s;
-
-  x ^= 1;
-  s = buf[x];
-
   if (var_is_numeric (v))
-    strcpy (s, "numeric");
+    return xstrdup ("numeric");
   else
-    sprintf (s, "string with width %d", var_get_width (v));
-  return s;
+    return xasprintf ("string with width %d", var_get_width (v));
 }
 
 /* Closes FILE and frees its associated data.
@@ -1216,22 +1074,18 @@ var_type_description (struct variable *v)
 static bool
 mtf_close_file (struct mtf_file *file)
 {
-  bool ok = file->reader == NULL || !any_reader_error (file->reader);
+  bool ok = casereader_destroy (file->reader);
   free (file->by);
-  any_reader_close (file->reader);
-  if (file->handle != NULL)
+  if (!file->active_file)
     dict_destroy (file->dict);
-  case_destroy (&file->input);
   free (file->in_name);
+  case_destroy (&file->input);
   free (file);
   return ok;
 }
 
-/* Free all the data for the MATCH FILES procedure.
-   Returns true if successful, false if an I/O error
-   occurred. */
 static bool
-mtf_free (struct mtf_proc *mtf)
+mtf_close_all_files (struct mtf_proc *mtf) 
 {
   struct mtf_file *iter, *next;
   bool ok = true;
@@ -1243,9 +1097,22 @@ mtf_free (struct mtf_proc *mtf)
       if (!mtf_close_file (iter))
         ok = false;
     }
-  
-  if (mtf->dict)
-    dict_destroy (mtf->dict);
+  mtf->head = NULL;
+  return ok;
+}
+
+/* Free all the data for the MATCH FILES procedure.
+   Returns true if successful, false if an I/O error
+   occurred. */
+static bool
+mtf_free (struct mtf_proc *mtf)
+{
+  bool ok;
+
+  ok = mtf_close_all_files (mtf);
+
+  casewriter_destroy (mtf->output);
+  dict_destroy (mtf->dict);
   case_destroy (&mtf->mtf_case);
   free (mtf->seq_nums);
 
@@ -1291,21 +1158,22 @@ mtf_delete_file_in_place (struct mtf_proc *mtf, struct mtf_file **file)
   return mtf_close_file (f);
 }
 
-/* Read a record from every input file except the active file.
+/* Read a record from every input file.
    Returns true if successful, false if an I/O error occurred. */
 static bool
-mtf_read_nonactive_records (void *mtf_)
+mtf_read_records (struct mtf_proc *mtf)
 {
-  struct mtf_proc *mtf = mtf_;
   struct mtf_file *iter, *next;
   bool ok = true;
 
   for (iter = mtf->head; ok && iter != NULL; iter = next)
     {
       next = iter->next;
-      if (iter->handle && !any_reader_read (iter->reader, &iter->input)) 
-        if (!mtf_delete_file_in_place (mtf, &iter))
-          ok = false;
+      if (!casereader_read (iter->reader, &iter->input))
+        {
+          if (!mtf_delete_file_in_place (mtf, &iter))
+            ok = false; 
+        }
     }
   return ok;
 }
@@ -1314,46 +1182,67 @@ mtf_read_nonactive_records (void *mtf_)
    if A == B, 1 if A > B. */
 static inline int
 mtf_compare_BY_values (struct mtf_proc *mtf,
-                       struct mtf_file *a, struct mtf_file *b,
-                       const struct ccase *c)
+                       struct mtf_file *a, struct mtf_file *b)
 {
-  const struct ccase *ca = case_is_null (&a->input) ? c : &a->input;
-  const struct ccase *cb = case_is_null (&b->input) ? c : &b->input;
-  assert ((a == NULL) + (b == NULL) + (c == NULL) <= 1);
-  return case_compare_2dict (ca, cb, a->by, b->by, mtf->by_cnt);
+  return case_compare_2dict (&a->input, &b->input, a->by, b->by, mtf->by_cnt);
 }
 
 /* Perform one iteration of steps 3...7 above.
    Returns true if successful, false if an I/O error occurred. */
 static bool
-mtf_processing (const struct ccase *c, void *mtf_, const struct dataset *ds UNUSED)
+mtf_processing (struct mtf_proc *mtf)
 {
-  struct mtf_proc *mtf = mtf_;
-
-  /* Do we need another record from the active file? */
-  bool read_active_file;
+  struct mtf_file *min_head, *min_tail; /* Files with minimum BY values. */
+  struct mtf_file *max_head, *max_tail; /* Files with non-minimum BYs. */
+  struct mtf_file *iter, *next;
+  struct ccase out_case;
 
-  assert (mtf->head != NULL);
-  if (mtf->head->type == MTF_TABLE)
-    return true;
-  
-  do
+  /* 3. Find the FILE input record(s) that have minimum BY
+     values.  Store all the values from these input records into
+     the output record. */
+  min_head = min_tail = mtf->head;
+  max_head = max_tail = NULL;
+  for (iter = mtf->head->next; iter && iter->type == MTF_FILE;
+       iter = iter->next) 
     {
-      struct mtf_file *min_head, *min_tail; /* Files with minimum BY values. */
-      struct mtf_file *max_head, *max_tail; /* Files with non-minimum BYs. */
-      struct mtf_file *iter, *next;
-
-      read_active_file = false;
+      int cmp = mtf_compare_BY_values (mtf, min_head, iter);
+      if (cmp < 0) 
+        {
+          if (max_head)
+            max_tail = max_tail->next_min = iter;
+          else
+            max_head = max_tail = iter;
+        }
+      else if (cmp == 0) 
+        min_tail = min_tail->next_min = iter;
+      else /* cmp > 0 */
+        {
+          if (max_head)
+            {
+              max_tail->next_min = min_head;
+              max_tail = min_tail;
+            }
+          else
+            {
+              max_head = min_head;
+              max_tail = min_tail;
+            }
+          min_head = min_tail = iter;
+        }
+    }
+      
+  /* 4. For every TABLE, read another record as long as the BY
+     values on the TABLE's input record are less than the FILEs'
+     BY values.  If an exact match is found, store all the values
+     from the TABLE input record into the output record. */
+  for (; iter != NULL; iter = next)
+    {
+      assert (iter->type == MTF_TABLE);
       
-      /* 3. Find the FILE input record(s) that have minimum BY
-         values.  Store all the values from these input records into
-         the output record. */
-      min_head = min_tail = mtf->head;
-      max_head = max_tail = NULL;
-      for (iter = mtf->head->next; iter && iter->type == MTF_FILE;
-          iter = iter->next) 
+      next = iter->next;
+      for (;;) 
         {
-          int cmp = mtf_compare_BY_values (mtf, min_head, iter, c);
+          int cmp = mtf_compare_BY_values (mtf, min_head, iter);
           if (cmp < 0) 
             {
               if (max_head)
@@ -1361,142 +1250,95 @@ mtf_processing (const struct ccase *c, void *mtf_, const struct dataset *ds UNUS
               else
                 max_head = max_tail = iter;
             }
-          else if (cmp == 0) 
-           min_tail = min_tail->next_min = iter;
+          else if (cmp == 0)
+            min_tail = min_tail->next_min = iter;
           else /* cmp > 0 */
             {
-              if (max_head)
-                {
-                  max_tail->next_min = min_head;
-                  max_tail = min_tail;
-                }
-              else
-                {
-                  max_head = min_head;
-                  max_tail = min_tail;
-                }
-              min_head = min_tail = iter;
+              case_destroy (&iter->input);
+              if (casereader_read (iter->reader, &iter->input))
+                continue;
+              if (!mtf_delete_file_in_place (mtf, &iter))
+                return false;
             }
+          break;
         }
-      
-      /* 4. For every TABLE, read another record as long as the BY
-        values on the TABLE's input record are less than the FILEs'
-        BY values.  If an exact match is found, store all the values
-        from the TABLE input record into the output record. */
-      for (; iter != NULL; iter = next)
-       {
-         assert (iter->type == MTF_TABLE);
-      
-         next = iter->next;
-          for (;;) 
-            {
-              int cmp = mtf_compare_BY_values (mtf, min_head, iter, c);
-              if (cmp < 0) 
-                {
-                  if (max_head)
-                    max_tail = max_tail->next_min = iter;
-                  else
-                    max_head = max_tail = iter;
-                }
-              else if (cmp == 0)
-                min_tail = min_tail->next_min = iter;
-              else /* cmp > 0 */
-                {
-                  if (iter->handle == NULL)
-                    return true;
-                  if (any_reader_read (iter->reader, &iter->input))
-                    continue;
-                  if (!mtf_delete_file_in_place (mtf, &iter))
-                    return false;
-                }
-              break;
-            }
-       }
+    }
 
-      /* Next sequence number. */
-      mtf->seq_num++;
+  /* Next sequence number. */
+  mtf->seq_num++;
 
-      /* Store data to all the records we are using. */
-      if (min_tail)
-       min_tail->next_min = NULL;
-      for (iter = min_head; iter; iter = iter->next_min)
-       {
-         int i;
+  /* Store data to all the records we are using. */
+  if (min_tail)
+    min_tail->next_min = NULL;
+  for (iter = min_head; iter; iter = iter->next_min)
+    {
+      int i;
 
-         for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
-           {
-             struct variable *v = dict_get_var (iter->dict, i);
-              struct variable *mv = get_master (v);
-              size_t mv_index = mv ? var_get_dict_index (mv) : 0;
+      for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
+        {
+          struct variable *v = dict_get_var (iter->dict, i);
+          struct variable *mv = get_master (v);
+          size_t mv_index = mv ? var_get_dict_index (mv) : 0;
          
-             if (mv != NULL && mtf->seq_nums[mv_index] != mtf->seq_num) 
-                {
-                  const struct ccase *record
-                    = case_is_null (&iter->input) ? c : &iter->input;
-                  union value *out = case_data_rw (&mtf->mtf_case, mv);
-
-                  mtf->seq_nums[mv_index] = mtf->seq_num;
-                  if (var_is_numeric (v))
-                    out->f = case_num (record, v);
-                  else
-                    memcpy (out->s, case_str (record, v), var_get_width (v));
-                } 
-            }
-          if (iter->in_var != NULL)
-            case_data_rw (&mtf->mtf_case, iter->in_var)->f = 1.;
+          if (mv != NULL && mtf->seq_nums[mv_index] != mtf->seq_num) 
+            {
+              union value *out = case_data_rw (&mtf->mtf_case, mv);
 
-          if (iter->type == MTF_FILE && iter->handle == NULL)
-            read_active_file = true;
-       }
+              mtf->seq_nums[mv_index] = mtf->seq_num;
+              if (var_is_numeric (v))
+                out->f = case_num (&iter->input, v);
+              else
+                memcpy (out->s, case_str (&iter->input, v), var_get_width (v));
+            } 
+        }
+      if (iter->in_var != NULL)
+        case_data_rw (&mtf->mtf_case, iter->in_var)->f = 1.;
+    }
 
-      /* Store missing values to all the records we're not
-         using. */
-      if (max_tail)
-       max_tail->next_min = NULL;
-      for (iter = max_head; iter; iter = iter->next_min)
-       {
-         int i;
+  /* Store missing values to all the records we're not using. */
+  if (max_tail)
+    max_tail->next_min = NULL;
+  for (iter = max_head; iter; iter = iter->next_min)
+    {
+      int i;
 
-         for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
-           {
-             struct variable *v = dict_get_var (iter->dict, i);
-              struct variable *mv = get_master (v);
-              size_t mv_index = mv ? var_get_dict_index (mv) : 0;
+      for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
+        {
+          struct variable *v = dict_get_var (iter->dict, i);
+          struct variable *mv = get_master (v);
+          size_t mv_index = mv ? var_get_dict_index (mv) : 0;
 
-             if (mv != NULL && mtf->seq_nums[mv_index] != mtf->seq_num) 
-                {
-                  union value *out = case_data_rw (&mtf->mtf_case, mv);
-                  mtf->seq_nums[mv_index] = mtf->seq_num;
+          if (mv != NULL && mtf->seq_nums[mv_index] != mtf->seq_num) 
+            {
+              union value *out = case_data_rw (&mtf->mtf_case, mv);
+              mtf->seq_nums[mv_index] = mtf->seq_num;
 
-                  if (var_is_numeric (v))
-                    out->f = SYSMIS;
-                  else
-                    memset (out->s, ' ', var_get_width (v));
-                }
+              if (var_is_numeric (v))
+                out->f = SYSMIS;
+              else
+                memset (out->s, ' ', var_get_width (v));
             }
-          if (iter->in_var != NULL)
-            case_data_rw (&mtf->mtf_case, iter->in_var)->f = 0.;
-       }
+        }
+      if (iter->in_var != NULL)
+        case_data_rw (&mtf->mtf_case, iter->in_var)->f = 0.;
+    }
 
-      /* 5. Write the output record. */
-      casefile_append (mtf->output, &mtf->mtf_case);
+  /* 5. Write the output record. */
+  case_clone (&out_case, &mtf->mtf_case);
+  casewriter_write (mtf->output, &out_case);
 
-      /* 6. Read another record from each input file FILE and TABLE
-        that we stored values from above.  If we come to the end of
-        one of the input files, remove it from the list of input
-        files. */
-      for (iter = min_head; iter && iter->type == MTF_FILE; iter = next)
-       {
-         next = iter->next_min;
-         if (iter->reader != NULL
-              && !any_reader_read (iter->reader, &iter->input))
-            if (!mtf_delete_file_in_place (mtf, &iter))
-              return false;
-       }
+  /* 6. Read another record from each input file FILE and TABLE
+     that we stored values from above.  If we come to the end of
+     one of the input files, remove it from the list of input
+     files. */
+  for (iter = min_head; iter && iter->type == MTF_FILE; iter = next)
+    {
+      next = iter->next_min;
+      case_destroy (&iter->input);
+      if (!casereader_read (iter->reader, &iter->input))
+        if (!mtf_delete_file_in_place (mtf, &iter))
+          return false;
     }
-  while (!read_active_file
-         && mtf->head != NULL && mtf->head->type == MTF_FILE);
-
   return true;
 }
 
@@ -1519,13 +1361,7 @@ mtf_merge_dictionary (struct dictionary *const m, struct mtf_file *f)
         dict_set_documents (m, d_docs);
       else
         {
-          char *new_docs;
-          size_t new_len;
-
-          new_len = strlen (m_docs) + strlen (d_docs);
-          new_docs = xmalloc (new_len + 1);
-          strcpy (new_docs, m_docs);
-          strcat (new_docs, d_docs);
+          char *new_docs = xasprintf ("%s%s", m_docs, d_docs);
           dict_set_documents (m, new_docs);
           free (new_docs);
         }
@@ -1684,11 +1520,6 @@ map_case (const struct case_map *map,
 {
   size_t dst_idx;
 
-  assert (map != NULL);
-  assert (src != NULL);
-  assert (dst != NULL);
-  assert (src != dst);
-
   for (dst_idx = 0; dst_idx < map->value_cnt; dst_idx++)
     {
       int src_idx = map->map[dst_idx];