UPDATE: Do not update from missing values in transaction files.
[pspp] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/any-reader.h"
22 #include "data/case-matcher.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/subcase.h"
30 #include "data/variable.h"
31 #include "language/command.h"
32 #include "language/data-io/file-handle.h"
33 #include "language/data-io/trim.h"
34 #include "language/lexer/lexer.h"
35 #include "language/lexer/variable-parser.h"
36 #include "language/stats/sort-criteria.h"
37 #include "libpspp/assertion.h"
38 #include "libpspp/i18n.h"
39 #include "libpspp/message.h"
40 #include "libpspp/string-array.h"
41 #include "libpspp/taint.h"
42 #include "math/sort.h"
43
44 #include "gl/xalloc.h"
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 enum comb_command_type
50   {
51     COMB_ADD,
52     COMB_MATCH,
53     COMB_UPDATE
54   };
55
56 /* File types. */
57 enum comb_file_type
58   {
59     COMB_FILE,                  /* Specified on FILE= subcommand. */
60     COMB_TABLE                  /* Specified on TABLE= subcommand. */
61   };
62
63 /* One FILE or TABLE subcommand. */
64 struct comb_file
65   {
66     /* Basics. */
67     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
68
69     /* Variables. */
70     struct subcase by_vars;     /* BY variables in this input file. */
71     struct subcase src, dst;    /* Data to copy to output; where to put it. */
72     const struct missing_values **mv; /* Each variable's missing values. */
73
74     /* Input files. */
75     struct file_handle *handle; /* Input file handle. */
76     struct dictionary *dict;    /* Input file dictionary. */
77     struct casereader *reader;  /* Input data source. */
78     struct ccase *data;         /* The current input case. */
79     bool is_minimal;            /* Does 'data' have minimum BY values across
80                                    all input files? */
81     bool is_sorted;             /* Is file presorted on the BY variables? */
82
83     /* IN subcommand. */
84     char *in_name;
85     struct variable *in_var;
86   };
87
88 struct comb_proc
89   {
90     struct comb_file *files;    /* All the files being merged. */
91     size_t n_files;             /* Number of files. */
92
93     struct dictionary *dict;    /* Dictionary of output file. */
94     struct subcase by_vars;     /* BY variables in the output. */
95     struct casewriter *output;  /* Destination for output. */
96
97     struct case_matcher *matcher;
98
99     /* FIRST, LAST.
100        Only if "first" or "last" is nonnull are the remaining
101        members used. */
102     struct variable *first;     /* Variable specified on FIRST (if any). */
103     struct variable *last;      /* Variable specified on LAST (if any). */
104     struct ccase *buffered_case; /* Case ready for output except that we don't
105                                     know the value for the LAST var yet. */
106     union value *prev_BY;       /* Values of BY vars in buffered_case. */
107   };
108
109 static int combine_files (enum comb_command_type, struct lexer *,
110                           struct dataset *);
111 static void free_comb_proc (struct comb_proc *);
112
113 static void close_all_comb_files (struct comb_proc *);
114 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
115
116 static void execute_update (struct comb_proc *);
117 static void execute_match_files (struct comb_proc *);
118 static void execute_add_files (struct comb_proc *);
119
120 static bool create_flag_var (const char *subcommand_name, const char *var_name,
121                              struct dictionary *, struct variable **);
122 static void output_case (struct comb_proc *, struct ccase *, union value *by);
123 static void output_buffered_case (struct comb_proc *);
124
125 int
126 cmd_add_files (struct lexer *lexer, struct dataset *ds)
127 {
128   return combine_files (COMB_ADD, lexer, ds);
129 }
130
131 int
132 cmd_match_files (struct lexer *lexer, struct dataset *ds)
133 {
134   return combine_files (COMB_MATCH, lexer, ds);
135 }
136
137 int
138 cmd_update (struct lexer *lexer, struct dataset *ds)
139 {
140   return combine_files (COMB_UPDATE, lexer, ds);
141 }
142
143 static int
144 combine_files (enum comb_command_type command,
145                struct lexer *lexer, struct dataset *ds)
146 {
147   struct comb_proc proc;
148
149   bool saw_by = false;
150   bool saw_sort = false;
151   struct casereader *active_file = NULL;
152
153   char *first_name = NULL;
154   char *last_name = NULL;
155
156   struct taint *taint = NULL;
157
158   size_t n_tables = 0;
159   size_t allocated_files = 0;
160
161   size_t i;
162
163   proc.files = NULL;
164   proc.n_files = 0;
165   proc.dict = dict_create (get_default_encoding ());
166   proc.output = NULL;
167   proc.matcher = NULL;
168   subcase_init_empty (&proc.by_vars);
169   proc.first = NULL;
170   proc.last = NULL;
171   proc.buffered_case = NULL;
172   proc.prev_BY = NULL;
173
174   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
175
176   lex_match (lexer, T_SLASH);
177   for (;;)
178     {
179       struct comb_file *file;
180       enum comb_file_type type;
181
182       if (lex_match_id (lexer, "FILE"))
183         type = COMB_FILE;
184       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
185         {
186           type = COMB_TABLE;
187           n_tables++;
188         }
189       else
190         break;
191       lex_match (lexer, T_EQUALS);
192
193       if (proc.n_files >= allocated_files)
194         proc.files = x2nrealloc (proc.files, &allocated_files,
195                                 sizeof *proc.files);
196       file = &proc.files[proc.n_files++];
197       file->type = type;
198       subcase_init_empty (&file->by_vars);
199       subcase_init_empty (&file->src);
200       subcase_init_empty (&file->dst);
201       file->mv = NULL;
202       file->handle = NULL;
203       file->dict = NULL;
204       file->reader = NULL;
205       file->data = NULL;
206       file->is_sorted = true;
207       file->in_name = NULL;
208       file->in_var = NULL;
209
210       if (lex_match (lexer, T_ASTERISK))
211         {
212           if (!dataset_has_source (ds))
213             {
214               msg (SE, _("Cannot specify the active dataset since none "
215                          "has been defined."));
216               goto error;
217             }
218
219           if (proc_make_temporary_transformations_permanent (ds))
220             msg (SE, _("This command may not be used after TEMPORARY when "
221                        "the active dataset is an input source.  "
222                        "Temporary transformations will be made permanent."));
223
224           file->dict = dict_clone (dataset_dict (ds));
225         }
226       else
227         {
228           file->handle = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
229           if (file->handle == NULL)
230             goto error;
231
232           file->reader = any_reader_open (file->handle, NULL, &file->dict);
233           if (file->reader == NULL)
234             goto error;
235         }
236
237       while (lex_match (lexer, T_SLASH))
238         if (lex_match_id (lexer, "RENAME"))
239           {
240             if (!parse_dict_rename (lexer, file->dict))
241               goto error;
242           }
243         else if (lex_match_id (lexer, "IN"))
244           {
245             lex_match (lexer, T_EQUALS);
246             if (lex_token (lexer) != T_ID)
247               {
248                 lex_error (lexer, NULL);
249                 goto error;
250               }
251
252             if (file->in_name)
253               {
254                 msg (SE, _("Multiple IN subcommands for a single FILE or "
255                            "TABLE."));
256                 goto error;
257               }
258             file->in_name = xstrdup (lex_tokcstr (lexer));
259             lex_get (lexer);
260           }
261         else if (lex_match_id (lexer, "SORT"))
262           {
263             file->is_sorted = false;
264             saw_sort = true;
265           }
266
267       merge_dictionary (proc.dict, file);
268     }
269
270   while (lex_token (lexer) != T_ENDCMD)
271     {
272       if (lex_match (lexer, T_BY))
273         {
274           const struct variable **by_vars;
275           size_t i;
276           bool ok;
277
278           if (saw_by)
279             {
280               lex_sbc_only_once ("BY");
281               goto error;
282             }
283           saw_by = true;
284
285           lex_match (lexer, T_EQUALS);
286           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
287                                     &by_vars, NULL))
288             goto error;
289
290           ok = true;
291           for (i = 0; i < proc.n_files; i++)
292             {
293               struct comb_file *file = &proc.files[i];
294               size_t j;
295
296               for (j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
297                 {
298                   const char *name = var_get_name (by_vars[j]);
299                   struct variable *var = dict_lookup_var (file->dict, name);
300                   if (var != NULL)
301                     subcase_add_var (&file->by_vars, var,
302                                      subcase_get_direction (&proc.by_vars, j));
303                   else
304                     {
305                       if (file->handle != NULL)
306                         msg (SE, _("File %s lacks BY variable %s."),
307                              fh_get_name (file->handle), name);
308                       else
309                         msg (SE, _("Active dataset lacks BY variable %s."),
310                              name);
311                       ok = false;
312                     }
313                 }
314               assert (!ok || subcase_conformable (&file->by_vars,
315                                                   &proc.files[0].by_vars));
316             }
317           free (by_vars);
318
319           if (!ok)
320             goto error;
321         }
322       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
323         {
324           if (first_name != NULL)
325             {
326               lex_sbc_only_once ("FIRST");
327               goto error;
328             }
329
330           lex_match (lexer, T_EQUALS);
331           if (!lex_force_id (lexer))
332             goto error;
333           first_name = xstrdup (lex_tokcstr (lexer));
334           lex_get (lexer);
335         }
336       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
337         {
338           if (last_name != NULL)
339             {
340               lex_sbc_only_once ("LAST");
341               goto error;
342             }
343
344           lex_match (lexer, T_EQUALS);
345           if (!lex_force_id (lexer))
346             goto error;
347           last_name = xstrdup (lex_tokcstr (lexer));
348           lex_get (lexer);
349         }
350       else if (lex_match_id (lexer, "MAP"))
351         {
352           /* FIXME. */
353         }
354       else if (lex_match_id (lexer, "DROP"))
355         {
356           if (!parse_dict_drop (lexer, proc.dict))
357             goto error;
358         }
359       else if (lex_match_id (lexer, "KEEP"))
360         {
361           if (!parse_dict_keep (lexer, proc.dict))
362             goto error;
363         }
364       else
365         {
366           lex_error (lexer, NULL);
367           goto error;
368         }
369
370       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
371         {
372           lex_end_of_command (lexer);
373           goto error;
374         }
375     }
376
377   if (!saw_by)
378     {
379       if (command == COMB_UPDATE)
380         {
381           lex_sbc_missing ("BY");
382           goto error;
383         }
384       if (n_tables)
385         {
386           msg (SE, _("BY is required when %s is specified."), "TABLE");
387           goto error;
388         }
389       if (saw_sort)
390         {
391           msg (SE, _("BY is required when %s is specified."), "SORT");
392           goto error;
393         }
394     }
395
396   /* Add IN, FIRST, and LAST variables to master dictionary. */
397   for (i = 0; i < proc.n_files; i++)
398     {
399       struct comb_file *file = &proc.files[i];
400       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
401         goto error;
402     }
403   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
404       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
405     goto error;
406
407   dict_delete_scratch_vars (proc.dict);
408   dict_compact_values (proc.dict);
409
410   /* Set up mapping from each file's variables to master
411      variables. */
412   for (i = 0; i < proc.n_files; i++)
413     {
414       struct comb_file *file = &proc.files[i];
415       size_t src_var_cnt = dict_get_var_cnt (file->dict);
416       size_t j;
417
418       file->mv = xnmalloc (src_var_cnt, sizeof *file->mv);
419       for (j = 0; j < src_var_cnt; j++)
420         {
421           struct variable *src_var = dict_get_var (file->dict, j);
422           struct variable *dst_var = dict_lookup_var (proc.dict,
423                                                       var_get_name (src_var));
424           if (dst_var != NULL)
425             {
426               size_t n = subcase_get_n_fields (&file->src);
427               file->mv[n] = var_get_missing_values (src_var);
428               subcase_add_var (&file->src, src_var, SC_ASCEND);
429               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
430             }
431         }
432     }
433
434   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
435   taint = taint_clone (casewriter_get_taint (proc.output));
436
437   /* Set up case matcher. */
438   proc.matcher = case_matcher_create ();
439   for (i = 0; i < proc.n_files; i++)
440     {
441       struct comb_file *file = &proc.files[i];
442       if (file->reader == NULL)
443         {
444           if (active_file == NULL)
445             {
446               proc_discard_output (ds);
447               file->reader = active_file = proc_open (ds);
448             }
449           else
450             file->reader = casereader_clone (active_file);
451         }
452       if (!file->is_sorted)
453         file->reader = sort_execute (file->reader, &file->by_vars);
454       taint_propagate (casereader_get_taint (file->reader), taint);
455       file->data = casereader_read (file->reader);
456       if (file->type == COMB_FILE)
457         case_matcher_add_input (proc.matcher, &file->by_vars,
458                                 &file->data, &file->is_minimal);
459     }
460
461   if (command == COMB_ADD)
462     execute_add_files (&proc);
463   else if (command == COMB_MATCH)
464     execute_match_files (&proc);
465   else if (command == COMB_UPDATE)
466     execute_update (&proc);
467   else
468     NOT_REACHED ();
469
470   case_matcher_destroy (proc.matcher);
471   proc.matcher = NULL;
472   close_all_comb_files (&proc);
473   if (active_file != NULL)
474     proc_commit (ds);
475
476   dataset_set_dict (ds, proc.dict);
477   dataset_set_source (ds, casewriter_make_reader (proc.output));
478   proc.dict = NULL;
479   proc.output = NULL;
480
481   free_comb_proc (&proc);
482
483   free (first_name);
484   free (last_name);
485
486   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
487
488  error:
489   if (active_file != NULL)
490     proc_commit (ds);
491   free_comb_proc (&proc);
492   taint_destroy (taint);
493   free (first_name);
494   free (last_name);
495   return CMD_CASCADING_FAILURE;
496 }
497
498 /* Merge the dictionary for file F into master dictionary M. */
499 static bool
500 merge_dictionary (struct dictionary *const m, struct comb_file *f)
501 {
502   struct dictionary *d = f->dict;
503   const struct string_array *d_docs, *m_docs;
504   int i;
505
506   if (dict_get_label (m) == NULL)
507     dict_set_label (m, dict_get_label (d));
508
509   d_docs = dict_get_documents (d);
510   m_docs = dict_get_documents (m);
511
512
513   /* FIXME: If the input files have different encodings, then
514      the result is undefined.
515      The correct thing to do would be to convert to an encoding
516      which can cope with all the input files (eg UTF-8).
517    */
518   if ( 0 != strcmp (dict_get_encoding (f->dict), dict_get_encoding (m)))
519     msg (MW, _("Combining files with incompatible encodings. String data may "
520                "not be represented correctly."));
521
522   if (d_docs != NULL)
523     {
524       if (m_docs == NULL)
525         dict_set_documents (m, d_docs);
526       else
527         {
528           struct string_array new_docs;
529           size_t i;
530
531           new_docs.n = m_docs->n + d_docs->n;
532           new_docs.strings = xmalloc (new_docs.n * sizeof *new_docs.strings);
533           for (i = 0; i < m_docs->n; i++)
534             new_docs.strings[i] = m_docs->strings[i];
535           for (i = 0; i < d_docs->n; i++)
536             new_docs.strings[m_docs->n + i] = d_docs->strings[i];
537
538           dict_set_documents (m, &new_docs);
539
540           free (new_docs.strings);
541         }
542     }
543
544   for (i = 0; i < dict_get_var_cnt (d); i++)
545     {
546       struct variable *dv = dict_get_var (d, i);
547       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
548
549       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
550         continue;
551
552       if (mv != NULL)
553         {
554           if (var_get_width (mv) != var_get_width (dv))
555             {
556               const char *var_name = var_get_name (dv);
557               const char *file_name = fh_get_name (f->handle);
558               struct string s = DS_EMPTY_INITIALIZER;
559               ds_put_format (&s,
560                              _("Variable %s in file %s has different "
561                                "type or width from the same variable in "
562                                "earlier file."),
563                              var_name, file_name);
564               ds_put_cstr (&s, "  ");
565               if (var_is_numeric (dv))
566                 ds_put_format (&s, _("In file %s, %s is numeric."),
567                                file_name, var_name);
568               else
569                 ds_put_format (&s, _("In file %s, %s is a string variable "
570                                      "with width %d."),
571                                file_name, var_name, var_get_width (dv));
572               ds_put_cstr (&s, "  ");
573               if (var_is_numeric (mv))
574                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
575                                var_name);
576               else
577                 ds_put_format (&s, _("In an earlier file, %s was a string "
578                                      "variable with width %d."),
579                                var_name, var_get_width (mv));
580               msg (SE, "%s", ds_cstr (&s));
581               ds_destroy (&s);
582               return false;
583             }
584
585           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
586             var_set_value_labels (mv, var_get_value_labels (dv));
587           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
588             var_set_missing_values (mv, var_get_missing_values (dv));
589           if (var_get_label (dv) && !var_get_label (mv))
590             var_set_label (mv, var_get_label (dv), false);
591         }
592       else
593         mv = dict_clone_var_assert (m, dv);
594     }
595
596   return true;
597 }
598
599 /* If VAR_NAME is non-NULL, attempts to create a
600    variable named VAR_NAME, with format F1.0, in DICT, and stores
601    a pointer to the variable in *VAR.  Returns true if
602    successful, false if the variable name is a duplicate (in
603    which case a message saying that the variable specified on the
604    given SUBCOMMAND is a duplicate is emitted).
605
606    Does nothing and returns true if VAR_NAME is null. */
607 static bool
608 create_flag_var (const char *subcommand, const char *var_name,
609                  struct dictionary *dict, struct variable **var)
610 {
611   if (var_name != NULL)
612     {
613       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
614       *var = dict_create_var (dict, var_name, 0);
615       if (*var == NULL)
616         {
617           msg (SE, _("Variable name %s specified on %s subcommand "
618                      "duplicates an existing variable name."),
619                subcommand, var_name);
620           return false;
621         }
622       var_set_both_formats (*var, &format);
623     }
624   else
625     *var = NULL;
626   return true;
627 }
628
629 /* Closes all the files in PROC and frees their associated data. */
630 static void
631 close_all_comb_files (struct comb_proc *proc)
632 {
633   size_t i;
634
635   for (i = 0; i < proc->n_files; i++)
636     {
637       struct comb_file *file = &proc->files[i];
638       subcase_destroy (&file->by_vars);
639       subcase_destroy (&file->src);
640       subcase_destroy (&file->dst);
641       free (file->mv);
642       fh_unref (file->handle);
643       dict_destroy (file->dict);
644       casereader_destroy (file->reader);
645       case_unref (file->data);
646       free (file->in_name);
647     }
648   free (proc->files);
649   proc->files = NULL;
650   proc->n_files = 0;
651 }
652
653 /* Frees all the data for the procedure. */
654 static void
655 free_comb_proc (struct comb_proc *proc)
656 {
657   close_all_comb_files (proc);
658   dict_destroy (proc->dict);
659   casewriter_destroy (proc->output);
660   case_matcher_destroy (proc->matcher);
661   if (proc->prev_BY)
662     {
663       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
664                                 proc->prev_BY);
665       free (proc->prev_BY);
666     }
667   subcase_destroy (&proc->by_vars);
668   case_unref (proc->buffered_case);
669 }
670 \f
671 static bool scan_table (struct comb_file *, union value by[]);
672 static struct ccase *create_output_case (const struct comb_proc *);
673 static void apply_case (const struct comb_file *, struct ccase *);
674 static void apply_nonmissing_case (const struct comb_file *, struct ccase *);
675 static void advance_file (struct comb_file *, union value by[]);
676 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
677 static void output_buffered_case (struct comb_proc *);
678
679 /* Executes the ADD FILES command. */
680 static void
681 execute_add_files (struct comb_proc *proc)
682 {
683   union value *by;
684
685   while (case_matcher_match (proc->matcher, &by))
686     {
687       size_t i;
688
689       for (i = 0; i < proc->n_files; i++)
690         {
691           struct comb_file *file = &proc->files[i];
692           while (file->is_minimal)
693             {
694               struct ccase *output = create_output_case (proc);
695               apply_case (file, output);
696               advance_file (file, by);
697               output_case (proc, output, by);
698             }
699         }
700     }
701   output_buffered_case (proc);
702 }
703
704 /* Executes the MATCH FILES command. */
705 static void
706 execute_match_files (struct comb_proc *proc)
707 {
708   union value *by;
709
710   while (case_matcher_match (proc->matcher, &by))
711     {
712       struct ccase *output;
713       size_t i;
714
715       output = create_output_case (proc);
716       for (i = proc->n_files; i-- > 0; )
717         {
718           struct comb_file *file = &proc->files[i];
719           if (file->type == COMB_FILE)
720             {
721               if (file->is_minimal)
722                 {
723                   apply_case (file, output);
724                   advance_file (file, NULL);
725                 }
726             }
727           else
728             {
729               if (scan_table (file, by))
730                 apply_case (file, output);
731             }
732         }
733       output_case (proc, output, by);
734     }
735   output_buffered_case (proc);
736 }
737
738 /* Executes the UPDATE command. */
739 static void
740 execute_update (struct comb_proc *proc)
741 {
742   union value *by;
743   size_t n_duplicates = 0;
744
745   while (case_matcher_match (proc->matcher, &by))
746     {
747       struct comb_file *first, *file;
748       struct ccase *output;
749
750       /* Find first nonnull case in array and make an output case
751          from it. */
752       output = create_output_case (proc);
753       for (first = &proc->files[0]; ; first++)
754         if (first->is_minimal)
755           break;
756       apply_case (first, output);
757       advance_file (first, by);
758
759       /* Read additional cases and update the output case from
760          them.  (Don't update the output case from any duplicate
761          cases in the master file.) */
762       for (file = first + (first == proc->files);
763            file < &proc->files[proc->n_files]; file++)
764         {
765           while (file->is_minimal)
766             {
767               apply_nonmissing_case (file, output);
768               advance_file (file, by);
769             }
770         }
771       casewriter_write (proc->output, output);
772
773       /* Write duplicate cases in the master file directly to the
774          output.  */
775       if (first == proc->files && first->is_minimal)
776         {
777           n_duplicates++;
778           while (first->is_minimal)
779             {
780               output = create_output_case (proc);
781               apply_case (first, output);
782               advance_file (first, by);
783               casewriter_write (proc->output, output);
784             }
785         }
786     }
787
788   if (n_duplicates)
789     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
790          n_duplicates);
791 }
792
793 /* Reads FILE, which must be of type COMB_TABLE, until it
794    encounters a case with BY or greater for its BY variables.
795    Returns true if a case with exactly BY for its BY variables
796    was found, otherwise false. */
797 static bool
798 scan_table (struct comb_file *file, union value by[])
799 {
800   while (file->data != NULL)
801     {
802       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
803       if (cmp > 0)
804         {
805           case_unref (file->data);
806           file->data = casereader_read (file->reader);
807         }
808       else
809         return cmp == 0;
810     }
811   return false;
812 }
813
814 /* Creates and returns an output case for PROC, initializing each
815    of its values to system-missing or blanks, except that the
816    values of IN variables are set to 0. */
817 static struct ccase *
818 create_output_case (const struct comb_proc *proc)
819 {
820   size_t n_vars = dict_get_var_cnt (proc->dict);
821   struct ccase *output;
822   size_t i;
823
824   output = case_create (dict_get_proto (proc->dict));
825   for (i = 0; i < n_vars; i++)
826     {
827       struct variable *v = dict_get_var (proc->dict, i);
828       value_set_missing (case_data_rw (output, v), var_get_width (v));
829     }
830   for (i = 0; i < proc->n_files; i++)
831     {
832       struct comb_file *file = &proc->files[i];
833       if (file->in_var != NULL)
834         case_data_rw (output, file->in_var)->f = false;
835     }
836   return output;
837 }
838
839 static void
840 mark_file_used (const struct comb_file *file, struct ccase *output)
841 {
842   if (file->in_var != NULL)
843     case_data_rw (output, file->in_var)->f = true;
844 }
845
846 /* Copies the data from FILE's case into output case OUTPUT.
847    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
848 static void
849 apply_case (const struct comb_file *file, struct ccase *output)
850 {
851   subcase_copy (&file->src, file->data, &file->dst, output);
852   mark_file_used (file, output);
853 }
854
855 /* Copies the data from FILE's case into output case OUTPUT,
856    skipping values that are missing or all spaces.
857
858    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
859 static void
860 apply_nonmissing_case (const struct comb_file *file, struct ccase *output)
861 {
862   size_t i;
863
864   for (i = 0; i < subcase_get_n_fields (&file->src); i++)
865     {
866       const struct subcase_field *src_field = &file->src.fields[i];
867       const struct subcase_field *dst_field = &file->dst.fields[i];
868       const union value *src_value
869         = case_data_idx (file->data, src_field->case_index);
870       int width = src_field->width;
871
872       if (!mv_is_value_missing (file->mv[i], src_value, MV_ANY)
873           && !(width > 0 && value_is_spaces (src_value, width)))
874         value_copy (case_data_rw_idx (output, dst_field->case_index),
875                     src_value, width);
876     }
877   mark_file_used (file, output);
878 }
879
880 /* Advances FILE to its next case.  If BY is nonnull, then FILE's is_minimal
881    member is updated based on whether the new case's BY values still match
882    those in BY. */
883 static void
884 advance_file (struct comb_file *file, union value by[])
885 {
886   case_unref (file->data);
887   file->data = casereader_read (file->reader);
888   if (by)
889     file->is_minimal = (file->data != NULL
890                         && subcase_equal_cx (&file->by_vars, file->data, by));
891 }
892
893 /* Writes OUTPUT, whose BY values has been extracted into BY, to
894    PROC's output file, first initializing any FIRST or LAST
895    variables in OUTPUT to the correct values. */
896 static void
897 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
898 {
899   if (proc->first == NULL && proc->last == NULL)
900     casewriter_write (proc->output, output);
901   else
902     {
903       /* It's harder with LAST, because we can't know whether
904          this case is the last in a group until we've prepared
905          the *next* case also.  Thus, we buffer the previous
906          output case until the next one is ready. */
907       bool new_BY;
908       if (proc->prev_BY != NULL)
909         {
910           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
911           if (proc->last != NULL)
912             case_data_rw (proc->buffered_case, proc->last)->f = new_BY;
913           casewriter_write (proc->output, proc->buffered_case);
914         }
915       else
916         new_BY = true;
917
918       proc->buffered_case = output;
919       if (proc->first != NULL)
920         case_data_rw (proc->buffered_case, proc->first)->f = new_BY;
921
922       if (new_BY)
923         {
924           size_t n_values = subcase_get_n_fields (&proc->by_vars);
925           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
926           if (proc->prev_BY == NULL)
927             {
928               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
929               caseproto_init_values (proto, proc->prev_BY);
930             }
931           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
932                           proc->prev_BY, by);
933         }
934     }
935 }
936
937 /* Writes a trailing buffered case to the output, if FIRST or
938    LAST is in use. */
939 static void
940 output_buffered_case (struct comb_proc *proc)
941 {
942   if (proc->prev_BY != NULL)
943     {
944       if (proc->last != NULL)
945         case_data_rw (proc->buffered_case, proc->last)->f = 1.0;
946       casewriter_write (proc->output, proc->buffered_case);
947       proc->buffered_case = NULL;
948     }
949 }