Implement ADD FILES and UPDATE.
[pspp-builds.git] / src / language / data-io / match-files.c
index 8fe878b4f926d59ac63c749ad4dfe478206887d4..ada148243bd429e0ff2b2b4b119d8b5027e220d0 100644 (file)
 #include <stdlib.h>
 
 #include <data/any-reader.h>
+#include <data/case-matcher.h>
 #include <data/case.h>
 #include <data/casereader.h>
 #include <data/casewriter.h>
-#include <data/format.h>
 #include <data/dictionary.h>
+#include <data/format.h>
 #include <data/procedure.h>
+#include <data/subcase.h>
 #include <data/variable.h>
 #include <language/command.h>
 #include <language/data-io/file-handle.h>
 #include <language/data-io/trim.h>
 #include <language/lexer/lexer.h>
 #include <language/lexer/variable-parser.h>
+#include <language/stats/sort-criteria.h>
 #include <libpspp/assertion.h>
 #include <libpspp/message.h>
 #include <libpspp/taint.h>
+#include <math/sort.h>
 
 #include "xalloc.h"
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
 
+enum command_type
+  {
+    ADD_FILES,
+    MATCH_FILES,
+    UPDATE
+  };
+
 /* File types. */
 enum mtf_type
   {
@@ -47,26 +58,25 @@ enum mtf_type
     MTF_TABLE                  /* Specified on TABLE= subcommand. */
   };
 
-/* One of the FILEs or TABLEs on MATCH FILES. */
+/* One FILE or TABLE subcommand. */
 struct mtf_file
   {
-    struct ll ll;               /* In list of all files and tables. */
-
     enum mtf_type type;
-    int sequence;
-
-    const struct variable **by; /* List of BY variables for this file. */
+    struct casereader *reader;
+    struct subcase by;
+    int idx;
     struct mtf_variable *vars;  /* Variables to copy to output. */
     size_t var_cnt;             /* Number of other variables. */
+    bool is_sorted;             /* Is presorted on the BY variables? */
 
     struct file_handle *handle; /* Input file handle. */
     struct dictionary *dict;   /* Input file dictionary. */
-    struct casereader *reader;  /* Input reader. */
-    struct ccase input;         /* Input record (null at end of file). */
 
-    /* IN subcommand. */
-    char *in_name;              /* Variable name. */
-    struct variable *in_var;    /* Variable (in master dictionary). */
+    /* Used by TABLE. */
+    struct ccase c;
+
+    char in_name[VAR_NAME_LEN + 1];
+    struct variable *in_var;
   };
 
 struct mtf_variable
@@ -75,18 +85,16 @@ struct mtf_variable
     struct variable *out_var;
   };
 
-/* MATCH FILES procedure. */
 struct mtf_proc
   {
-    struct ll_list files;       /* List of "struct mtf_file"s. */
-    int nonempty_files;         /* FILEs that are not at end-of-file. */
-
-    bool ok;                    /* False if I/O error occurs. */
+    struct mtf_file **files;    /* All the files being merged. */
+    size_t n_files;             /* Number of files. */
 
     struct dictionary *dict;    /* Dictionary of output file. */
-    struct casewriter *output;  /* MATCH FILES output. */
+    struct casewriter *output;  /* Destination for output. */
 
-    size_t by_cnt;              /* Number of variables on BY subcommand. */
+    struct case_matcher *matcher;
+    struct subcase by;
 
     /* FIRST, LAST.
        Only if "first" or "last" is nonnull are the remaining
@@ -95,31 +103,51 @@ struct mtf_proc
     struct variable *last;      /* Variable specified on LAST (if any). */
     struct ccase buffered_case; /* Case ready for output except that we don't
                                    know the value for the LAST variable yet. */
-    struct ccase prev_BY_case;  /* Case with values of last set of BY vars. */
-    const struct variable **prev_BY;  /* Last set of BY variables. */
+    union value *prev_BY;       /* Values of BY vars in buffered_case. */
   };
 
+static int combine_files (enum command_type, struct lexer *, struct dataset *);
 static void mtf_free (struct mtf_proc *);
 
 static bool mtf_close_all_files (struct mtf_proc *);
 static bool mtf_merge_dictionary (struct dictionary *const, struct mtf_file *);
-static bool mtf_read_record (struct mtf_proc *mtf, struct mtf_file *);
 
-static void mtf_process_case (struct mtf_proc *);
+static void process_update (struct mtf_proc *);
+static void process_match_files (struct mtf_proc *);
+static void process_add_files (struct mtf_proc *);
 
 static bool create_flag_var (const char *subcommand_name, const char *var_name,
                              struct dictionary *, struct variable **);
 static char *var_type_description (struct variable *);
+static void output_case (struct mtf_proc *, struct ccase *, union value *by);
+static void output_buffered_case (struct mtf_proc *);
+
+int
+cmd_add_files (struct lexer *lexer, struct dataset *ds)
+{
+  return combine_files (ADD_FILES, lexer, ds);
+}
 
-/* Parse and execute the MATCH FILES command. */
 int
 cmd_match_files (struct lexer *lexer, struct dataset *ds)
+{
+  return combine_files (MATCH_FILES, lexer, ds);
+}
+
+int
+cmd_update (struct lexer *lexer, struct dataset *ds)
+{
+  return combine_files (UPDATE, lexer, ds);
+}
+
+static int
+combine_files (enum command_type command,
+               struct lexer *lexer, struct dataset *ds)
 {
   struct mtf_proc mtf;
-  struct ll *first_table;
-  struct mtf_file *file, *next;
 
-  bool saw_in = false;
+  bool saw_by = false;
+  bool saw_sort = false;
   struct casereader *active_file = NULL;
 
   char first_name[VAR_NAME_LEN + 1] = "";
@@ -127,54 +155,56 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
 
   struct taint *taint = NULL;
 
+  size_t n_files = 0;
+  size_t n_tables = 0;
+  size_t allocated_files = 0;
+
   size_t i;
 
-  ll_init (&mtf.files);
-  mtf.nonempty_files = 0;
-  first_table = ll_null (&mtf.files);
+  mtf.files = NULL;
+  mtf.n_files = 0;
   mtf.dict = dict_create ();
   mtf.output = NULL;
-  mtf.by_cnt = 0;
-  mtf.first = mtf.last = NULL;
+  mtf.matcher = NULL;
+  subcase_init_empty (&mtf.by);
+  mtf.first = NULL;
+  mtf.last = NULL;
   case_nullify (&mtf.buffered_case);
-  case_nullify (&mtf.prev_BY_case);
   mtf.prev_BY = NULL;
 
   dict_set_case_limit (mtf.dict, dict_get_case_limit (dataset_dict (ds)));
 
   lex_match (lexer, '/');
-  while (lex_token (lexer) == T_ID
-         && (lex_id_match (ss_cstr ("FILE"), ss_cstr (lex_tokid (lexer)))
-             || lex_id_match (ss_cstr ("TABLE"), ss_cstr (lex_tokid (lexer)))))
+  for (;;)
     {
-      struct mtf_file *file = xmalloc (sizeof *file);
-      file->by = NULL;
-      file->handle = NULL;
-      file->reader = NULL;
-      file->dict = NULL;
-      file->in_name = NULL;
-      file->in_var = NULL;
-      file->var_cnt = 0;
-      file->vars = NULL;
-      case_nullify (&file->input);
+      struct mtf_file *file;
+      enum mtf_type type;
 
-      if (lex_match_id (lexer, "FILE"))
-        {
-          file->type = MTF_FILE;
-          ll_insert (first_table, &file->ll);
-          mtf.nonempty_files++;
-        }
-      else if (lex_match_id (lexer, "TABLE"))
-        {
-          file->type = MTF_TABLE;
-          ll_push_tail (&mtf.files, &file->ll);
-          if (first_table == ll_null (&mtf.files))
-            first_table = &file->ll;
-        }
+      if (lex_match_id (lexer, "FILE")) 
+        type = MTF_FILE;
+      else if (command == MATCH_FILES && lex_match_id (lexer, "TABLE")) 
+        type = MTF_TABLE;
       else
-        NOT_REACHED ();
+        break;
       lex_match (lexer, '=');
 
+      if (mtf.n_files >= allocated_files)
+        mtf.files = x2nrealloc (mtf.files, &allocated_files,
+                                sizeof *mtf.files);
+      mtf.files[mtf.n_files++] = file = xmalloc (sizeof *file);
+      file->type = type;
+      file->reader = NULL;
+      subcase_init_empty (&file->by);
+      file->idx = type == MTF_FILE ? n_files++ : n_tables++;
+      file->vars = NULL;
+      file->var_cnt = 0;
+      file->is_sorted = true;
+      file->handle = NULL;
+      file->dict = NULL;
+      case_nullify (&file->c);
+      file->in_name[0] = '\0';
+      file->in_var = NULL;
+
       if (lex_match (lexer, '*'))
         {
           if (!proc_has_active_file (ds))
@@ -186,7 +216,7 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
 
           if (proc_make_temporary_transformations_permanent (ds))
             msg (SE,
-                 _("MATCH FILES may not be used after TEMPORARY when "
+                 _("This command may not be used after TEMPORARY when "
                    "the active file is an input source.  "
                    "Temporary transformations will be made permanent."));
 
@@ -218,15 +248,19 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
                 goto error;
               }
 
-            if (file->in_name != NULL)
+            if (file->in_name[0])
               {
                 msg (SE, _("Multiple IN subcommands for a single FILE or "
                            "TABLE."));
                 goto error;
               }
-            file->in_name = xstrdup (lex_tokid (lexer));
+            strcpy (file->in_name, lex_tokid (lexer));
             lex_get (lexer);
-            saw_in = true;
+          }
+        else if (lex_match_id (lexer, "SORT"))
+          {
+            file->is_sorted = false;
+            saw_sort = true;
           }
 
       mtf_merge_dictionary (mtf.dict, file);
@@ -236,49 +270,53 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
     {
       if (lex_match (lexer, T_BY))
        {
-          struct mtf_file *file;
-          struct variable **by;
+          const struct variable **by_vars;
+          size_t i;
           bool ok;
 
-         if (mtf.by_cnt)
+         if (saw_by)
            {
               lex_sbc_only_once ("BY");
              goto error;
            }
+          saw_by = true;
 
          lex_match (lexer, '=');
-         if (!parse_variables (lexer, mtf.dict, &by, &mtf.by_cnt,
-                               PV_NO_DUPLICATE | PV_NO_SCRATCH))
+          if (!parse_sort_criteria (lexer, mtf.dict, &mtf.by, &by_vars, NULL))
            goto error;
 
           ok = true;
-          ll_for_each (file, struct mtf_file, ll, &mtf.files)
+          for (i = 0; i < mtf.n_files; i++)
             {
-              size_t i;
+              struct mtf_file *file = mtf.files[i];
+              size_t j;
 
-              file->by = xnmalloc (mtf.by_cnt, sizeof *file->by);
-              for (i = 0; i < mtf.by_cnt; i++)
+              for (j = 0; j < subcase_get_n_values (&mtf.by); j++)
                 {
-                  const char *var_name = var_get_name (by[i]);
-                  file->by[i] = dict_lookup_var (file->dict, var_name);
-                  if (file->by[i] == NULL)
+                  const char *name = var_get_name (by_vars[j]);
+                  struct variable *var = dict_lookup_var (file->dict, name);
+                  if (var != NULL)
+                    subcase_add_var (&file->by, var,
+                                     subcase_get_direction (&mtf.by, j));
+                  else
                     {
                       if (file->handle != NULL)
                         msg (SE, _("File %s lacks BY variable %s."),
-                             fh_get_name (file->handle), var_name);
+                             fh_get_name (file->handle), name);
                       else
-                        msg (SE, _("Active file lacks BY variable %s."),
-                             var_name);
+                        msg (SE, _("Active file lacks BY variable %s."), name);
                       ok = false;
                     }
                 }
+              assert (!ok || subcase_conformable (&file->by,
+                                                  &mtf.files[0]->by));
             }
-          free (by);
+          free (by_vars);
 
           if (!ok)
             goto error;
        }
-      else if (lex_match_id (lexer, "FIRST"))
+      else if (command != UPDATE && lex_match_id (lexer, "FIRST"))
         {
           if (first_name[0] != '\0')
             {
@@ -292,7 +330,7 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
           strcpy (first_name, lex_tokid (lexer));
           lex_get (lexer);
         }
-      else if (lex_match_id (lexer, "LAST"))
+      else if (command != UPDATE && lex_match_id (lexer, "LAST"))
         {
           if (last_name[0] != '\0')
             {
@@ -333,31 +371,38 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
         }
     }
 
-  if (mtf.by_cnt == 0)
+  if (!saw_by)
     {
-      if (first_table != ll_null (&mtf.files))
+      if (command == UPDATE) 
+        {
+          msg (SE, _("The BY subcommand is required."));
+          goto error;
+        }
+      if (n_tables)
         {
           msg (SE, _("BY is required when TABLE is specified."));
           goto error;
         }
-      if (saw_in)
+      if (saw_sort)
         {
-          msg (SE, _("BY is required when IN is specified."));
+          msg (SE, _("BY is required when SORT is specified."));
           goto error;
         }
     }
 
   /* Set up mapping from each file's variables to master
      variables. */
-  ll_for_each (file, struct mtf_file, ll, &mtf.files)
+  for (i = 0; i < mtf.n_files; i++)
     {
+      struct mtf_file *file = mtf.files[i];
       size_t in_var_cnt = dict_get_var_cnt (file->dict);
+      size_t j;
 
       file->vars = xnmalloc (in_var_cnt, sizeof *file->vars);
       file->var_cnt = 0;
-      for (i = 0; i < in_var_cnt; i++)
+      for (j = 0; j < in_var_cnt; j++)
         {
-          struct variable *in_var = dict_get_var (file->dict, i);
+          struct variable *in_var = dict_get_var (file->dict, j);
           struct variable *out_var = dict_lookup_var (mtf.dict,
                                                       var_get_name (in_var));
 
@@ -371,9 +416,12 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
     }
 
   /* Add IN, FIRST, and LAST variables to master dictionary. */
-  ll_for_each (file, struct mtf_file, ll, &mtf.files)
-    if (!create_flag_var ("IN", file->in_name, mtf.dict, &file->in_var))
-      goto error;
+  for (i = 0; i < mtf.n_files; i++)
+    {
+      struct mtf_file *file = mtf.files[i];
+      if (!create_flag_var ("IN", file->in_name, mtf.dict, &file->in_var))
+        goto error; 
+    }
   if (!create_flag_var ("FIRST", first_name, mtf.dict, &mtf.first)
       || !create_flag_var ("LAST", last_name, mtf.dict, &mtf.last))
     goto error;
@@ -383,8 +431,11 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
   mtf.output = autopaging_writer_create (dict_get_next_value_idx (mtf.dict));
   taint = taint_clone (casewriter_get_taint (mtf.output));
 
-  ll_for_each (file, struct mtf_file, ll, &mtf.files)
+  mtf.matcher = case_matcher_create ();
+  taint_propagate (case_matcher_get_taint (mtf.matcher), taint);
+  for (i = 0; i < mtf.n_files; i++)
     {
+      struct mtf_file *file = mtf.files[i];
       if (file->reader == NULL)
         {
           if (active_file == NULL)
@@ -395,20 +446,27 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
           else
             file->reader = casereader_clone (active_file);
         }
-      taint_propagate (casereader_get_taint (file->reader), taint);
+      if (!file->is_sorted) 
+        file->reader = sort_execute (file->reader, &file->by);
+      if (file->type == MTF_FILE)
+        case_matcher_add_input (mtf.matcher, file->reader, &file->by);
+      else 
+        {
+          casereader_read (file->reader, &file->c);
+          taint_propagate (casereader_get_taint (file->reader), taint);
+        }
     }
 
-  ll_for_each_safe (file, next, struct mtf_file, ll, &mtf.files)
-    mtf_read_record (&mtf, file);
-  while (mtf.nonempty_files > 0)
-    mtf_process_case (&mtf);
-  if ((mtf.first != NULL || mtf.last != NULL) && mtf.prev_BY != NULL)
-    {
-      if (mtf.last != NULL)
-        case_data_rw (&mtf.buffered_case, mtf.last)->f = 1.0;
-      casewriter_write (mtf.output, &mtf.buffered_case);
-      case_nullify (&mtf.buffered_case);
-    }
+  if (command == ADD_FILES)
+    process_add_files (&mtf);
+  else if (command == MATCH_FILES)
+    process_match_files (&mtf);
+  else if (command == UPDATE)
+    process_update (&mtf);
+  else
+    NOT_REACHED ();
+
+  case_matcher_destroy (mtf.matcher);
   mtf_close_all_files (&mtf);
   if (active_file != NULL)
     proc_commit (ds);
@@ -429,19 +487,18 @@ cmd_match_files (struct lexer *lexer, struct dataset *ds)
   return CMD_CASCADING_FAILURE;
 }
 
-/* If VAR_NAME is a nonnull pointer to a non-empty string,
-   attempts to create a variable named VAR_NAME, with format
-   F1.0, in DICT, and stores a pointer to the variable in *VAR.
-   Returns true if successful, false if the variable name is a
-   duplicate (in which case a message saying that the variable
-   specified on the given SUBCOMMAND is a duplicate is emitted).
-   Also returns true, without doing anything, if VAR_NAME is null
-   or empty. */
+/* If VAR_NAME is a non-empty string, attempts to create a
+   variable named VAR_NAME, with format F1.0, in DICT, and stores
+   a pointer to the variable in *VAR.  Returns true if
+   successful, false if the variable name is a duplicate (in
+   which case a message saying that the variable specified on the
+   given SUBCOMMAND is a duplicate is emitted).  Also returns
+   true, without doing anything, if VAR_NAME is null or empty. */
 static bool
 create_flag_var (const char *subcommand, const char *var_name,
                  struct dictionary *dict, struct variable **var)
 {
-  if (var_name != NULL && var_name[0] != '\0')
+  if (var_name[0] != '\0')
     {
       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
       *var = dict_create_var (dict, var_name, 0);
@@ -476,154 +533,228 @@ var_type_description (struct variable *v)
 static bool
 mtf_close_all_files (struct mtf_proc *mtf)
 {
-  struct mtf_file *file;
   bool ok = true;
+  size_t i;
 
-  ll_for_each_preremove (file, struct mtf_file, ll, &mtf->files)
+  for (i = 0; i < mtf->n_files; i++)
     {
+      struct mtf_file *file = mtf->files[i];
       fh_unref (file->handle);
-      casereader_destroy (file->reader);
-      free (file->by);
       dict_destroy (file->dict);
-      free (file->in_name);
-      case_destroy (&file->input);
+      subcase_destroy (&file->by);
+      if (file->type == MTF_TABLE)
+        casereader_destroy (file->reader);
       free (file->vars);
       free (file);
     }
+  free (mtf->files);
+  mtf->files = NULL;
+  mtf->n_files = 0;
 
   return ok;
 }
 
-/* Frees all the data for the MATCH FILES procedure. */
+/* Frees all the data for the procedure. */
 static void
 mtf_free (struct mtf_proc *mtf)
 {
   mtf_close_all_files (mtf);
   dict_destroy (mtf->dict);
+  subcase_destroy (&mtf->by);
   casewriter_destroy (mtf->output);
   case_destroy (&mtf->buffered_case);
-  case_destroy (&mtf->prev_BY_case);
+  free (mtf->prev_BY);
 }
 
-/* Reads the next record into FILE, if possible, and update MTF's
-   nonempty_files count if not. */
 static bool
-mtf_read_record (struct mtf_proc *mtf, struct mtf_file *file)
+scan_table (struct mtf_file *file, union value *by) 
 {
-  case_destroy (&file->input);
-  if (!casereader_read (file->reader, &file->input))
+  while (!case_is_null (&file->c))
     {
-      mtf->nonempty_files--;
-      return false;
+      int cmp = subcase_compare_3way_xc (&file->by, by, &file->c);
+      if (cmp > 0)
+        casereader_read (file->reader, &file->c);
+      else
+        return cmp == 0;
     }
-  else
-    return true;
+  return false;
 }
 
-/* Compare the BY variables for files A and B; return -1 if A <
-   B, 0 if A == B, 1 if A > B.  (If there are no BY variables,
-   then all records are equal.) */
-static inline int
-mtf_compare_BY_values (struct mtf_proc *mtf,
-                       struct mtf_file *a, struct mtf_file *b)
+static void
+create_output_case (const struct mtf_proc *mtf, struct ccase *c)
 {
-  return case_compare_2dict (&a->input, &b->input, a->by, b->by, mtf->by_cnt);
+  size_t i;
+
+  case_create (c, dict_get_next_value_idx (mtf->dict));
+  for (i = 0; i < dict_get_var_cnt (mtf->dict); i++)
+    {
+      struct variable *v = dict_get_var (mtf->dict, i);
+      value_set_missing (case_data_rw (c, v), var_get_width (v));
+    }
+  for (i = 0; i < mtf->n_files; i++)
+    {
+      struct mtf_file *file = mtf->files[i];
+      if (file->in_var != NULL)
+        case_data_rw (c, file->in_var)->f = false;
+    }
 }
 
-/* Processes input files and write one case to the output file. */
 static void
-mtf_process_case (struct mtf_proc *mtf)
+apply_case (const struct mtf_file *file, struct ccase *file_case,
+            struct ccase *c) 
+{
+  /* XXX subcases */
+  size_t j;
+  for (j = 0; j < file->var_cnt; j++)
+    {
+      const struct mtf_variable *mv = &file->vars[j];
+      const union value *in = case_data (file_case, mv->in_var);
+      union value *out = case_data_rw (c, mv->out_var);
+      value_copy (out, in, var_get_width (mv->in_var));
+    }
+  case_destroy (file_case);
+  if (file->in_var != NULL)
+    case_data_rw (c, file->in_var)->f = true; 
+}
+
+static size_t
+find_first_match (struct ccase *cases) 
 {
-  struct ccase c;
-  struct mtf_file *min;
-  struct mtf_file *file;
-  int min_sequence;
   size_t i;
+  for (i = 0; ; i++)
+    if (!case_is_null (&cases[i]))
+      return i;
+}
 
-  /* Find the set of one or more FILEs whose BY values are
-     minimal, as well as the set of zero or more TABLEs whose BY
-     values equal those of the minimum FILEs.
-
-     After each iteration of the loop, this invariant holds: the
-     FILEs with minimum BY values thus far have "sequence"
-     members equal to min_sequence, and "min" points to one of
-     the mtf_files whose case has those minimum BY values, and
-     similarly for TABLEs. */
-  min_sequence = 0;
-  min = NULL;
-  ll_for_each (file, struct mtf_file, ll, &mtf->files)
-    if (case_is_null (&file->input))
-      file->sequence = -1;
-    else if (file->type == MTF_FILE)
-      {
-        int cmp = min != NULL ? mtf_compare_BY_values (mtf, min, file) : 1;
-        if (cmp <= 0)
-          file->sequence = cmp < 0 ? -1 : min_sequence;
-        else
-          {
-            file->sequence = ++min_sequence;
-            min = file;
-          }
-      }
-    else
-      {
-        int cmp;
-        assert (min != NULL);
-        do
+static void
+process_update (struct mtf_proc *mtf)
+{
+  struct ccase *cases;
+  union value *by;
+
+  while (case_matcher_read (mtf->matcher, &cases, &by))
+    {
+      struct mtf_file *min;
+      struct ccase c;
+      size_t min_idx;
+      size_t i;
+
+      create_output_case (mtf, &c);
+      min_idx = find_first_match (cases);
+      min = mtf->files[min_idx];
+      apply_case (min, &cases[min_idx], &c);
+      case_matcher_advance (mtf->matcher, min_idx, &cases[min_idx]);
+      for (i = MAX (1, min_idx); i < mtf->n_files; i++)
+        while (!case_is_null (&cases[i]))
           {
-            cmp = mtf_compare_BY_values (mtf, min, file);
+            apply_case (mtf->files[i], &cases[i], &c);
+            case_matcher_advance (mtf->matcher, i, &cases[i]);
           }
-        while (cmp > 0 && mtf_read_record (mtf, file));
-        file->sequence = cmp == 0 ? min_sequence : -1;
-      }
+      casewriter_write (mtf->output, &c);
+      
+      if (min_idx == 0)
+        {
+          size_t n_dups;
 
-  /* Form the output case from the input cases. */
-  case_create (&c, dict_get_next_value_idx (mtf->dict));
-  for (i = 0; i < dict_get_var_cnt (mtf->dict); i++)
-    {
-      struct variable *v = dict_get_var (mtf->dict, i);
-      value_set_missing (case_data_rw (&c, v), var_get_width (v));
+          for (n_dups = 0; !case_is_null (&cases[0]); n_dups++)
+            {
+              create_output_case (mtf, &c);
+              apply_case (mtf->files[0], &cases[0], &c);
+              case_matcher_advance (mtf->matcher, 0, &cases[0]);
+              casewriter_write (mtf->output, &c);
+            }
+#if 0
+          if (n_dups > 0) 
+            msg (SW, _("Encountered %zu duplicates."), n_dups);
+#endif
+          /* XXX warn.  That's the whole point; otherwise we
+             don't need the 'if' statement at all. */
+        }
     }
-  ll_for_each_reverse (file, struct mtf_file, ll, &mtf->files)
+}
+
+/* Executes MATCH FILES for key-based matches. */
+static void
+process_match_files (struct mtf_proc *mtf)
+{
+  union value *by;
+  struct ccase *cases;
+
+  while (case_matcher_read (mtf->matcher, &cases, &by))
     {
-      bool include_file = file->sequence == min_sequence;
-      if (include_file)
-        for (i = 0; i < file->var_cnt; i++)
-          {
-            const struct mtf_variable *mv = &file->vars[i];
-            const union value *in = case_data (&file->input, mv->in_var);
-            union value *out = case_data_rw (&c, mv->out_var);
-            value_copy (out, in, var_get_width (mv->in_var));
-          }
-      if (file->in_var != NULL)
-        case_data_rw (&c, file->in_var)->f = include_file;
+      struct ccase c;
+      size_t i;
+
+      create_output_case (mtf, &c);
+      for (i = mtf->n_files; i-- > 0; )
+        {
+          struct mtf_file *file = mtf->files[i];
+          struct ccase *file_case;
+          bool include;
+          if (file->type == MTF_FILE) 
+            {
+              file_case = &cases[file->idx];
+              include = !case_is_null (file_case);
+              if (include)
+                case_matcher_advance (mtf->matcher, file->idx, NULL);
+            }
+          else
+            {
+              file_case = &file->c;
+              include = scan_table (file, by);
+              if (include)
+                case_clone (file_case, file_case);
+            }
+          if (include) 
+            apply_case (file, file_case, &c);
+        }
+      output_case (mtf, &c, by);
     }
+  output_buffered_case (mtf);
+}
 
-  /* Write the output case. */
-  if (mtf->first == NULL && mtf->last == NULL)
+/* Processes input files and write one case to the output file. */
+static void
+process_add_files (struct mtf_proc *mtf)
+{
+  union value *by;
+  struct ccase *cases;
+
+  while (case_matcher_read (mtf->matcher, &cases, &by))
     {
-      /* With no FIRST or LAST variables, it's trivial. */
-      casewriter_write (mtf->output, &c);
+      struct ccase c;
+      size_t i;
+
+      for (i = 0; i < mtf->n_files; i++)
+        {
+          struct mtf_file *file = mtf->files[i];
+          while (!case_is_null (&cases[i])) 
+            {
+              create_output_case (mtf, &c);
+              apply_case (file, &cases[i], &c);
+              case_matcher_advance (mtf->matcher, i, &cases[i]);
+              output_case (mtf, &c, by);
+            }
+        }
     }
+  output_buffered_case (mtf);
+}
+
+static void
+output_case (struct mtf_proc *mtf, struct ccase *c, union value *by)
+{
+  if (mtf->first == NULL && mtf->last == NULL)
+    casewriter_write (mtf->output, c);
   else
     {
       /* It's harder with LAST, because we can't know whether
          this case is the last in a group until we've prepared
          the *next* case also.  Thus, we buffer the previous
-         output case until the next one is ready.
-
-         We also have to save a copy of one of the previous input
-         cases, so that we can compare the BY variables.  We
-         can't compare the BY variables between the current
-         output case and the saved one because the BY variables
-         might not be in the output (the user is allowed to drop
-         them). */
+         output case until the next one is ready. */
       bool new_BY;
       if (mtf->prev_BY != NULL)
         {
-          new_BY = case_compare_2dict (&min->input, &mtf->prev_BY_case,
-                                       min->by, mtf->prev_BY,
-                                       mtf->by_cnt);
+          new_BY = !subcase_equal_xx (&mtf->by, mtf->prev_BY, by);
           if (mtf->last != NULL)
             case_data_rw (&mtf->buffered_case, mtf->last)->f = new_BY;
           casewriter_write (mtf->output, &mtf->buffered_case);
@@ -631,28 +762,30 @@ mtf_process_case (struct mtf_proc *mtf)
       else
         new_BY = true;
 
-      case_move (&mtf->buffered_case, &c);
+      case_move (&mtf->buffered_case, c);
       if (mtf->first != NULL)
         case_data_rw (&mtf->buffered_case, mtf->first)->f = new_BY;
 
       if (new_BY)
         {
-          mtf->prev_BY = min->by;
-          case_destroy (&mtf->prev_BY_case);
-          case_clone (&mtf->prev_BY_case, &min->input);
+          size_t n = subcase_get_n_values (&mtf->by) * sizeof (union value);
+          if (mtf->prev_BY == NULL)
+            mtf->prev_BY = xmalloc (n);
+          memcpy (mtf->prev_BY, by, n);
         }
     }
+}
 
-  /* Read another record from each input file FILE with minimum
-     values. */
-  ll_for_each (file, struct mtf_file, ll, &mtf->files)
-    if (file->type == MTF_FILE)
-      {
-        if (file->sequence == min_sequence)
-          mtf_read_record (mtf, file);
-      }
-    else
-      break;
+static void
+output_buffered_case (struct mtf_proc *mtf) 
+{
+  if (mtf->prev_BY != NULL)
+    {
+      if (mtf->last != NULL)
+        case_data_rw (&mtf->buffered_case, mtf->last)->f = 1.0;
+      casewriter_write (mtf->output, &mtf->buffered_case);
+      case_nullify (&mtf->buffered_case);
+    }
 }
 
 /* Merge the dictionary for file F into master dictionary M. */
@@ -704,14 +837,10 @@ mtf_merge_dictionary (struct dictionary *const m, struct mtf_file *f)
               return false;
             }
 
-          if (var_get_width (dv) == var_get_width (mv))
-            {
-              if (var_has_value_labels (dv) && !var_has_value_labels (mv))
-                var_set_value_labels (mv, var_get_value_labels (dv));
-              if (var_has_missing_values (dv) && !var_has_missing_values (mv))
-                var_set_missing_values (mv, var_get_missing_values (dv));
-            }
-
+          if (var_has_value_labels (dv) && !var_has_value_labels (mv))
+            var_set_value_labels (mv, var_get_value_labels (dv));
+          if (var_has_missing_values (dv) && !var_has_missing_values (mv))
+            var_set_missing_values (mv, var_get_missing_values (dv));
           if (var_get_label (dv) && !var_get_label (mv))
             var_set_label (mv, var_get_label (dv));
         }