6eb9a3181a5f741281f480e9254baf4e48111130
[pspp] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/any-reader.h"
22 #include "data/case-matcher.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/subcase.h"
30 #include "data/variable.h"
31 #include "language/command.h"
32 #include "language/data-io/file-handle.h"
33 #include "language/data-io/trim.h"
34 #include "language/lexer/lexer.h"
35 #include "language/lexer/variable-parser.h"
36 #include "language/stats/sort-criteria.h"
37 #include "libpspp/assertion.h"
38 #include "libpspp/i18n.h"
39 #include "libpspp/message.h"
40 #include "libpspp/string-array.h"
41 #include "libpspp/taint.h"
42 #include "math/sort.h"
43
44 #include "gl/xalloc.h"
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 enum comb_command_type
50   {
51     COMB_ADD,
52     COMB_MATCH,
53     COMB_UPDATE
54   };
55
56 /* File types. */
57 enum comb_file_type
58   {
59     COMB_FILE,                  /* Specified on FILE= subcommand. */
60     COMB_TABLE                  /* Specified on TABLE= subcommand. */
61   };
62
63 /* One FILE or TABLE subcommand. */
64 struct comb_file
65   {
66     /* Basics. */
67     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
68
69     /* Variables. */
70     struct subcase by_vars;     /* BY variables in this input file. */
71     struct subcase src, dst;    /* Data to copy to output; where to put it. */
72     const struct missing_values **mv; /* Each variable's missing values. */
73
74     /* Input files. */
75     struct file_handle *handle; /* Input file handle. */
76     struct dictionary *dict;    /* Input file dictionary. */
77     struct casereader *reader;  /* Input data source. */
78     struct ccase *data;         /* The current input case. */
79     bool is_minimal;            /* Does 'data' have minimum BY values across
80                                    all input files? */
81     bool is_sorted;             /* Is file presorted on the BY variables? */
82
83     /* IN subcommand. */
84     char *in_name;
85     struct variable *in_var;
86   };
87
88 struct comb_proc
89   {
90     struct comb_file *files;    /* All the files being merged. */
91     size_t n_files;             /* Number of files. */
92
93     struct dictionary *dict;    /* Dictionary of output file. */
94     struct subcase by_vars;     /* BY variables in the output. */
95     struct casewriter *output;  /* Destination for output. */
96
97     struct case_matcher *matcher;
98
99     /* FIRST, LAST.
100        Only if "first" or "last" is nonnull are the remaining
101        members used. */
102     struct variable *first;     /* Variable specified on FIRST (if any). */
103     struct variable *last;      /* Variable specified on LAST (if any). */
104     struct ccase *buffered_case; /* Case ready for output except that we don't
105                                     know the value for the LAST var yet. */
106     union value *prev_BY;       /* Values of BY vars in buffered_case. */
107   };
108
109 static int combine_files (enum comb_command_type, struct lexer *,
110                           struct dataset *);
111 static void free_comb_proc (struct comb_proc *);
112
113 static void close_all_comb_files (struct comb_proc *);
114 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
115
116 static void execute_update (struct comb_proc *);
117 static void execute_match_files (struct comb_proc *);
118 static void execute_add_files (struct comb_proc *);
119
120 static bool create_flag_var (const char *subcommand_name, const char *var_name,
121                              struct dictionary *, struct variable **);
122 static void output_case (struct comb_proc *, struct ccase *, union value *by);
123 static void output_buffered_case (struct comb_proc *);
124
125 int
126 cmd_add_files (struct lexer *lexer, struct dataset *ds)
127 {
128   return combine_files (COMB_ADD, lexer, ds);
129 }
130
131 int
132 cmd_match_files (struct lexer *lexer, struct dataset *ds)
133 {
134   return combine_files (COMB_MATCH, lexer, ds);
135 }
136
137 int
138 cmd_update (struct lexer *lexer, struct dataset *ds)
139 {
140   return combine_files (COMB_UPDATE, lexer, ds);
141 }
142
143 static int
144 combine_files (enum comb_command_type command,
145                struct lexer *lexer, struct dataset *ds)
146 {
147   struct comb_proc proc;
148
149   bool saw_by = false;
150   bool saw_sort = false;
151   struct casereader *active_file = NULL;
152
153   char *first_name = NULL;
154   char *last_name = NULL;
155
156   struct taint *taint = NULL;
157
158   size_t n_tables = 0;
159   size_t allocated_files = 0;
160
161   size_t i;
162
163   proc.files = NULL;
164   proc.n_files = 0;
165   proc.dict = dict_create (get_default_encoding ());
166   proc.output = NULL;
167   proc.matcher = NULL;
168   subcase_init_empty (&proc.by_vars);
169   proc.first = NULL;
170   proc.last = NULL;
171   proc.buffered_case = NULL;
172   proc.prev_BY = NULL;
173
174   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
175
176   lex_match (lexer, T_SLASH);
177   for (;;)
178     {
179       struct comb_file *file;
180       enum comb_file_type type;
181
182       if (lex_match_id (lexer, "FILE"))
183         type = COMB_FILE;
184       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
185         {
186           type = COMB_TABLE;
187           n_tables++;
188         }
189       else
190         break;
191       lex_match (lexer, T_EQUALS);
192
193       if (proc.n_files >= allocated_files)
194         proc.files = x2nrealloc (proc.files, &allocated_files,
195                                 sizeof *proc.files);
196       file = &proc.files[proc.n_files++];
197       file->type = type;
198       subcase_init_empty (&file->by_vars);
199       subcase_init_empty (&file->src);
200       subcase_init_empty (&file->dst);
201       file->mv = NULL;
202       file->handle = NULL;
203       file->dict = NULL;
204       file->reader = NULL;
205       file->data = NULL;
206       file->is_sorted = true;
207       file->in_name = NULL;
208       file->in_var = NULL;
209
210       if (lex_match (lexer, T_ASTERISK))
211         {
212           if (!dataset_has_source (ds))
213             {
214               msg (SE, _("Cannot specify the active dataset since none "
215                          "has been defined."));
216               goto error;
217             }
218
219           if (proc_make_temporary_transformations_permanent (ds))
220             msg (SE, _("This command may not be used after TEMPORARY when "
221                        "the active dataset is an input source.  "
222                        "Temporary transformations will be made permanent."));
223
224           file->dict = dict_clone (dataset_dict (ds));
225         }
226       else
227         {
228           file->handle = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
229           if (file->handle == NULL)
230             goto error;
231
232           file->reader = any_reader_open (file->handle, NULL, &file->dict);
233           if (file->reader == NULL)
234             goto error;
235         }
236
237       while (lex_match (lexer, T_SLASH))
238         if (lex_match_id (lexer, "RENAME"))
239           {
240             if (!parse_dict_rename (lexer, file->dict))
241               goto error;
242           }
243         else if (lex_match_id (lexer, "IN"))
244           {
245             lex_match (lexer, T_EQUALS);
246             if (lex_token (lexer) != T_ID)
247               {
248                 lex_error (lexer, NULL);
249                 goto error;
250               }
251
252             if (file->in_name)
253               {
254                 msg (SE, _("Multiple IN subcommands for a single FILE or "
255                            "TABLE."));
256                 goto error;
257               }
258             file->in_name = xstrdup (lex_tokcstr (lexer));
259             lex_get (lexer);
260           }
261         else if (lex_match_id (lexer, "SORT"))
262           {
263             file->is_sorted = false;
264             saw_sort = true;
265           }
266
267       if (!merge_dictionary (proc.dict, file))
268         goto error;
269     }
270
271   while (lex_token (lexer) != T_ENDCMD)
272     {
273       if (lex_match (lexer, T_BY))
274         {
275           const struct variable **by_vars;
276           size_t i;
277           bool ok;
278
279           if (saw_by)
280             {
281               lex_sbc_only_once ("BY");
282               goto error;
283             }
284           saw_by = true;
285
286           lex_match (lexer, T_EQUALS);
287           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
288                                     &by_vars, NULL))
289             goto error;
290
291           ok = true;
292           for (i = 0; i < proc.n_files; i++)
293             {
294               struct comb_file *file = &proc.files[i];
295               size_t j;
296
297               for (j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
298                 {
299                   const char *name = var_get_name (by_vars[j]);
300                   struct variable *var = dict_lookup_var (file->dict, name);
301                   if (var != NULL)
302                     subcase_add_var (&file->by_vars, var,
303                                      subcase_get_direction (&proc.by_vars, j));
304                   else
305                     {
306                       if (file->handle != NULL)
307                         msg (SE, _("File %s lacks BY variable %s."),
308                              fh_get_name (file->handle), name);
309                       else
310                         msg (SE, _("Active dataset lacks BY variable %s."),
311                              name);
312                       ok = false;
313                     }
314                 }
315               assert (!ok || subcase_conformable (&file->by_vars,
316                                                   &proc.files[0].by_vars));
317             }
318           free (by_vars);
319
320           if (!ok)
321             goto error;
322         }
323       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
324         {
325           if (first_name != NULL)
326             {
327               lex_sbc_only_once ("FIRST");
328               goto error;
329             }
330
331           lex_match (lexer, T_EQUALS);
332           if (!lex_force_id (lexer))
333             goto error;
334           first_name = xstrdup (lex_tokcstr (lexer));
335           lex_get (lexer);
336         }
337       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
338         {
339           if (last_name != NULL)
340             {
341               lex_sbc_only_once ("LAST");
342               goto error;
343             }
344
345           lex_match (lexer, T_EQUALS);
346           if (!lex_force_id (lexer))
347             goto error;
348           last_name = xstrdup (lex_tokcstr (lexer));
349           lex_get (lexer);
350         }
351       else if (lex_match_id (lexer, "MAP"))
352         {
353           /* FIXME. */
354         }
355       else if (lex_match_id (lexer, "DROP"))
356         {
357           if (!parse_dict_drop (lexer, proc.dict))
358             goto error;
359         }
360       else if (lex_match_id (lexer, "KEEP"))
361         {
362           if (!parse_dict_keep (lexer, proc.dict))
363             goto error;
364         }
365       else
366         {
367           lex_error (lexer, NULL);
368           goto error;
369         }
370
371       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
372         {
373           lex_end_of_command (lexer);
374           goto error;
375         }
376     }
377
378   if (!saw_by)
379     {
380       if (command == COMB_UPDATE)
381         {
382           lex_sbc_missing ("BY");
383           goto error;
384         }
385       if (n_tables)
386         {
387           msg (SE, _("BY is required when %s is specified."), "TABLE");
388           goto error;
389         }
390       if (saw_sort)
391         {
392           msg (SE, _("BY is required when %s is specified."), "SORT");
393           goto error;
394         }
395     }
396
397   /* Add IN, FIRST, and LAST variables to master dictionary. */
398   for (i = 0; i < proc.n_files; i++)
399     {
400       struct comb_file *file = &proc.files[i];
401       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
402         goto error;
403     }
404   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
405       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
406     goto error;
407
408   dict_delete_scratch_vars (proc.dict);
409   dict_compact_values (proc.dict);
410
411   /* Set up mapping from each file's variables to master
412      variables. */
413   for (i = 0; i < proc.n_files; i++)
414     {
415       struct comb_file *file = &proc.files[i];
416       size_t src_var_cnt = dict_get_var_cnt (file->dict);
417       size_t j;
418
419       file->mv = xnmalloc (src_var_cnt, sizeof *file->mv);
420       for (j = 0; j < src_var_cnt; j++)
421         {
422           struct variable *src_var = dict_get_var (file->dict, j);
423           struct variable *dst_var = dict_lookup_var (proc.dict,
424                                                       var_get_name (src_var));
425           if (dst_var != NULL)
426             {
427               size_t n = subcase_get_n_fields (&file->src);
428               file->mv[n] = var_get_missing_values (src_var);
429               subcase_add_var (&file->src, src_var, SC_ASCEND);
430               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
431             }
432         }
433     }
434
435   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
436   taint = taint_clone (casewriter_get_taint (proc.output));
437
438   /* Set up case matcher. */
439   proc.matcher = case_matcher_create ();
440   for (i = 0; i < proc.n_files; i++)
441     {
442       struct comb_file *file = &proc.files[i];
443       if (file->reader == NULL)
444         {
445           if (active_file == NULL)
446             {
447               proc_discard_output (ds);
448               file->reader = active_file = proc_open_filtering (ds, false);
449             }
450           else
451             file->reader = casereader_clone (active_file);
452         }
453       if (!file->is_sorted)
454         file->reader = sort_execute (file->reader, &file->by_vars);
455       taint_propagate (casereader_get_taint (file->reader), taint);
456       file->data = casereader_read (file->reader);
457       if (file->type == COMB_FILE)
458         case_matcher_add_input (proc.matcher, &file->by_vars,
459                                 &file->data, &file->is_minimal);
460     }
461
462   if (command == COMB_ADD)
463     execute_add_files (&proc);
464   else if (command == COMB_MATCH)
465     execute_match_files (&proc);
466   else if (command == COMB_UPDATE)
467     execute_update (&proc);
468   else
469     NOT_REACHED ();
470
471   case_matcher_destroy (proc.matcher);
472   proc.matcher = NULL;
473   close_all_comb_files (&proc);
474   if (active_file != NULL)
475     proc_commit (ds);
476
477   dataset_set_dict (ds, proc.dict);
478   dataset_set_source (ds, casewriter_make_reader (proc.output));
479   proc.dict = NULL;
480   proc.output = NULL;
481
482   free_comb_proc (&proc);
483
484   free (first_name);
485   free (last_name);
486
487   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
488
489  error:
490   if (active_file != NULL)
491     proc_commit (ds);
492   free_comb_proc (&proc);
493   taint_destroy (taint);
494   free (first_name);
495   free (last_name);
496   return CMD_CASCADING_FAILURE;
497 }
498
499 /* Merge the dictionary for file F into master dictionary M. */
500 static bool
501 merge_dictionary (struct dictionary *const m, struct comb_file *f)
502 {
503   struct dictionary *d = f->dict;
504   const struct string_array *d_docs, *m_docs;
505   int i;
506
507   if (dict_get_label (m) == NULL)
508     dict_set_label (m, dict_get_label (d));
509
510   d_docs = dict_get_documents (d);
511   m_docs = dict_get_documents (m);
512
513
514   /* FIXME: If the input files have different encodings, then
515      the result is undefined.
516      The correct thing to do would be to convert to an encoding
517      which can cope with all the input files (eg UTF-8).
518    */
519   if ( 0 != strcmp (dict_get_encoding (f->dict), dict_get_encoding (m)))
520     msg (MW, _("Combining files with incompatible encodings. String data may "
521                "not be represented correctly."));
522
523   if (d_docs != NULL)
524     {
525       if (m_docs == NULL)
526         dict_set_documents (m, d_docs);
527       else
528         {
529           struct string_array new_docs;
530           size_t i;
531
532           new_docs.n = m_docs->n + d_docs->n;
533           new_docs.strings = xmalloc (new_docs.n * sizeof *new_docs.strings);
534           for (i = 0; i < m_docs->n; i++)
535             new_docs.strings[i] = m_docs->strings[i];
536           for (i = 0; i < d_docs->n; i++)
537             new_docs.strings[m_docs->n + i] = d_docs->strings[i];
538
539           dict_set_documents (m, &new_docs);
540
541           free (new_docs.strings);
542         }
543     }
544
545   for (i = 0; i < dict_get_var_cnt (d); i++)
546     {
547       struct variable *dv = dict_get_var (d, i);
548       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
549
550       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
551         continue;
552
553       if (mv != NULL)
554         {
555           if (var_get_width (mv) != var_get_width (dv))
556             {
557               const char *var_name = var_get_name (dv);
558               struct string s = DS_EMPTY_INITIALIZER;
559               const char *file_name;
560
561               file_name = f->handle ? fh_get_name (f->handle) : "*";
562               ds_put_format (&s,
563                              _("Variable %s in file %s has different "
564                                "type or width from the same variable in "
565                                "earlier file."),
566                              var_name, file_name);
567               ds_put_cstr (&s, "  ");
568               if (var_is_numeric (dv))
569                 ds_put_format (&s, _("In file %s, %s is numeric."),
570                                file_name, var_name);
571               else
572                 ds_put_format (&s, _("In file %s, %s is a string variable "
573                                      "with width %d."),
574                                file_name, var_name, var_get_width (dv));
575               ds_put_cstr (&s, "  ");
576               if (var_is_numeric (mv))
577                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
578                                var_name);
579               else
580                 ds_put_format (&s, _("In an earlier file, %s was a string "
581                                      "variable with width %d."),
582                                var_name, var_get_width (mv));
583               msg (SE, "%s", ds_cstr (&s));
584               ds_destroy (&s);
585               return false;
586             }
587
588           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
589             var_set_value_labels (mv, var_get_value_labels (dv));
590           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
591             var_set_missing_values (mv, var_get_missing_values (dv));
592           if (var_get_label (dv) && !var_get_label (mv))
593             var_set_label (mv, var_get_label (dv));
594         }
595       else
596         mv = dict_clone_var_assert (m, dv);
597     }
598
599   return true;
600 }
601
602 /* If VAR_NAME is non-NULL, attempts to create a
603    variable named VAR_NAME, with format F1.0, in DICT, and stores
604    a pointer to the variable in *VAR.  Returns true if
605    successful, false if the variable name is a duplicate (in
606    which case a message saying that the variable specified on the
607    given SUBCOMMAND is a duplicate is emitted).
608
609    Does nothing and returns true if VAR_NAME is null. */
610 static bool
611 create_flag_var (const char *subcommand, const char *var_name,
612                  struct dictionary *dict, struct variable **var)
613 {
614   if (var_name != NULL)
615     {
616       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
617       *var = dict_create_var (dict, var_name, 0);
618       if (*var == NULL)
619         {
620           msg (SE, _("Variable name %s specified on %s subcommand "
621                      "duplicates an existing variable name."),
622                subcommand, var_name);
623           return false;
624         }
625       var_set_both_formats (*var, &format);
626     }
627   else
628     *var = NULL;
629   return true;
630 }
631
632 /* Closes all the files in PROC and frees their associated data. */
633 static void
634 close_all_comb_files (struct comb_proc *proc)
635 {
636   size_t i;
637
638   for (i = 0; i < proc->n_files; i++)
639     {
640       struct comb_file *file = &proc->files[i];
641       subcase_destroy (&file->by_vars);
642       subcase_destroy (&file->src);
643       subcase_destroy (&file->dst);
644       free (file->mv);
645       fh_unref (file->handle);
646       dict_destroy (file->dict);
647       casereader_destroy (file->reader);
648       case_unref (file->data);
649       free (file->in_name);
650     }
651   free (proc->files);
652   proc->files = NULL;
653   proc->n_files = 0;
654 }
655
656 /* Frees all the data for the procedure. */
657 static void
658 free_comb_proc (struct comb_proc *proc)
659 {
660   close_all_comb_files (proc);
661   dict_destroy (proc->dict);
662   casewriter_destroy (proc->output);
663   case_matcher_destroy (proc->matcher);
664   if (proc->prev_BY)
665     {
666       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
667                                 proc->prev_BY);
668       free (proc->prev_BY);
669     }
670   subcase_destroy (&proc->by_vars);
671   case_unref (proc->buffered_case);
672 }
673 \f
674 static bool scan_table (struct comb_file *, union value by[]);
675 static struct ccase *create_output_case (const struct comb_proc *);
676 static void apply_case (const struct comb_file *, struct ccase *);
677 static void apply_nonmissing_case (const struct comb_file *, struct ccase *);
678 static void advance_file (struct comb_file *, union value by[]);
679 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
680 static void output_buffered_case (struct comb_proc *);
681
682 /* Executes the ADD FILES command. */
683 static void
684 execute_add_files (struct comb_proc *proc)
685 {
686   union value *by;
687
688   while (case_matcher_match (proc->matcher, &by))
689     {
690       size_t i;
691
692       for (i = 0; i < proc->n_files; i++)
693         {
694           struct comb_file *file = &proc->files[i];
695           while (file->is_minimal)
696             {
697               struct ccase *output = create_output_case (proc);
698               apply_case (file, output);
699               advance_file (file, by);
700               output_case (proc, output, by);
701             }
702         }
703     }
704   output_buffered_case (proc);
705 }
706
707 /* Executes the MATCH FILES command. */
708 static void
709 execute_match_files (struct comb_proc *proc)
710 {
711   union value *by;
712
713   while (case_matcher_match (proc->matcher, &by))
714     {
715       struct ccase *output;
716       size_t i;
717
718       output = create_output_case (proc);
719       for (i = proc->n_files; i-- > 0; )
720         {
721           struct comb_file *file = &proc->files[i];
722           if (file->type == COMB_FILE)
723             {
724               if (file->is_minimal)
725                 {
726                   apply_case (file, output);
727                   advance_file (file, NULL);
728                 }
729             }
730           else
731             {
732               if (scan_table (file, by))
733                 apply_case (file, output);
734             }
735         }
736       output_case (proc, output, by);
737     }
738   output_buffered_case (proc);
739 }
740
741 /* Executes the UPDATE command. */
742 static void
743 execute_update (struct comb_proc *proc)
744 {
745   union value *by;
746   size_t n_duplicates = 0;
747
748   while (case_matcher_match (proc->matcher, &by))
749     {
750       struct comb_file *first, *file;
751       struct ccase *output;
752
753       /* Find first nonnull case in array and make an output case
754          from it. */
755       output = create_output_case (proc);
756       for (first = &proc->files[0]; ; first++)
757         if (first->is_minimal)
758           break;
759       apply_case (first, output);
760       advance_file (first, by);
761
762       /* Read additional cases and update the output case from
763          them.  (Don't update the output case from any duplicate
764          cases in the master file.) */
765       for (file = first + (first == proc->files);
766            file < &proc->files[proc->n_files]; file++)
767         {
768           while (file->is_minimal)
769             {
770               apply_nonmissing_case (file, output);
771               advance_file (file, by);
772             }
773         }
774       casewriter_write (proc->output, output);
775
776       /* Write duplicate cases in the master file directly to the
777          output.  */
778       if (first == proc->files && first->is_minimal)
779         {
780           n_duplicates++;
781           while (first->is_minimal)
782             {
783               output = create_output_case (proc);
784               apply_case (first, output);
785               advance_file (first, by);
786               casewriter_write (proc->output, output);
787             }
788         }
789     }
790
791   if (n_duplicates)
792     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
793          n_duplicates);
794 }
795
796 /* Reads FILE, which must be of type COMB_TABLE, until it
797    encounters a case with BY or greater for its BY variables.
798    Returns true if a case with exactly BY for its BY variables
799    was found, otherwise false. */
800 static bool
801 scan_table (struct comb_file *file, union value by[])
802 {
803   while (file->data != NULL)
804     {
805       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
806       if (cmp > 0)
807         {
808           case_unref (file->data);
809           file->data = casereader_read (file->reader);
810         }
811       else
812         return cmp == 0;
813     }
814   return false;
815 }
816
817 /* Creates and returns an output case for PROC, initializing each
818    of its values to system-missing or blanks, except that the
819    values of IN variables are set to 0. */
820 static struct ccase *
821 create_output_case (const struct comb_proc *proc)
822 {
823   size_t n_vars = dict_get_var_cnt (proc->dict);
824   struct ccase *output;
825   size_t i;
826
827   output = case_create (dict_get_proto (proc->dict));
828   for (i = 0; i < n_vars; i++)
829     {
830       struct variable *v = dict_get_var (proc->dict, i);
831       value_set_missing (case_data_rw (output, v), var_get_width (v));
832     }
833   for (i = 0; i < proc->n_files; i++)
834     {
835       struct comb_file *file = &proc->files[i];
836       if (file->in_var != NULL)
837         case_data_rw (output, file->in_var)->f = false;
838     }
839   return output;
840 }
841
842 static void
843 mark_file_used (const struct comb_file *file, struct ccase *output)
844 {
845   if (file->in_var != NULL)
846     case_data_rw (output, file->in_var)->f = true;
847 }
848
849 /* Copies the data from FILE's case into output case OUTPUT.
850    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
851 static void
852 apply_case (const struct comb_file *file, struct ccase *output)
853 {
854   subcase_copy (&file->src, file->data, &file->dst, output);
855   mark_file_used (file, output);
856 }
857
858 /* Copies the data from FILE's case into output case OUTPUT,
859    skipping values that are missing or all spaces.
860
861    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
862 static void
863 apply_nonmissing_case (const struct comb_file *file, struct ccase *output)
864 {
865   size_t i;
866
867   for (i = 0; i < subcase_get_n_fields (&file->src); i++)
868     {
869       const struct subcase_field *src_field = &file->src.fields[i];
870       const struct subcase_field *dst_field = &file->dst.fields[i];
871       const union value *src_value
872         = case_data_idx (file->data, src_field->case_index);
873       int width = src_field->width;
874
875       if (!mv_is_value_missing (file->mv[i], src_value, MV_ANY)
876           && !(width > 0 && value_is_spaces (src_value, width)))
877         value_copy (case_data_rw_idx (output, dst_field->case_index),
878                     src_value, width);
879     }
880   mark_file_used (file, output);
881 }
882
883 /* Advances FILE to its next case.  If BY is nonnull, then FILE's is_minimal
884    member is updated based on whether the new case's BY values still match
885    those in BY. */
886 static void
887 advance_file (struct comb_file *file, union value by[])
888 {
889   case_unref (file->data);
890   file->data = casereader_read (file->reader);
891   if (by)
892     file->is_minimal = (file->data != NULL
893                         && subcase_equal_cx (&file->by_vars, file->data, by));
894 }
895
896 /* Writes OUTPUT, whose BY values has been extracted into BY, to
897    PROC's output file, first initializing any FIRST or LAST
898    variables in OUTPUT to the correct values. */
899 static void
900 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
901 {
902   if (proc->first == NULL && proc->last == NULL)
903     casewriter_write (proc->output, output);
904   else
905     {
906       /* It's harder with LAST, because we can't know whether
907          this case is the last in a group until we've prepared
908          the *next* case also.  Thus, we buffer the previous
909          output case until the next one is ready. */
910       bool new_BY;
911       if (proc->prev_BY != NULL)
912         {
913           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
914           if (proc->last != NULL)
915             case_data_rw (proc->buffered_case, proc->last)->f = new_BY;
916           casewriter_write (proc->output, proc->buffered_case);
917         }
918       else
919         new_BY = true;
920
921       proc->buffered_case = output;
922       if (proc->first != NULL)
923         case_data_rw (proc->buffered_case, proc->first)->f = new_BY;
924
925       if (new_BY)
926         {
927           size_t n_values = subcase_get_n_fields (&proc->by_vars);
928           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
929           if (proc->prev_BY == NULL)
930             {
931               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
932               caseproto_init_values (proto, proc->prev_BY);
933             }
934           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
935                           proc->prev_BY, by);
936         }
937     }
938 }
939
940 /* Writes a trailing buffered case to the output, if FIRST or
941    LAST is in use. */
942 static void
943 output_buffered_case (struct comb_proc *proc)
944 {
945   if (proc->prev_BY != NULL)
946     {
947       if (proc->last != NULL)
948         case_data_rw (proc->buffered_case, proc->last)->f = 1.0;
949       casewriter_write (proc->output, proc->buffered_case);
950       proc->buffered_case = NULL;
951     }
952 }