refactoring
[pspp] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/any-reader.h"
22 #include "data/case-matcher.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/subcase.h"
30 #include "data/variable.h"
31 #include "language/command.h"
32 #include "language/data-io/file-handle.h"
33 #include "language/data-io/trim.h"
34 #include "language/lexer/lexer.h"
35 #include "language/lexer/variable-parser.h"
36 #include "language/stats/sort-criteria.h"
37 #include "libpspp/assertion.h"
38 #include "libpspp/i18n.h"
39 #include "libpspp/message.h"
40 #include "libpspp/string-array.h"
41 #include "libpspp/taint.h"
42 #include "math/sort.h"
43
44 #include "gl/xalloc.h"
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 enum comb_command_type
50   {
51     COMB_ADD,
52     COMB_MATCH,
53     COMB_UPDATE
54   };
55
56 /* File types. */
57 enum comb_file_type
58   {
59     COMB_FILE,                  /* Specified on FILE= subcommand. */
60     COMB_TABLE                  /* Specified on TABLE= subcommand. */
61   };
62
63 /* One FILE or TABLE subcommand. */
64 struct comb_file
65   {
66     /* Basics. */
67     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
68
69     /* Variables. */
70     struct subcase by_vars;     /* BY variables in this input file. */
71     struct subcase src, dst;    /* Data to copy to output; where to put it. */
72     const struct missing_values **mv; /* Each variable's missing values. */
73
74     /* Input files. */
75     struct file_handle *handle; /* Input file handle. */
76     struct dictionary *dict;    /* Input file dictionary. */
77     struct casereader *reader;  /* Input data source. */
78     struct ccase *data;         /* The current input case. */
79     bool is_minimal;            /* Does 'data' have minimum BY values across
80                                    all input files? */
81     bool is_sorted;             /* Is file presorted on the BY variables? */
82
83     /* IN subcommand. */
84     char *in_name;
85     struct variable *in_var;
86   };
87
88 struct comb_proc
89   {
90     struct comb_file *files;    /* All the files being merged. */
91     size_t n_files;             /* Number of files. */
92
93     struct dictionary *dict;    /* Dictionary of output file. */
94     struct subcase by_vars;     /* BY variables in the output. */
95     struct casewriter *output;  /* Destination for output. */
96
97     struct case_matcher *matcher;
98
99     /* FIRST, LAST.
100        Only if "first" or "last" is nonnull are the remaining
101        members used. */
102     struct variable *first;     /* Variable specified on FIRST (if any). */
103     struct variable *last;      /* Variable specified on LAST (if any). */
104     struct ccase *buffered_case; /* Case ready for output except that we don't
105                                     know the value for the LAST var yet. */
106     union value *prev_BY;       /* Values of BY vars in buffered_case. */
107   };
108
109 static int combine_files (enum comb_command_type, struct lexer *,
110                           struct dataset *);
111 static void free_comb_proc (struct comb_proc *);
112
113 static void close_all_comb_files (struct comb_proc *);
114 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
115
116 static void execute_update (struct comb_proc *);
117 static void execute_match_files (struct comb_proc *);
118 static void execute_add_files (struct comb_proc *);
119
120 static bool create_flag_var (const char *subcommand_name, const char *var_name,
121                              struct dictionary *, struct variable **);
122 static void output_case (struct comb_proc *, struct ccase *, union value *by);
123 static void output_buffered_case (struct comb_proc *);
124
125 int
126 cmd_add_files (struct lexer *lexer, struct dataset *ds)
127 {
128   return combine_files (COMB_ADD, lexer, ds);
129 }
130
131 int
132 cmd_match_files (struct lexer *lexer, struct dataset *ds)
133 {
134   return combine_files (COMB_MATCH, lexer, ds);
135 }
136
137 int
138 cmd_update (struct lexer *lexer, struct dataset *ds)
139 {
140   return combine_files (COMB_UPDATE, lexer, ds);
141 }
142
143 static int
144 combine_files (enum comb_command_type command,
145                struct lexer *lexer, struct dataset *ds)
146 {
147   struct comb_proc proc;
148
149   bool saw_by = false;
150   bool saw_sort = false;
151   struct casereader *active_file = NULL;
152
153   char *first_name = NULL;
154   char *last_name = NULL;
155
156   struct taint *taint = NULL;
157
158   size_t n_tables = 0;
159   size_t allocated_files = 0;
160
161   size_t i;
162
163   proc.files = NULL;
164   proc.n_files = 0;
165   proc.dict = dict_create (get_default_encoding ());
166   proc.output = NULL;
167   proc.matcher = NULL;
168   subcase_init_empty (&proc.by_vars);
169   proc.first = NULL;
170   proc.last = NULL;
171   proc.buffered_case = NULL;
172   proc.prev_BY = NULL;
173
174   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
175
176   lex_match (lexer, T_SLASH);
177   for (;;)
178     {
179       struct comb_file *file;
180       enum comb_file_type type;
181
182       if (lex_match_id (lexer, "FILE"))
183         type = COMB_FILE;
184       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
185         {
186           type = COMB_TABLE;
187           n_tables++;
188         }
189       else
190         break;
191       lex_match (lexer, T_EQUALS);
192
193       if (proc.n_files >= allocated_files)
194         proc.files = x2nrealloc (proc.files, &allocated_files,
195                                 sizeof *proc.files);
196       file = &proc.files[proc.n_files++];
197       file->type = type;
198       subcase_init_empty (&file->by_vars);
199       subcase_init_empty (&file->src);
200       subcase_init_empty (&file->dst);
201       file->mv = NULL;
202       file->handle = NULL;
203       file->dict = NULL;
204       file->reader = NULL;
205       file->data = NULL;
206       file->is_sorted = true;
207       file->in_name = NULL;
208       file->in_var = NULL;
209
210       if (lex_match (lexer, T_ASTERISK))
211         {
212           if (!dataset_has_source (ds))
213             {
214               msg (SE, _("Cannot specify the active dataset since none "
215                          "has been defined."));
216               goto error;
217             }
218
219           if (proc_make_temporary_transformations_permanent (ds))
220             msg (SE, _("This command may not be used after TEMPORARY when "
221                        "the active dataset is an input source.  "
222                        "Temporary transformations will be made permanent."));
223
224           file->dict = dict_clone (dataset_dict (ds));
225         }
226       else
227         {
228           file->handle = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
229           if (file->handle == NULL)
230             goto error;
231
232           file->reader = any_reader_open_and_decode (file->handle, NULL,
233                                                      &file->dict, NULL);
234           if (file->reader == NULL)
235             goto error;
236         }
237
238       while (lex_match (lexer, T_SLASH))
239         if (lex_match_id (lexer, "RENAME"))
240           {
241             if (!parse_dict_rename (lexer, file->dict, false))
242               goto error;
243           }
244         else if (lex_match_id (lexer, "IN"))
245           {
246             lex_match (lexer, T_EQUALS);
247             if (lex_token (lexer) != T_ID)
248               {
249                 lex_error (lexer, NULL);
250                 goto error;
251               }
252
253             if (file->in_name)
254               {
255                 msg (SE, _("Multiple IN subcommands for a single FILE or "
256                            "TABLE."));
257                 goto error;
258               }
259             file->in_name = xstrdup (lex_tokcstr (lexer));
260             lex_get (lexer);
261           }
262         else if (lex_match_id (lexer, "SORT"))
263           {
264             file->is_sorted = false;
265             saw_sort = true;
266           }
267
268       if (!merge_dictionary (proc.dict, file))
269         goto error;
270     }
271
272   while (lex_token (lexer) != T_ENDCMD)
273     {
274       if (lex_match (lexer, T_BY))
275         {
276           const struct variable **by_vars;
277           size_t i;
278           bool ok;
279
280           if (saw_by)
281             {
282               lex_sbc_only_once ("BY");
283               goto error;
284             }
285           saw_by = true;
286
287           lex_match (lexer, T_EQUALS);
288           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
289                                     &by_vars, NULL))
290             goto error;
291
292           ok = true;
293           for (i = 0; i < proc.n_files; i++)
294             {
295               struct comb_file *file = &proc.files[i];
296               size_t j;
297
298               for (j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
299                 {
300                   const char *name = var_get_name (by_vars[j]);
301                   struct variable *var = dict_lookup_var (file->dict, name);
302                   if (var != NULL)
303                     subcase_add_var (&file->by_vars, var,
304                                      subcase_get_direction (&proc.by_vars, j));
305                   else
306                     {
307                       if (file->handle != NULL)
308                         msg (SE, _("File %s lacks BY variable %s."),
309                              fh_get_name (file->handle), name);
310                       else
311                         msg (SE, _("Active dataset lacks BY variable %s."),
312                              name);
313                       ok = false;
314                     }
315                 }
316               assert (!ok || subcase_conformable (&file->by_vars,
317                                                   &proc.files[0].by_vars));
318             }
319           free (by_vars);
320
321           if (!ok)
322             goto error;
323         }
324       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
325         {
326           if (first_name != NULL)
327             {
328               lex_sbc_only_once ("FIRST");
329               goto error;
330             }
331
332           lex_match (lexer, T_EQUALS);
333           if (!lex_force_id (lexer))
334             goto error;
335           first_name = xstrdup (lex_tokcstr (lexer));
336           lex_get (lexer);
337         }
338       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
339         {
340           if (last_name != NULL)
341             {
342               lex_sbc_only_once ("LAST");
343               goto error;
344             }
345
346           lex_match (lexer, T_EQUALS);
347           if (!lex_force_id (lexer))
348             goto error;
349           last_name = xstrdup (lex_tokcstr (lexer));
350           lex_get (lexer);
351         }
352       else if (lex_match_id (lexer, "MAP"))
353         {
354           /* FIXME. */
355         }
356       else if (lex_match_id (lexer, "DROP"))
357         {
358           if (!parse_dict_drop (lexer, proc.dict))
359             goto error;
360         }
361       else if (lex_match_id (lexer, "KEEP"))
362         {
363           if (!parse_dict_keep (lexer, proc.dict))
364             goto error;
365         }
366       else
367         {
368           lex_error (lexer, NULL);
369           goto error;
370         }
371
372       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
373         {
374           lex_end_of_command (lexer);
375           goto error;
376         }
377     }
378
379   if (!saw_by)
380     {
381       if (command == COMB_UPDATE)
382         {
383           lex_sbc_missing ("BY");
384           goto error;
385         }
386       if (n_tables)
387         {
388           msg (SE, _("BY is required when %s is specified."), "TABLE");
389           goto error;
390         }
391       if (saw_sort)
392         {
393           msg (SE, _("BY is required when %s is specified."), "SORT");
394           goto error;
395         }
396     }
397
398   /* Add IN, FIRST, and LAST variables to master dictionary. */
399   for (i = 0; i < proc.n_files; i++)
400     {
401       struct comb_file *file = &proc.files[i];
402       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
403         goto error;
404     }
405   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
406       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
407     goto error;
408
409   dict_delete_scratch_vars (proc.dict);
410   dict_compact_values (proc.dict);
411
412   /* Set up mapping from each file's variables to master
413      variables. */
414   for (i = 0; i < proc.n_files; i++)
415     {
416       struct comb_file *file = &proc.files[i];
417       size_t src_n_vars = dict_get_n_vars (file->dict);
418       size_t j;
419
420       file->mv = xnmalloc (src_n_vars, sizeof *file->mv);
421       for (j = 0; j < src_n_vars; j++)
422         {
423           struct variable *src_var = dict_get_var (file->dict, j);
424           struct variable *dst_var = dict_lookup_var (proc.dict,
425                                                       var_get_name (src_var));
426           if (dst_var != NULL)
427             {
428               size_t n = subcase_get_n_fields (&file->src);
429               file->mv[n] = var_get_missing_values (src_var);
430               subcase_add_var (&file->src, src_var, SC_ASCEND);
431               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
432             }
433         }
434     }
435
436   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
437   taint = taint_clone (casewriter_get_taint (proc.output));
438
439   /* Set up case matcher. */
440   proc.matcher = case_matcher_create ();
441   for (i = 0; i < proc.n_files; i++)
442     {
443       struct comb_file *file = &proc.files[i];
444       if (file->reader == NULL)
445         {
446           if (active_file == NULL)
447             {
448               proc_discard_output (ds);
449               file->reader = active_file = proc_open_filtering (ds, false);
450             }
451           else
452             file->reader = casereader_clone (active_file);
453         }
454       if (!file->is_sorted)
455         file->reader = sort_execute (file->reader, &file->by_vars);
456       taint_propagate (casereader_get_taint (file->reader), taint);
457       file->data = casereader_read (file->reader);
458       if (file->type == COMB_FILE)
459         case_matcher_add_input (proc.matcher, &file->by_vars,
460                                 &file->data, &file->is_minimal);
461     }
462
463   if (command == COMB_ADD)
464     execute_add_files (&proc);
465   else if (command == COMB_MATCH)
466     execute_match_files (&proc);
467   else if (command == COMB_UPDATE)
468     execute_update (&proc);
469   else
470     NOT_REACHED ();
471
472   case_matcher_destroy (proc.matcher);
473   proc.matcher = NULL;
474   close_all_comb_files (&proc);
475   if (active_file != NULL)
476     proc_commit (ds);
477
478   dataset_set_dict (ds, proc.dict);
479   dataset_set_source (ds, casewriter_make_reader (proc.output));
480   proc.dict = NULL;
481   proc.output = NULL;
482
483   free_comb_proc (&proc);
484
485   free (first_name);
486   free (last_name);
487
488   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
489
490  error:
491   if (active_file != NULL)
492     proc_commit (ds);
493   free_comb_proc (&proc);
494   taint_destroy (taint);
495   free (first_name);
496   free (last_name);
497   return CMD_CASCADING_FAILURE;
498 }
499
500 /* Merge the dictionary for file F into master dictionary M. */
501 static bool
502 merge_dictionary (struct dictionary *const m, struct comb_file *f)
503 {
504   struct dictionary *d = f->dict;
505   const struct string_array *d_docs, *m_docs;
506   int i;
507
508   if (dict_get_label (m) == NULL)
509     dict_set_label (m, dict_get_label (d));
510
511   d_docs = dict_get_documents (d);
512   m_docs = dict_get_documents (m);
513
514
515   /* FIXME: If the input files have different encodings, then
516      the result is undefined.
517      The correct thing to do would be to convert to an encoding
518      which can cope with all the input files (eg UTF-8).
519    */
520   if (0 != strcmp (dict_get_encoding (f->dict), dict_get_encoding (m)))
521     msg (MW, _("Combining files with incompatible encodings. String data may "
522                "not be represented correctly."));
523
524   if (d_docs != NULL)
525     {
526       if (m_docs == NULL)
527         dict_set_documents (m, d_docs);
528       else
529         {
530           struct string_array new_docs;
531           size_t i;
532
533           new_docs.n = m_docs->n + d_docs->n;
534           new_docs.strings = xmalloc (new_docs.n * sizeof *new_docs.strings);
535           for (i = 0; i < m_docs->n; i++)
536             new_docs.strings[i] = m_docs->strings[i];
537           for (i = 0; i < d_docs->n; i++)
538             new_docs.strings[m_docs->n + i] = d_docs->strings[i];
539
540           dict_set_documents (m, &new_docs);
541
542           free (new_docs.strings);
543         }
544     }
545
546   for (i = 0; i < dict_get_n_vars (d); i++)
547     {
548       struct variable *dv = dict_get_var (d, i);
549       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
550
551       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
552         continue;
553
554       if (mv != NULL)
555         {
556           if (var_get_width (mv) != var_get_width (dv))
557             {
558               const char *var_name = var_get_name (dv);
559               struct string s = DS_EMPTY_INITIALIZER;
560               const char *file_name;
561
562               file_name = f->handle ? fh_get_name (f->handle) : "*";
563               ds_put_format (&s,
564                              _("Variable %s in file %s has different "
565                                "type or width from the same variable in "
566                                "earlier file."),
567                              var_name, file_name);
568               ds_put_cstr (&s, "  ");
569               if (var_is_numeric (dv))
570                 ds_put_format (&s, _("In file %s, %s is numeric."),
571                                file_name, var_name);
572               else
573                 ds_put_format (&s, _("In file %s, %s is a string variable "
574                                      "with width %d."),
575                                file_name, var_name, var_get_width (dv));
576               ds_put_cstr (&s, "  ");
577               if (var_is_numeric (mv))
578                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
579                                var_name);
580               else
581                 ds_put_format (&s, _("In an earlier file, %s was a string "
582                                      "variable with width %d."),
583                                var_name, var_get_width (mv));
584               msg (SE, "%s", ds_cstr (&s));
585               ds_destroy (&s);
586               return false;
587             }
588
589           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
590             var_set_value_labels (mv, var_get_value_labels (dv));
591           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
592             var_set_missing_values (mv, var_get_missing_values (dv));
593           if (var_get_label (dv) && !var_get_label (mv))
594             var_set_label (mv, var_get_label (dv));
595         }
596       else
597         mv = dict_clone_var_assert (m, dv);
598     }
599
600   return true;
601 }
602
603 /* If VAR_NAME is non-NULL, attempts to create a
604    variable named VAR_NAME, with format F1.0, in DICT, and stores
605    a pointer to the variable in *VAR.  Returns true if
606    successful, false if the variable name is a duplicate (in
607    which case a message saying that the variable specified on the
608    given SUBCOMMAND is a duplicate is emitted).
609
610    Does nothing and returns true if VAR_NAME is null. */
611 static bool
612 create_flag_var (const char *subcommand, const char *var_name,
613                  struct dictionary *dict, struct variable **var)
614 {
615   if (var_name != NULL)
616     {
617       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
618       *var = dict_create_var (dict, var_name, 0);
619       if (*var == NULL)
620         {
621           msg (SE, _("Variable name %s specified on %s subcommand "
622                      "duplicates an existing variable name."),
623                subcommand, var_name);
624           return false;
625         }
626       var_set_both_formats (*var, &format);
627     }
628   else
629     *var = NULL;
630   return true;
631 }
632
633 /* Closes all the files in PROC and frees their associated data. */
634 static void
635 close_all_comb_files (struct comb_proc *proc)
636 {
637   size_t i;
638
639   for (i = 0; i < proc->n_files; i++)
640     {
641       struct comb_file *file = &proc->files[i];
642       subcase_uninit (&file->by_vars);
643       subcase_uninit (&file->src);
644       subcase_uninit (&file->dst);
645       free (file->mv);
646       fh_unref (file->handle);
647       dict_unref (file->dict);
648       casereader_destroy (file->reader);
649       case_unref (file->data);
650       free (file->in_name);
651     }
652   free (proc->files);
653   proc->files = NULL;
654   proc->n_files = 0;
655 }
656
657 /* Frees all the data for the procedure. */
658 static void
659 free_comb_proc (struct comb_proc *proc)
660 {
661   close_all_comb_files (proc);
662   dict_unref (proc->dict);
663   casewriter_destroy (proc->output);
664   case_matcher_destroy (proc->matcher);
665   if (proc->prev_BY)
666     {
667       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
668                                 proc->prev_BY);
669       free (proc->prev_BY);
670     }
671   subcase_uninit (&proc->by_vars);
672   case_unref (proc->buffered_case);
673 }
674 \f
675 static bool scan_table (struct comb_file *, union value by[]);
676 static struct ccase *create_output_case (const struct comb_proc *);
677 static void apply_case (const struct comb_file *, struct ccase *);
678 static void apply_nonmissing_case (const struct comb_file *, struct ccase *);
679 static void advance_file (struct comb_file *, union value by[]);
680 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
681 static void output_buffered_case (struct comb_proc *);
682
683 /* Executes the ADD FILES command. */
684 static void
685 execute_add_files (struct comb_proc *proc)
686 {
687   union value *by;
688
689   while (case_matcher_match (proc->matcher, &by))
690     {
691       size_t i;
692
693       for (i = 0; i < proc->n_files; i++)
694         {
695           struct comb_file *file = &proc->files[i];
696           while (file->is_minimal)
697             {
698               struct ccase *output = create_output_case (proc);
699               apply_case (file, output);
700               advance_file (file, by);
701               output_case (proc, output, by);
702             }
703         }
704     }
705   output_buffered_case (proc);
706 }
707
708 /* Executes the MATCH FILES command. */
709 static void
710 execute_match_files (struct comb_proc *proc)
711 {
712   union value *by;
713
714   while (case_matcher_match (proc->matcher, &by))
715     {
716       struct ccase *output;
717       size_t i;
718
719       output = create_output_case (proc);
720       for (i = proc->n_files; i-- > 0;)
721         {
722           struct comb_file *file = &proc->files[i];
723           if (file->type == COMB_FILE)
724             {
725               if (file->is_minimal)
726                 {
727                   apply_case (file, output);
728                   advance_file (file, NULL);
729                 }
730             }
731           else
732             {
733               if (scan_table (file, by))
734                 apply_case (file, output);
735             }
736         }
737       output_case (proc, output, by);
738     }
739   output_buffered_case (proc);
740 }
741
742 /* Executes the UPDATE command. */
743 static void
744 execute_update (struct comb_proc *proc)
745 {
746   union value *by;
747   size_t n_duplicates = 0;
748
749   while (case_matcher_match (proc->matcher, &by))
750     {
751       struct comb_file *first, *file;
752       struct ccase *output;
753
754       /* Find first nonnull case in array and make an output case
755          from it. */
756       output = create_output_case (proc);
757       for (first = &proc->files[0]; ; first++)
758         if (first->is_minimal)
759           break;
760       apply_case (first, output);
761       advance_file (first, by);
762
763       /* Read additional cases and update the output case from
764          them.  (Don't update the output case from any duplicate
765          cases in the master file.) */
766       for (file = first + (first == proc->files);
767            file < &proc->files[proc->n_files]; file++)
768         {
769           while (file->is_minimal)
770             {
771               apply_nonmissing_case (file, output);
772               advance_file (file, by);
773             }
774         }
775       casewriter_write (proc->output, output);
776
777       /* Write duplicate cases in the master file directly to the
778          output.  */
779       if (first == proc->files && first->is_minimal)
780         {
781           n_duplicates++;
782           while (first->is_minimal)
783             {
784               output = create_output_case (proc);
785               apply_case (first, output);
786               advance_file (first, by);
787               casewriter_write (proc->output, output);
788             }
789         }
790     }
791
792   if (n_duplicates)
793     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
794          n_duplicates);
795 }
796
797 /* Reads FILE, which must be of type COMB_TABLE, until it
798    encounters a case with BY or greater for its BY variables.
799    Returns true if a case with exactly BY for its BY variables
800    was found, otherwise false. */
801 static bool
802 scan_table (struct comb_file *file, union value by[])
803 {
804   while (file->data != NULL)
805     {
806       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
807       if (cmp > 0)
808         {
809           case_unref (file->data);
810           file->data = casereader_read (file->reader);
811         }
812       else
813         return cmp == 0;
814     }
815   return false;
816 }
817
818 /* Creates and returns an output case for PROC, initializing each
819    of its values to system-missing or blanks, except that the
820    values of IN variables are set to 0. */
821 static struct ccase *
822 create_output_case (const struct comb_proc *proc)
823 {
824   size_t n_vars = dict_get_n_vars (proc->dict);
825   struct ccase *output;
826   size_t i;
827
828   output = case_create (dict_get_proto (proc->dict));
829   for (i = 0; i < n_vars; i++)
830     {
831       struct variable *v = dict_get_var (proc->dict, i);
832       value_set_missing (case_data_rw (output, v), var_get_width (v));
833     }
834   for (i = 0; i < proc->n_files; i++)
835     {
836       struct comb_file *file = &proc->files[i];
837       if (file->in_var != NULL)
838         *case_num_rw (output, file->in_var) = false;
839     }
840   return output;
841 }
842
843 static void
844 mark_file_used (const struct comb_file *file, struct ccase *output)
845 {
846   if (file->in_var != NULL)
847     *case_num_rw (output, file->in_var) = true;
848 }
849
850 /* Copies the data from FILE's case into output case OUTPUT.
851    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
852 static void
853 apply_case (const struct comb_file *file, struct ccase *output)
854 {
855   subcase_copy (&file->src, file->data, &file->dst, output);
856   mark_file_used (file, output);
857 }
858
859 /* Copies the data from FILE's case into output case OUTPUT,
860    skipping values that are missing or all spaces.
861
862    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
863 static void
864 apply_nonmissing_case (const struct comb_file *file, struct ccase *output)
865 {
866   size_t i;
867
868   for (i = 0; i < subcase_get_n_fields (&file->src); i++)
869     {
870       const struct subcase_field *src_field = &file->src.fields[i];
871       const struct subcase_field *dst_field = &file->dst.fields[i];
872       const union value *src_value
873         = case_data_idx (file->data, src_field->case_index);
874       int width = src_field->width;
875
876       if (!mv_is_value_missing (file->mv[i], src_value)
877           && !(width > 0 && value_is_spaces (src_value, width)))
878         value_copy (case_data_rw_idx (output, dst_field->case_index),
879                     src_value, width);
880     }
881   mark_file_used (file, output);
882 }
883
884 /* Advances FILE to its next case.  If BY is nonnull, then FILE's is_minimal
885    member is updated based on whether the new case's BY values still match
886    those in BY. */
887 static void
888 advance_file (struct comb_file *file, union value by[])
889 {
890   case_unref (file->data);
891   file->data = casereader_read (file->reader);
892   if (by)
893     file->is_minimal = (file->data != NULL
894                         && subcase_equal_cx (&file->by_vars, file->data, by));
895 }
896
897 /* Writes OUTPUT, whose BY values has been extracted into BY, to
898    PROC's output file, first initializing any FIRST or LAST
899    variables in OUTPUT to the correct values. */
900 static void
901 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
902 {
903   if (proc->first == NULL && proc->last == NULL)
904     casewriter_write (proc->output, output);
905   else
906     {
907       /* It's harder with LAST, because we can't know whether
908          this case is the last in a group until we've prepared
909          the *next* case also.  Thus, we buffer the previous
910          output case until the next one is ready. */
911       bool new_BY;
912       if (proc->prev_BY != NULL)
913         {
914           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
915           if (proc->last != NULL)
916             *case_num_rw (proc->buffered_case, proc->last) = new_BY;
917           casewriter_write (proc->output, proc->buffered_case);
918         }
919       else
920         new_BY = true;
921
922       proc->buffered_case = output;
923       if (proc->first != NULL)
924         *case_num_rw (proc->buffered_case, proc->first) = new_BY;
925
926       if (new_BY)
927         {
928           size_t n_values = subcase_get_n_fields (&proc->by_vars);
929           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
930           if (proc->prev_BY == NULL)
931             {
932               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
933               caseproto_init_values (proto, proc->prev_BY);
934             }
935           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
936                           proc->prev_BY, by);
937         }
938     }
939 }
940
941 /* Writes a trailing buffered case to the output, if FIRST or
942    LAST is in use. */
943 static void
944 output_buffered_case (struct comb_proc *proc)
945 {
946   if (proc->prev_BY != NULL)
947     {
948       if (proc->last != NULL)
949         *case_num_rw (proc->buffered_case, proc->last) = 1.0;
950       casewriter_write (proc->output, proc->buffered_case);
951       proc->buffered_case = NULL;
952     }
953 }