dictionary: Always compact immediately upon deletion of a variable.
[pspp] / src / language / commands / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/any-reader.h"
22 #include "data/case-matcher.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/subcase.h"
30 #include "data/variable.h"
31 #include "language/command.h"
32 #include "language/commands/file-handle.h"
33 #include "language/commands/trim.h"
34 #include "language/lexer/lexer.h"
35 #include "language/lexer/variable-parser.h"
36 #include "language/commands/sort-criteria.h"
37 #include "libpspp/assertion.h"
38 #include "libpspp/i18n.h"
39 #include "libpspp/message.h"
40 #include "libpspp/string-array.h"
41 #include "libpspp/taint.h"
42 #include "math/sort.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 #include "gettext.h"
48 #define _(msgid) gettext (msgid)
49
50 enum comb_command_type
51   {
52     COMB_ADD,
53     COMB_MATCH,
54     COMB_UPDATE
55   };
56
57 /* File types. */
58 enum comb_file_type
59   {
60     COMB_FILE,                  /* Specified on FILE= subcommand. */
61     COMB_TABLE                  /* Specified on TABLE= subcommand. */
62   };
63
64 /* One FILE or TABLE subcommand. */
65 struct comb_file
66   {
67     /* Basics. */
68     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
69     int start_ofs, end_ofs;     /* Lexer offsets. */
70
71     /* Variables. */
72     struct subcase by_vars;     /* BY variables in this input file. */
73     struct subcase src, dst;    /* Data to copy to output; where to put it. */
74     const struct missing_values **mv; /* Each variable's missing values. */
75
76     /* Input files. */
77     struct file_handle *handle; /* Input file handle. */
78     struct dictionary *dict;    /* Input file dictionary. */
79     struct casereader *reader;  /* Input data source. */
80     struct ccase *data;         /* The current input case. */
81     bool is_minimal;            /* Does 'data' have minimum BY values across
82                                    all input files? */
83     bool is_sorted;             /* Is file presorted on the BY variables? */
84
85     /* IN subcommand. */
86     char *in_name;
87     int in_ofs;
88     struct variable *in_var;
89   };
90
91 struct comb_proc
92   {
93     struct comb_file *files;    /* All the files being merged. */
94     size_t n_files;             /* Number of files. */
95
96     struct dictionary *dict;    /* Dictionary of output file. */
97     struct subcase by_vars;     /* BY variables in the output. */
98     struct casewriter *output;  /* Destination for output. */
99
100     size_t *var_sources;
101     size_t n_var_sources, allocated_var_sources;
102
103     struct case_matcher *matcher;
104
105     /* FIRST, LAST.
106        Only if "first" or "last" is nonnull are the remaining
107        members used. */
108     struct variable *first;     /* Variable specified on FIRST (if any). */
109     struct variable *last;      /* Variable specified on LAST (if any). */
110     struct ccase *buffered_case; /* Case ready for output except that we don't
111                                     know the value for the LAST var yet. */
112     union value *prev_BY;       /* Values of BY vars in buffered_case. */
113   };
114
115 static int combine_files (enum comb_command_type, struct lexer *,
116                           struct dataset *);
117 static void free_comb_proc (struct comb_proc *);
118
119 static void close_all_comb_files (struct comb_proc *);
120 static bool merge_dictionary (struct comb_proc *, struct lexer *,
121                               struct comb_file *);
122
123 static void execute_update (struct comb_proc *);
124 static void execute_match_files (struct comb_proc *);
125 static void execute_add_files (struct comb_proc *);
126
127 static bool create_flag_var (struct lexer *lexer, const char *subcommand_name,
128                              const char *var_name, int var_ofs,
129                              struct dictionary *, struct variable **);
130 static void output_case (struct comb_proc *, struct ccase *, union value *by);
131 static void output_buffered_case (struct comb_proc *);
132
133 int
134 cmd_add_files (struct lexer *lexer, struct dataset *ds)
135 {
136   return combine_files (COMB_ADD, lexer, ds);
137 }
138
139 int
140 cmd_match_files (struct lexer *lexer, struct dataset *ds)
141 {
142   return combine_files (COMB_MATCH, lexer, ds);
143 }
144
145 int
146 cmd_update (struct lexer *lexer, struct dataset *ds)
147 {
148   return combine_files (COMB_UPDATE, lexer, ds);
149 }
150
151 static int
152 combine_files (enum comb_command_type command,
153                struct lexer *lexer, struct dataset *ds)
154 {
155   struct comb_proc proc = {
156     .dict = dict_create (get_default_encoding ()),
157   };
158
159   bool saw_by = false;
160   bool saw_sort = false;
161   struct casereader *active_file = NULL;
162
163   char *first_name = NULL;
164   int first_ofs = 0;
165   char *last_name = NULL;
166   int last_ofs = 0;
167
168   struct taint *taint = NULL;
169
170   size_t table_idx = SIZE_MAX;
171   int sort_ofs = INT_MAX;
172   size_t allocated_files = 0;
173
174   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
175
176   lex_match (lexer, T_SLASH);
177   for (;;)
178     {
179       int start_ofs = lex_ofs (lexer);
180       enum comb_file_type type;
181       if (lex_match_id (lexer, "FILE"))
182         type = COMB_FILE;
183       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
184         {
185           type = COMB_TABLE;
186           table_idx = MIN (table_idx, proc.n_files);
187         }
188       else
189         break;
190       lex_match (lexer, T_EQUALS);
191
192       if (proc.n_files >= allocated_files)
193         proc.files = x2nrealloc (proc.files, &allocated_files,
194                                 sizeof *proc.files);
195       struct comb_file *file = &proc.files[proc.n_files++];
196       *file = (struct comb_file) {
197         .type = type,
198         .start_ofs = start_ofs,
199         .is_sorted = true,
200       };
201
202       if (lex_match (lexer, T_ASTERISK))
203         {
204           if (!dataset_has_source (ds))
205             {
206               lex_next_error (lexer, -1, -1,
207                               _("Cannot specify the active dataset since none "
208                                 "has been defined."));
209               goto error;
210             }
211
212           if (proc_make_temporary_transformations_permanent (ds))
213             lex_next_error (lexer, -1, -1,
214                             _("This command may not be used after TEMPORARY "
215                               "when the active dataset is an input source.  "
216                               "Temporary transformations will be made "
217                               "permanent."));
218
219           file->dict = dict_clone (dataset_dict (ds));
220         }
221       else
222         {
223           file->handle = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
224           if (file->handle == NULL)
225             goto error;
226
227           file->reader = any_reader_open_and_decode (file->handle, NULL,
228                                                      &file->dict, NULL);
229           if (file->reader == NULL)
230             goto error;
231         }
232       file->end_ofs = lex_ofs (lexer) - 1;
233
234       while (lex_match (lexer, T_SLASH))
235         if (lex_match_id (lexer, "RENAME"))
236           {
237             if (!parse_dict_rename (lexer, file->dict))
238               goto error;
239           }
240         else if (lex_match_id (lexer, "IN"))
241           {
242             lex_match (lexer, T_EQUALS);
243             if (!lex_force_id (lexer))
244               goto error;
245
246             if (file->in_name)
247               {
248                 lex_error (lexer, _("Multiple IN subcommands for a single FILE "
249                                     "or TABLE."));
250                 goto error;
251               }
252             file->in_name = xstrdup (lex_tokcstr (lexer));
253             file->in_ofs = lex_ofs (lexer);
254             lex_get (lexer);
255           }
256         else if (lex_match_id (lexer, "SORT"))
257           {
258             file->is_sorted = false;
259             saw_sort = true;
260             sort_ofs = MIN (sort_ofs, lex_ofs (lexer) - 1);
261           }
262
263       if (!merge_dictionary (&proc, lexer, file))
264         goto error;
265     }
266
267   while (lex_token (lexer) != T_ENDCMD)
268     {
269       if (lex_match (lexer, T_BY))
270         {
271           if (saw_by)
272             {
273               lex_sbc_only_once (lexer, "BY");
274               goto error;
275             }
276           saw_by = true;
277
278           lex_match (lexer, T_EQUALS);
279
280           const struct variable **by_vars;
281           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
282                                     &by_vars, NULL))
283             goto error;
284
285           bool ok = true;
286           for (size_t i = 0; i < proc.n_files; i++)
287             {
288               struct comb_file *file = &proc.files[i];
289               for (size_t j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
290                 {
291                   const char *name = var_get_name (by_vars[j]);
292                   struct variable *var = dict_lookup_var (file->dict, name);
293                   if (var != NULL)
294                     subcase_add_var (&file->by_vars, var,
295                                      subcase_get_direction (&proc.by_vars, j));
296                   else
297                     {
298                       const char *fn
299                         = file->handle ? fh_get_name (file->handle) : "*";
300                       lex_ofs_error (lexer, file->start_ofs, file->end_ofs,
301                                      _("File %s lacks BY variable %s."),
302                                      fn, name);
303                       ok = false;
304                     }
305                 }
306               assert (!ok || subcase_conformable (&file->by_vars,
307                                                   &proc.files[0].by_vars));
308             }
309           free (by_vars);
310
311           if (!ok)
312             goto error;
313         }
314       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
315         {
316           if (first_name != NULL)
317             {
318               lex_sbc_only_once (lexer, "FIRST");
319               goto error;
320             }
321
322           lex_match (lexer, T_EQUALS);
323           if (!lex_force_id (lexer))
324             goto error;
325           first_name = xstrdup (lex_tokcstr (lexer));
326           first_ofs = lex_ofs (lexer);
327           lex_get (lexer);
328         }
329       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
330         {
331           if (last_name != NULL)
332             {
333               lex_sbc_only_once (lexer, "LAST");
334               goto error;
335             }
336
337           lex_match (lexer, T_EQUALS);
338           if (!lex_force_id (lexer))
339             goto error;
340           last_name = xstrdup (lex_tokcstr (lexer));
341           last_ofs = lex_ofs (lexer);
342           lex_get (lexer);
343         }
344       else if (lex_match_id (lexer, "MAP"))
345         {
346           /* FIXME. */
347         }
348       else if (lex_match_id (lexer, "DROP"))
349         {
350           if (!parse_dict_drop (lexer, proc.dict))
351             goto error;
352         }
353       else if (lex_match_id (lexer, "KEEP"))
354         {
355           if (!parse_dict_keep (lexer, proc.dict))
356             goto error;
357         }
358       else
359         {
360           if (command == COMB_UPDATE)
361             lex_error_expecting (lexer, "BY", "MAP", "DROP", "KEEP");
362           else
363             lex_error_expecting (lexer, "BY", "FIRST", "LAST",
364                                  "MAP", "DROP", "KEEP");
365           goto error;
366         }
367
368       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
369         {
370           lex_end_of_command (lexer);
371           goto error;
372         }
373     }
374
375   if (!saw_by)
376     {
377       if (command == COMB_UPDATE)
378         {
379           lex_sbc_missing (lexer, "BY");
380           goto error;
381         }
382       if (table_idx != SIZE_MAX)
383         {
384           const struct comb_file *table = &proc.files[table_idx];
385           lex_ofs_error (lexer, table->start_ofs, table->end_ofs,
386                          _("BY is required when %s is specified."), "TABLE");
387           goto error;
388         }
389       if (saw_sort)
390         {
391           lex_ofs_error (lexer, sort_ofs, sort_ofs,
392                          _("BY is required when %s is specified."), "SORT");
393           goto error;
394         }
395     }
396
397   /* Add IN, FIRST, and LAST variables to master dictionary. */
398   for (size_t i = 0; i < proc.n_files; i++)
399     {
400       struct comb_file *file = &proc.files[i];
401       if (!create_flag_var (lexer, "IN", file->in_name, file->in_ofs,
402                             proc.dict, &file->in_var))
403         goto error;
404     }
405   if (!create_flag_var (lexer, "FIRST", first_name, first_ofs, proc.dict, &proc.first)
406       || !create_flag_var (lexer, "LAST", last_name, last_ofs, proc.dict, &proc.last))
407     goto error;
408
409   dict_delete_scratch_vars (proc.dict);
410
411   /* Set up mapping from each file's variables to master
412      variables. */
413   for (size_t i = 0; i < proc.n_files; i++)
414     {
415       struct comb_file *file = &proc.files[i];
416       size_t src_n_vars = dict_get_n_vars (file->dict);
417
418       file->mv = xnmalloc (src_n_vars, sizeof *file->mv);
419       for (size_t j = 0; j < src_n_vars; j++)
420         {
421           struct variable *src_var = dict_get_var (file->dict, j);
422           struct variable *dst_var = dict_lookup_var (proc.dict,
423                                                       var_get_name (src_var));
424           if (dst_var != NULL)
425             {
426               size_t n = subcase_get_n_fields (&file->src);
427               file->mv[n] = var_get_missing_values (src_var);
428               subcase_add_var (&file->src, src_var, SC_ASCEND);
429               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
430             }
431         }
432     }
433
434   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
435   taint = taint_clone (casewriter_get_taint (proc.output));
436
437   /* Set up case matcher. */
438   proc.matcher = case_matcher_create ();
439   for (size_t i = 0; i < proc.n_files; i++)
440     {
441       struct comb_file *file = &proc.files[i];
442       if (file->reader == NULL)
443         {
444           if (active_file == NULL)
445             {
446               proc_discard_output (ds);
447               file->reader = active_file = proc_open_filtering (ds, false);
448             }
449           else
450             file->reader = casereader_clone (active_file);
451         }
452       if (!file->is_sorted)
453         file->reader = sort_execute (file->reader, &file->by_vars);
454       taint_propagate (casereader_get_taint (file->reader), taint);
455       file->data = casereader_read (file->reader);
456       if (file->type == COMB_FILE)
457         case_matcher_add_input (proc.matcher, &file->by_vars,
458                                 &file->data, &file->is_minimal);
459     }
460
461   if (command == COMB_ADD)
462     execute_add_files (&proc);
463   else if (command == COMB_MATCH)
464     execute_match_files (&proc);
465   else if (command == COMB_UPDATE)
466     execute_update (&proc);
467   else
468     NOT_REACHED ();
469
470   case_matcher_destroy (proc.matcher);
471   proc.matcher = NULL;
472   close_all_comb_files (&proc);
473   if (active_file != NULL)
474     proc_commit (ds);
475
476   dataset_set_dict (ds, proc.dict);
477   dataset_set_source (ds, casewriter_make_reader (proc.output));
478   proc.dict = NULL;
479   proc.output = NULL;
480
481   free_comb_proc (&proc);
482
483   free (first_name);
484   free (last_name);
485
486   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
487
488  error:
489   if (active_file != NULL)
490     proc_commit (ds);
491   free_comb_proc (&proc);
492   taint_destroy (taint);
493   free (first_name);
494   free (last_name);
495   return CMD_CASCADING_FAILURE;
496 }
497
498 /* Merge the dictionary for file F into master dictionary for PROC. */
499 static bool
500 merge_dictionary (struct comb_proc *proc, struct lexer *lexer,
501                   struct comb_file *f)
502 {
503   struct dictionary *m = proc->dict;
504   struct dictionary *d = f->dict;
505
506   if (dict_get_label (m) == NULL)
507     dict_set_label (m, dict_get_label (d));
508
509   /* FIXME: If the input files have different encodings, then
510      the result is undefined.
511      The correct thing to do would be to convert to an encoding
512      which can cope with all the input files (eg UTF-8).
513    */
514   if (strcmp (dict_get_encoding (f->dict), dict_get_encoding (m)))
515     msg (MW, _("Combining files with incompatible encodings. String data may "
516                "not be represented correctly."));
517
518   const struct string_array *d_docs = dict_get_documents (d);
519   const struct string_array *m_docs = dict_get_documents (m);
520   if (d_docs)
521     {
522       if (!m_docs)
523         dict_set_documents (m, d_docs);
524       else
525         {
526           size_t n = m_docs->n + d_docs->n;
527           struct string_array new_docs = {
528             .strings = xmalloc (n * sizeof *new_docs.strings),
529           };
530           for (size_t i = 0; i < m_docs->n; i++)
531             new_docs.strings[new_docs.n++] = m_docs->strings[i];
532           for (size_t i = 0; i < d_docs->n; i++)
533             new_docs.strings[new_docs.n++] = d_docs->strings[i];
534
535           dict_set_documents (m, &new_docs);
536
537           free (new_docs.strings);
538         }
539     }
540
541   for (size_t i = 0; i < dict_get_n_vars (d); i++)
542     {
543       struct variable *dv = dict_get_var (d, i);
544       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
545
546       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
547         continue;
548
549       if (!mv)
550         {
551           mv = dict_clone_var_assert (m, dv);
552           if (proc->n_var_sources >= proc->allocated_var_sources)
553             proc->var_sources = x2nrealloc (proc->var_sources,
554                                             &proc->allocated_var_sources,
555                                             sizeof *proc->var_sources);
556           proc->var_sources[proc->n_var_sources++] = f - proc->files;
557         }
558       else
559         {
560           if (var_get_width (mv) != var_get_width (dv))
561             {
562               const char *var_name = var_get_name (dv);
563               msg (SE, _("Variable %s has different type or width in different "
564                          "files."), var_name);
565
566               for (size_t j = 0; j < 2; j++)
567                 {
568                   const struct variable *ev = !j ? mv : dv;
569                   const struct comb_file *ef
570                     = !j ? &proc->files[proc->var_sources[var_get_dict_index (mv)]] : f;
571                   const char *fn = ef->handle ? fh_get_name (ef->handle) : "*";
572
573                   if (var_is_numeric (ev))
574                     lex_ofs_msg (lexer, SN, ef->start_ofs, ef->end_ofs,
575                                  _("In file %s, %s is numeric."),
576                                  fn, var_name);
577                   else
578                     lex_ofs_msg (lexer, SN, ef->start_ofs, ef->end_ofs,
579                                  _("In file %s, %s is a string with width %d."),
580                                  fn, var_name, var_get_width (ev));
581                 }
582
583               return false;
584             }
585
586           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
587             var_set_value_labels (mv, var_get_value_labels (dv));
588           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
589             var_set_missing_values (mv, var_get_missing_values (dv));
590           if (var_get_label (dv) && !var_get_label (mv))
591             var_set_label (mv, var_get_label (dv));
592         }
593     }
594
595   return true;
596 }
597
598 /* If VAR_NAME is non-NULL, attempts to create a
599    variable named VAR_NAME, with format F1.0, in DICT, and stores
600    a pointer to the variable in *VAR.  Returns true if
601    successful, false if the variable name is a duplicate (in
602    which case a message saying that the variable specified on the
603    given SUBCOMMAND is a duplicate is emitted).
604
605    Does nothing and returns true if VAR_NAME is null. */
606 static bool
607 create_flag_var (struct lexer *lexer, const char *subcommand,
608                  const char *var_name, int var_ofs,
609                  struct dictionary *dict, struct variable **var)
610 {
611   if (var_name != NULL)
612     {
613       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
614       *var = dict_create_var (dict, var_name, 0);
615       if (*var == NULL)
616         {
617           lex_ofs_error (lexer, var_ofs, var_ofs,
618                          _("Variable name %s specified on %s subcommand "
619                            "duplicates an existing variable name."),
620                          var_name, subcommand);
621           return false;
622         }
623       var_set_both_formats (*var, format);
624     }
625   else
626     *var = NULL;
627   return true;
628 }
629
630 /* Closes all the files in PROC and frees their associated data. */
631 static void
632 close_all_comb_files (struct comb_proc *proc)
633 {
634   for (size_t i = 0; i < proc->n_files; i++)
635     {
636       struct comb_file *file = &proc->files[i];
637       subcase_uninit (&file->by_vars);
638       subcase_uninit (&file->src);
639       subcase_uninit (&file->dst);
640       free (file->mv);
641       fh_unref (file->handle);
642       dict_unref (file->dict);
643       casereader_destroy (file->reader);
644       case_unref (file->data);
645       free (file->in_name);
646     }
647   free (proc->files);
648   proc->files = NULL;
649   proc->n_files = 0;
650 }
651
652 /* Frees all the data for the procedure. */
653 static void
654 free_comb_proc (struct comb_proc *proc)
655 {
656   close_all_comb_files (proc);
657   dict_unref (proc->dict);
658   casewriter_destroy (proc->output);
659   case_matcher_destroy (proc->matcher);
660   if (proc->prev_BY)
661     {
662       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
663                                 proc->prev_BY);
664       free (proc->prev_BY);
665     }
666   subcase_uninit (&proc->by_vars);
667   case_unref (proc->buffered_case);
668   free (proc->var_sources);
669 }
670 \f
671 static bool scan_table (struct comb_file *, union value by[]);
672 static struct ccase *create_output_case (const struct comb_proc *);
673 static void apply_case (const struct comb_file *, struct ccase *);
674 static void apply_nonmissing_case (const struct comb_file *, struct ccase *);
675 static void advance_file (struct comb_file *, union value by[]);
676 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
677 static void output_buffered_case (struct comb_proc *);
678
679 /* Executes the ADD FILES command. */
680 static void
681 execute_add_files (struct comb_proc *proc)
682 {
683   union value *by;
684
685   while (case_matcher_match (proc->matcher, &by))
686     for (size_t i = 0; i < proc->n_files; i++)
687       {
688         struct comb_file *file = &proc->files[i];
689         while (file->is_minimal)
690           {
691             struct ccase *output = create_output_case (proc);
692             apply_case (file, output);
693             advance_file (file, by);
694             output_case (proc, output, by);
695           }
696       }
697   output_buffered_case (proc);
698 }
699
700 /* Executes the MATCH FILES command. */
701 static void
702 execute_match_files (struct comb_proc *proc)
703 {
704   union value *by;
705
706   while (case_matcher_match (proc->matcher, &by))
707     {
708       struct ccase *output = create_output_case (proc);
709       for (size_t i = proc->n_files; i-- > 0;)
710         {
711           struct comb_file *file = &proc->files[i];
712           if (file->type == COMB_FILE)
713             {
714               if (file->is_minimal)
715                 {
716                   apply_case (file, output);
717                   advance_file (file, NULL);
718                 }
719             }
720           else
721             {
722               if (scan_table (file, by))
723                 apply_case (file, output);
724             }
725         }
726       output_case (proc, output, by);
727     }
728   output_buffered_case (proc);
729 }
730
731 /* Executes the UPDATE command. */
732 static void
733 execute_update (struct comb_proc *proc)
734 {
735   union value *by;
736   size_t n_duplicates = 0;
737
738   while (case_matcher_match (proc->matcher, &by))
739     {
740       struct comb_file *first, *file;
741       struct ccase *output;
742
743       /* Find first nonnull case in array and make an output case
744          from it. */
745       output = create_output_case (proc);
746       for (first = &proc->files[0]; ; first++)
747         if (first->is_minimal)
748           break;
749       apply_case (first, output);
750       advance_file (first, by);
751
752       /* Read additional cases and update the output case from
753          them.  (Don't update the output case from any duplicate
754          cases in the master file.) */
755       for (file = first + (first == proc->files);
756            file < &proc->files[proc->n_files]; file++)
757         {
758           while (file->is_minimal)
759             {
760               apply_nonmissing_case (file, output);
761               advance_file (file, by);
762             }
763         }
764       casewriter_write (proc->output, output);
765
766       /* Write duplicate cases in the master file directly to the
767          output.  */
768       if (first == proc->files && first->is_minimal)
769         {
770           n_duplicates++;
771           while (first->is_minimal)
772             {
773               output = create_output_case (proc);
774               apply_case (first, output);
775               advance_file (first, by);
776               casewriter_write (proc->output, output);
777             }
778         }
779     }
780
781   if (n_duplicates)
782     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
783          n_duplicates);
784 }
785
786 /* Reads FILE, which must be of type COMB_TABLE, until it
787    encounters a case with BY or greater for its BY variables.
788    Returns true if a case with exactly BY for its BY variables
789    was found, otherwise false. */
790 static bool
791 scan_table (struct comb_file *file, union value by[])
792 {
793   while (file->data != NULL)
794     {
795       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
796       if (cmp > 0)
797         {
798           case_unref (file->data);
799           file->data = casereader_read (file->reader);
800         }
801       else
802         return cmp == 0;
803     }
804   return false;
805 }
806
807 /* Creates and returns an output case for PROC, initializing each
808    of its values to system-missing or blanks, except that the
809    values of IN variables are set to 0. */
810 static struct ccase *
811 create_output_case (const struct comb_proc *proc)
812 {
813   size_t n_vars = dict_get_n_vars (proc->dict);
814   struct ccase *output = case_create (dict_get_proto (proc->dict));
815   for (size_t i = 0; i < n_vars; i++)
816     {
817       struct variable *v = dict_get_var (proc->dict, i);
818       value_set_missing (case_data_rw (output, v), var_get_width (v));
819     }
820   for (size_t i = 0; i < proc->n_files; i++)
821     {
822       struct comb_file *file = &proc->files[i];
823       if (file->in_var != NULL)
824         *case_num_rw (output, file->in_var) = false;
825     }
826   return output;
827 }
828
829 static void
830 mark_file_used (const struct comb_file *file, struct ccase *output)
831 {
832   if (file->in_var != NULL)
833     *case_num_rw (output, file->in_var) = true;
834 }
835
836 /* Copies the data from FILE's case into output case OUTPUT.
837    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
838 static void
839 apply_case (const struct comb_file *file, struct ccase *output)
840 {
841   subcase_copy (&file->src, file->data, &file->dst, output);
842   mark_file_used (file, output);
843 }
844
845 /* Copies the data from FILE's case into output case OUTPUT,
846    skipping values that are missing or all spaces.
847
848    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
849 static void
850 apply_nonmissing_case (const struct comb_file *file, struct ccase *output)
851 {
852   for (size_t i = 0; i < subcase_get_n_fields (&file->src); i++)
853     {
854       const struct subcase_field *src_field = &file->src.fields[i];
855       const struct subcase_field *dst_field = &file->dst.fields[i];
856       const union value *src_value
857         = case_data_idx (file->data, src_field->case_index);
858       int width = src_field->width;
859
860       if (!mv_is_value_missing (file->mv[i], src_value)
861           && !(width > 0 && value_is_spaces (src_value, width)))
862         value_copy (case_data_rw_idx (output, dst_field->case_index),
863                     src_value, width);
864     }
865   mark_file_used (file, output);
866 }
867
868 /* Advances FILE to its next case.  If BY is nonnull, then FILE's is_minimal
869    member is updated based on whether the new case's BY values still match
870    those in BY. */
871 static void
872 advance_file (struct comb_file *file, union value by[])
873 {
874   case_unref (file->data);
875   file->data = casereader_read (file->reader);
876   if (by)
877     file->is_minimal = (file->data != NULL
878                         && subcase_equal_cx (&file->by_vars, file->data, by));
879 }
880
881 /* Writes OUTPUT, whose BY values has been extracted into BY, to
882    PROC's output file, first initializing any FIRST or LAST
883    variables in OUTPUT to the correct values. */
884 static void
885 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
886 {
887   if (proc->first == NULL && proc->last == NULL)
888     casewriter_write (proc->output, output);
889   else
890     {
891       /* It's harder with LAST, because we can't know whether
892          this case is the last in a group until we've prepared
893          the *next* case also.  Thus, we buffer the previous
894          output case until the next one is ready. */
895       bool new_BY;
896       if (proc->prev_BY != NULL)
897         {
898           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
899           if (proc->last != NULL)
900             *case_num_rw (proc->buffered_case, proc->last) = new_BY;
901           casewriter_write (proc->output, proc->buffered_case);
902         }
903       else
904         new_BY = true;
905
906       proc->buffered_case = output;
907       if (proc->first != NULL)
908         *case_num_rw (proc->buffered_case, proc->first) = new_BY;
909
910       if (new_BY)
911         {
912           size_t n_values = subcase_get_n_fields (&proc->by_vars);
913           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
914           if (proc->prev_BY == NULL)
915             {
916               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
917               caseproto_init_values (proto, proc->prev_BY);
918             }
919           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
920                           proc->prev_BY, by);
921         }
922     }
923 }
924
925 /* Writes a trailing buffered case to the output, if FIRST or
926    LAST is in use. */
927 static void
928 output_buffered_case (struct comb_proc *proc)
929 {
930   if (proc->prev_BY != NULL)
931     {
932       if (proc->last != NULL)
933         *case_num_rw (proc->buffered_case, proc->last) = 1.0;
934       casewriter_write (proc->output, proc->buffered_case);
935       proc->buffered_case = NULL;
936     }
937 }