dde261a68f5727d5d1b6c9ac95e6c0d64614e37d
[pspp] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/any-reader.h"
22 #include "data/case-matcher.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/subcase.h"
30 #include "data/variable.h"
31 #include "language/command.h"
32 #include "language/data-io/file-handle.h"
33 #include "language/data-io/trim.h"
34 #include "language/lexer/lexer.h"
35 #include "language/lexer/variable-parser.h"
36 #include "language/stats/sort-criteria.h"
37 #include "libpspp/assertion.h"
38 #include "libpspp/i18n.h"
39 #include "libpspp/message.h"
40 #include "libpspp/string-array.h"
41 #include "libpspp/taint.h"
42 #include "math/sort.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 #include "gettext.h"
48 #define _(msgid) gettext (msgid)
49
50 enum comb_command_type
51   {
52     COMB_ADD,
53     COMB_MATCH,
54     COMB_UPDATE
55   };
56
57 /* File types. */
58 enum comb_file_type
59   {
60     COMB_FILE,                  /* Specified on FILE= subcommand. */
61     COMB_TABLE                  /* Specified on TABLE= subcommand. */
62   };
63
64 /* One FILE or TABLE subcommand. */
65 struct comb_file
66   {
67     /* Basics. */
68     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
69     int start_ofs, end_ofs;     /* Lexer offsets. */
70
71     /* Variables. */
72     struct subcase by_vars;     /* BY variables in this input file. */
73     struct subcase src, dst;    /* Data to copy to output; where to put it. */
74     const struct missing_values **mv; /* Each variable's missing values. */
75
76     /* Input files. */
77     struct file_handle *handle; /* Input file handle. */
78     struct dictionary *dict;    /* Input file dictionary. */
79     struct casereader *reader;  /* Input data source. */
80     struct ccase *data;         /* The current input case. */
81     bool is_minimal;            /* Does 'data' have minimum BY values across
82                                    all input files? */
83     bool is_sorted;             /* Is file presorted on the BY variables? */
84
85     /* IN subcommand. */
86     char *in_name;
87     int in_ofs;
88     struct variable *in_var;
89   };
90
91 struct comb_proc
92   {
93     struct comb_file *files;    /* All the files being merged. */
94     size_t n_files;             /* Number of files. */
95
96     struct dictionary *dict;    /* Dictionary of output file. */
97     struct subcase by_vars;     /* BY variables in the output. */
98     struct casewriter *output;  /* Destination for output. */
99
100     size_t *var_sources;
101     size_t n_var_sources, allocated_var_sources;
102
103     struct case_matcher *matcher;
104
105     /* FIRST, LAST.
106        Only if "first" or "last" is nonnull are the remaining
107        members used. */
108     struct variable *first;     /* Variable specified on FIRST (if any). */
109     struct variable *last;      /* Variable specified on LAST (if any). */
110     struct ccase *buffered_case; /* Case ready for output except that we don't
111                                     know the value for the LAST var yet. */
112     union value *prev_BY;       /* Values of BY vars in buffered_case. */
113   };
114
115 static int combine_files (enum comb_command_type, struct lexer *,
116                           struct dataset *);
117 static void free_comb_proc (struct comb_proc *);
118
119 static void close_all_comb_files (struct comb_proc *);
120 static bool merge_dictionary (struct comb_proc *, struct lexer *,
121                               struct comb_file *);
122
123 static void execute_update (struct comb_proc *);
124 static void execute_match_files (struct comb_proc *);
125 static void execute_add_files (struct comb_proc *);
126
127 static bool create_flag_var (struct lexer *lexer, const char *subcommand_name,
128                              const char *var_name, int var_ofs,
129                              struct dictionary *, struct variable **);
130 static void output_case (struct comb_proc *, struct ccase *, union value *by);
131 static void output_buffered_case (struct comb_proc *);
132
133 int
134 cmd_add_files (struct lexer *lexer, struct dataset *ds)
135 {
136   return combine_files (COMB_ADD, lexer, ds);
137 }
138
139 int
140 cmd_match_files (struct lexer *lexer, struct dataset *ds)
141 {
142   return combine_files (COMB_MATCH, lexer, ds);
143 }
144
145 int
146 cmd_update (struct lexer *lexer, struct dataset *ds)
147 {
148   return combine_files (COMB_UPDATE, lexer, ds);
149 }
150
151 static int
152 combine_files (enum comb_command_type command,
153                struct lexer *lexer, struct dataset *ds)
154 {
155   struct comb_proc proc = {
156     .dict = dict_create (get_default_encoding ()),
157   };
158
159   bool saw_by = false;
160   bool saw_sort = false;
161   struct casereader *active_file = NULL;
162
163   char *first_name = NULL;
164   int first_ofs = 0;
165   char *last_name = NULL;
166   int last_ofs = 0;
167
168   struct taint *taint = NULL;
169
170   size_t table_idx = SIZE_MAX;
171   int sort_ofs = INT_MAX;
172   size_t allocated_files = 0;
173
174   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
175
176   lex_match (lexer, T_SLASH);
177   for (;;)
178     {
179       int start_ofs = lex_ofs (lexer);
180       enum comb_file_type type;
181       if (lex_match_id (lexer, "FILE"))
182         type = COMB_FILE;
183       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
184         {
185           type = COMB_TABLE;
186           table_idx = MIN (table_idx, proc.n_files);
187         }
188       else
189         break;
190       lex_match (lexer, T_EQUALS);
191
192       if (proc.n_files >= allocated_files)
193         proc.files = x2nrealloc (proc.files, &allocated_files,
194                                 sizeof *proc.files);
195       struct comb_file *file = &proc.files[proc.n_files++];
196       *file = (struct comb_file) {
197         .type = type,
198         .start_ofs = start_ofs,
199         .is_sorted = true,
200       };
201
202       if (lex_match (lexer, T_ASTERISK))
203         {
204           if (!dataset_has_source (ds))
205             {
206               lex_next_error (lexer, -1, -1,
207                               _("Cannot specify the active dataset since none "
208                                 "has been defined."));
209               goto error;
210             }
211
212           if (proc_make_temporary_transformations_permanent (ds))
213             lex_next_error (lexer, -1, -1,
214                             _("This command may not be used after TEMPORARY "
215                               "when the active dataset is an input source.  "
216                               "Temporary transformations will be made "
217                               "permanent."));
218
219           file->dict = dict_clone (dataset_dict (ds));
220         }
221       else
222         {
223           file->handle = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
224           if (file->handle == NULL)
225             goto error;
226
227           file->reader = any_reader_open_and_decode (file->handle, NULL,
228                                                      &file->dict, NULL);
229           if (file->reader == NULL)
230             goto error;
231         }
232       file->end_ofs = lex_ofs (lexer) - 1;
233
234       while (lex_match (lexer, T_SLASH))
235         if (lex_match_id (lexer, "RENAME"))
236           {
237             if (!parse_dict_rename (lexer, file->dict))
238               goto error;
239           }
240         else if (lex_match_id (lexer, "IN"))
241           {
242             lex_match (lexer, T_EQUALS);
243             if (!lex_force_id (lexer))
244               goto error;
245
246             if (file->in_name)
247               {
248                 lex_error (lexer, _("Multiple IN subcommands for a single FILE "
249                                     "or TABLE."));
250                 goto error;
251               }
252             file->in_name = xstrdup (lex_tokcstr (lexer));
253             file->in_ofs = lex_ofs (lexer);
254             lex_get (lexer);
255           }
256         else if (lex_match_id (lexer, "SORT"))
257           {
258             file->is_sorted = false;
259             saw_sort = true;
260             sort_ofs = MIN (sort_ofs, lex_ofs (lexer) - 1);
261           }
262
263       if (!merge_dictionary (&proc, lexer, file))
264         goto error;
265     }
266
267   while (lex_token (lexer) != T_ENDCMD)
268     {
269       if (lex_match (lexer, T_BY))
270         {
271           if (saw_by)
272             {
273               lex_sbc_only_once (lexer, "BY");
274               goto error;
275             }
276           saw_by = true;
277
278           lex_match (lexer, T_EQUALS);
279
280           const struct variable **by_vars;
281           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
282                                     &by_vars, NULL))
283             goto error;
284
285           bool ok = true;
286           for (size_t i = 0; i < proc.n_files; i++)
287             {
288               struct comb_file *file = &proc.files[i];
289               for (size_t j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
290                 {
291                   const char *name = var_get_name (by_vars[j]);
292                   struct variable *var = dict_lookup_var (file->dict, name);
293                   if (var != NULL)
294                     subcase_add_var (&file->by_vars, var,
295                                      subcase_get_direction (&proc.by_vars, j));
296                   else
297                     {
298                       const char *fn
299                         = file->handle ? fh_get_name (file->handle) : "*";
300                       lex_ofs_error (lexer, file->start_ofs, file->end_ofs,
301                                      _("File %s lacks BY variable %s."),
302                                      fn, name);
303                       ok = false;
304                     }
305                 }
306               assert (!ok || subcase_conformable (&file->by_vars,
307                                                   &proc.files[0].by_vars));
308             }
309           free (by_vars);
310
311           if (!ok)
312             goto error;
313         }
314       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
315         {
316           if (first_name != NULL)
317             {
318               lex_sbc_only_once (lexer, "FIRST");
319               goto error;
320             }
321
322           lex_match (lexer, T_EQUALS);
323           if (!lex_force_id (lexer))
324             goto error;
325           first_name = xstrdup (lex_tokcstr (lexer));
326           first_ofs = lex_ofs (lexer);
327           lex_get (lexer);
328         }
329       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
330         {
331           if (last_name != NULL)
332             {
333               lex_sbc_only_once (lexer, "LAST");
334               goto error;
335             }
336
337           lex_match (lexer, T_EQUALS);
338           if (!lex_force_id (lexer))
339             goto error;
340           last_name = xstrdup (lex_tokcstr (lexer));
341           last_ofs = lex_ofs (lexer);
342           lex_get (lexer);
343         }
344       else if (lex_match_id (lexer, "MAP"))
345         {
346           /* FIXME. */
347         }
348       else if (lex_match_id (lexer, "DROP"))
349         {
350           if (!parse_dict_drop (lexer, proc.dict))
351             goto error;
352         }
353       else if (lex_match_id (lexer, "KEEP"))
354         {
355           if (!parse_dict_keep (lexer, proc.dict))
356             goto error;
357         }
358       else
359         {
360           lex_error (lexer, NULL);
361           goto error;
362         }
363
364       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
365         {
366           lex_end_of_command (lexer);
367           goto error;
368         }
369     }
370
371   if (!saw_by)
372     {
373       if (command == COMB_UPDATE)
374         {
375           lex_sbc_missing (lexer, "BY");
376           goto error;
377         }
378       if (table_idx != SIZE_MAX)
379         {
380           const struct comb_file *table = &proc.files[table_idx];
381           lex_ofs_error (lexer, table->start_ofs, table->end_ofs,
382                          _("BY is required when %s is specified."), "TABLE");
383           goto error;
384         }
385       if (saw_sort)
386         {
387           lex_ofs_error (lexer, sort_ofs, sort_ofs,
388                          _("BY is required when %s is specified."), "SORT");
389           goto error;
390         }
391     }
392
393   /* Add IN, FIRST, and LAST variables to master dictionary. */
394   for (size_t i = 0; i < proc.n_files; i++)
395     {
396       struct comb_file *file = &proc.files[i];
397       if (!create_flag_var (lexer, "IN", file->in_name, file->in_ofs,
398                             proc.dict, &file->in_var))
399         goto error;
400     }
401   if (!create_flag_var (lexer, "FIRST", first_name, first_ofs, proc.dict, &proc.first)
402       || !create_flag_var (lexer, "LAST", last_name, last_ofs, proc.dict, &proc.last))
403     goto error;
404
405   dict_delete_scratch_vars (proc.dict);
406   dict_compact_values (proc.dict);
407
408   /* Set up mapping from each file's variables to master
409      variables. */
410   for (size_t i = 0; i < proc.n_files; i++)
411     {
412       struct comb_file *file = &proc.files[i];
413       size_t src_n_vars = dict_get_n_vars (file->dict);
414
415       file->mv = xnmalloc (src_n_vars, sizeof *file->mv);
416       for (size_t j = 0; j < src_n_vars; j++)
417         {
418           struct variable *src_var = dict_get_var (file->dict, j);
419           struct variable *dst_var = dict_lookup_var (proc.dict,
420                                                       var_get_name (src_var));
421           if (dst_var != NULL)
422             {
423               size_t n = subcase_get_n_fields (&file->src);
424               file->mv[n] = var_get_missing_values (src_var);
425               subcase_add_var (&file->src, src_var, SC_ASCEND);
426               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
427             }
428         }
429     }
430
431   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
432   taint = taint_clone (casewriter_get_taint (proc.output));
433
434   /* Set up case matcher. */
435   proc.matcher = case_matcher_create ();
436   for (size_t i = 0; i < proc.n_files; i++)
437     {
438       struct comb_file *file = &proc.files[i];
439       if (file->reader == NULL)
440         {
441           if (active_file == NULL)
442             {
443               proc_discard_output (ds);
444               file->reader = active_file = proc_open_filtering (ds, false);
445             }
446           else
447             file->reader = casereader_clone (active_file);
448         }
449       if (!file->is_sorted)
450         file->reader = sort_execute (file->reader, &file->by_vars);
451       taint_propagate (casereader_get_taint (file->reader), taint);
452       file->data = casereader_read (file->reader);
453       if (file->type == COMB_FILE)
454         case_matcher_add_input (proc.matcher, &file->by_vars,
455                                 &file->data, &file->is_minimal);
456     }
457
458   if (command == COMB_ADD)
459     execute_add_files (&proc);
460   else if (command == COMB_MATCH)
461     execute_match_files (&proc);
462   else if (command == COMB_UPDATE)
463     execute_update (&proc);
464   else
465     NOT_REACHED ();
466
467   case_matcher_destroy (proc.matcher);
468   proc.matcher = NULL;
469   close_all_comb_files (&proc);
470   if (active_file != NULL)
471     proc_commit (ds);
472
473   dataset_set_dict (ds, proc.dict);
474   dataset_set_source (ds, casewriter_make_reader (proc.output));
475   proc.dict = NULL;
476   proc.output = NULL;
477
478   free_comb_proc (&proc);
479
480   free (first_name);
481   free (last_name);
482
483   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
484
485  error:
486   if (active_file != NULL)
487     proc_commit (ds);
488   free_comb_proc (&proc);
489   taint_destroy (taint);
490   free (first_name);
491   free (last_name);
492   return CMD_CASCADING_FAILURE;
493 }
494
495 /* Merge the dictionary for file F into master dictionary for PROC. */
496 static bool
497 merge_dictionary (struct comb_proc *proc, struct lexer *lexer,
498                   struct comb_file *f)
499 {
500   struct dictionary *m = proc->dict;
501   struct dictionary *d = f->dict;
502
503   if (dict_get_label (m) == NULL)
504     dict_set_label (m, dict_get_label (d));
505
506   /* FIXME: If the input files have different encodings, then
507      the result is undefined.
508      The correct thing to do would be to convert to an encoding
509      which can cope with all the input files (eg UTF-8).
510    */
511   if (strcmp (dict_get_encoding (f->dict), dict_get_encoding (m)))
512     msg (MW, _("Combining files with incompatible encodings. String data may "
513                "not be represented correctly."));
514
515   const struct string_array *d_docs = dict_get_documents (d);
516   const struct string_array *m_docs = dict_get_documents (m);
517   if (d_docs)
518     {
519       if (!m_docs)
520         dict_set_documents (m, d_docs);
521       else
522         {
523           size_t n = m_docs->n + d_docs->n;
524           struct string_array new_docs = {
525             .strings = xmalloc (n * sizeof *new_docs.strings),
526           };
527           for (size_t i = 0; i < m_docs->n; i++)
528             new_docs.strings[new_docs.n++] = m_docs->strings[i];
529           for (size_t i = 0; i < d_docs->n; i++)
530             new_docs.strings[new_docs.n++] = d_docs->strings[i];
531
532           dict_set_documents (m, &new_docs);
533
534           free (new_docs.strings);
535         }
536     }
537
538   for (size_t i = 0; i < dict_get_n_vars (d); i++)
539     {
540       struct variable *dv = dict_get_var (d, i);
541       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
542
543       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
544         continue;
545
546       if (!mv)
547         {
548           mv = dict_clone_var_assert (m, dv);
549           if (proc->n_var_sources >= proc->allocated_var_sources)
550             proc->var_sources = x2nrealloc (proc->var_sources,
551                                             &proc->allocated_var_sources,
552                                             sizeof *proc->var_sources);
553           proc->var_sources[proc->n_var_sources++] = f - proc->files;
554         }
555       else
556         {
557           if (var_get_width (mv) != var_get_width (dv))
558             {
559               const char *var_name = var_get_name (dv);
560               msg (SE, _("Variable %s has different type or width in different "
561                          "files."), var_name);
562
563               for (size_t j = 0; j < 2; j++)
564                 {
565                   const struct variable *ev = !j ? mv : dv;
566                   const struct comb_file *ef
567                     = !j ? &proc->files[proc->var_sources[var_get_dict_index (mv)]] : f;
568                   const char *fn = ef->handle ? fh_get_name (ef->handle) : "*";
569
570                   if (var_is_numeric (ev))
571                     lex_ofs_msg (lexer, SN, ef->start_ofs, ef->end_ofs,
572                                  _("In file %s, %s is numeric."),
573                                  fn, var_name);
574                   else
575                     lex_ofs_msg (lexer, SN, ef->start_ofs, ef->end_ofs,
576                                  _("In file %s, %s is a string with width %d."),
577                                  fn, var_name, var_get_width (ev));
578                 }
579
580               return false;
581             }
582
583           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
584             var_set_value_labels (mv, var_get_value_labels (dv));
585           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
586             var_set_missing_values (mv, var_get_missing_values (dv));
587           if (var_get_label (dv) && !var_get_label (mv))
588             var_set_label (mv, var_get_label (dv));
589         }
590     }
591
592   return true;
593 }
594
595 /* If VAR_NAME is non-NULL, attempts to create a
596    variable named VAR_NAME, with format F1.0, in DICT, and stores
597    a pointer to the variable in *VAR.  Returns true if
598    successful, false if the variable name is a duplicate (in
599    which case a message saying that the variable specified on the
600    given SUBCOMMAND is a duplicate is emitted).
601
602    Does nothing and returns true if VAR_NAME is null. */
603 static bool
604 create_flag_var (struct lexer *lexer, const char *subcommand,
605                  const char *var_name, int var_ofs,
606                  struct dictionary *dict, struct variable **var)
607 {
608   if (var_name != NULL)
609     {
610       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
611       *var = dict_create_var (dict, var_name, 0);
612       if (*var == NULL)
613         {
614           lex_ofs_error (lexer, var_ofs, var_ofs,
615                          _("Variable name %s specified on %s subcommand "
616                            "duplicates an existing variable name."),
617                          var_name, subcommand);
618           return false;
619         }
620       var_set_both_formats (*var, &format);
621     }
622   else
623     *var = NULL;
624   return true;
625 }
626
627 /* Closes all the files in PROC and frees their associated data. */
628 static void
629 close_all_comb_files (struct comb_proc *proc)
630 {
631   for (size_t i = 0; i < proc->n_files; i++)
632     {
633       struct comb_file *file = &proc->files[i];
634       subcase_uninit (&file->by_vars);
635       subcase_uninit (&file->src);
636       subcase_uninit (&file->dst);
637       free (file->mv);
638       fh_unref (file->handle);
639       dict_unref (file->dict);
640       casereader_destroy (file->reader);
641       case_unref (file->data);
642       free (file->in_name);
643     }
644   free (proc->files);
645   proc->files = NULL;
646   proc->n_files = 0;
647 }
648
649 /* Frees all the data for the procedure. */
650 static void
651 free_comb_proc (struct comb_proc *proc)
652 {
653   close_all_comb_files (proc);
654   dict_unref (proc->dict);
655   casewriter_destroy (proc->output);
656   case_matcher_destroy (proc->matcher);
657   if (proc->prev_BY)
658     {
659       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
660                                 proc->prev_BY);
661       free (proc->prev_BY);
662     }
663   subcase_uninit (&proc->by_vars);
664   case_unref (proc->buffered_case);
665   free (proc->var_sources);
666 }
667 \f
668 static bool scan_table (struct comb_file *, union value by[]);
669 static struct ccase *create_output_case (const struct comb_proc *);
670 static void apply_case (const struct comb_file *, struct ccase *);
671 static void apply_nonmissing_case (const struct comb_file *, struct ccase *);
672 static void advance_file (struct comb_file *, union value by[]);
673 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
674 static void output_buffered_case (struct comb_proc *);
675
676 /* Executes the ADD FILES command. */
677 static void
678 execute_add_files (struct comb_proc *proc)
679 {
680   union value *by;
681
682   while (case_matcher_match (proc->matcher, &by))
683     for (size_t i = 0; i < proc->n_files; i++)
684       {
685         struct comb_file *file = &proc->files[i];
686         while (file->is_minimal)
687           {
688             struct ccase *output = create_output_case (proc);
689             apply_case (file, output);
690             advance_file (file, by);
691             output_case (proc, output, by);
692           }
693       }
694   output_buffered_case (proc);
695 }
696
697 /* Executes the MATCH FILES command. */
698 static void
699 execute_match_files (struct comb_proc *proc)
700 {
701   union value *by;
702
703   while (case_matcher_match (proc->matcher, &by))
704     {
705       struct ccase *output = create_output_case (proc);
706       for (size_t i = proc->n_files; i-- > 0;)
707         {
708           struct comb_file *file = &proc->files[i];
709           if (file->type == COMB_FILE)
710             {
711               if (file->is_minimal)
712                 {
713                   apply_case (file, output);
714                   advance_file (file, NULL);
715                 }
716             }
717           else
718             {
719               if (scan_table (file, by))
720                 apply_case (file, output);
721             }
722         }
723       output_case (proc, output, by);
724     }
725   output_buffered_case (proc);
726 }
727
728 /* Executes the UPDATE command. */
729 static void
730 execute_update (struct comb_proc *proc)
731 {
732   union value *by;
733   size_t n_duplicates = 0;
734
735   while (case_matcher_match (proc->matcher, &by))
736     {
737       struct comb_file *first, *file;
738       struct ccase *output;
739
740       /* Find first nonnull case in array and make an output case
741          from it. */
742       output = create_output_case (proc);
743       for (first = &proc->files[0]; ; first++)
744         if (first->is_minimal)
745           break;
746       apply_case (first, output);
747       advance_file (first, by);
748
749       /* Read additional cases and update the output case from
750          them.  (Don't update the output case from any duplicate
751          cases in the master file.) */
752       for (file = first + (first == proc->files);
753            file < &proc->files[proc->n_files]; file++)
754         {
755           while (file->is_minimal)
756             {
757               apply_nonmissing_case (file, output);
758               advance_file (file, by);
759             }
760         }
761       casewriter_write (proc->output, output);
762
763       /* Write duplicate cases in the master file directly to the
764          output.  */
765       if (first == proc->files && first->is_minimal)
766         {
767           n_duplicates++;
768           while (first->is_minimal)
769             {
770               output = create_output_case (proc);
771               apply_case (first, output);
772               advance_file (first, by);
773               casewriter_write (proc->output, output);
774             }
775         }
776     }
777
778   if (n_duplicates)
779     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
780          n_duplicates);
781 }
782
783 /* Reads FILE, which must be of type COMB_TABLE, until it
784    encounters a case with BY or greater for its BY variables.
785    Returns true if a case with exactly BY for its BY variables
786    was found, otherwise false. */
787 static bool
788 scan_table (struct comb_file *file, union value by[])
789 {
790   while (file->data != NULL)
791     {
792       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
793       if (cmp > 0)
794         {
795           case_unref (file->data);
796           file->data = casereader_read (file->reader);
797         }
798       else
799         return cmp == 0;
800     }
801   return false;
802 }
803
804 /* Creates and returns an output case for PROC, initializing each
805    of its values to system-missing or blanks, except that the
806    values of IN variables are set to 0. */
807 static struct ccase *
808 create_output_case (const struct comb_proc *proc)
809 {
810   size_t n_vars = dict_get_n_vars (proc->dict);
811   struct ccase *output = case_create (dict_get_proto (proc->dict));
812   for (size_t i = 0; i < n_vars; i++)
813     {
814       struct variable *v = dict_get_var (proc->dict, i);
815       value_set_missing (case_data_rw (output, v), var_get_width (v));
816     }
817   for (size_t i = 0; i < proc->n_files; i++)
818     {
819       struct comb_file *file = &proc->files[i];
820       if (file->in_var != NULL)
821         *case_num_rw (output, file->in_var) = false;
822     }
823   return output;
824 }
825
826 static void
827 mark_file_used (const struct comb_file *file, struct ccase *output)
828 {
829   if (file->in_var != NULL)
830     *case_num_rw (output, file->in_var) = true;
831 }
832
833 /* Copies the data from FILE's case into output case OUTPUT.
834    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
835 static void
836 apply_case (const struct comb_file *file, struct ccase *output)
837 {
838   subcase_copy (&file->src, file->data, &file->dst, output);
839   mark_file_used (file, output);
840 }
841
842 /* Copies the data from FILE's case into output case OUTPUT,
843    skipping values that are missing or all spaces.
844
845    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
846 static void
847 apply_nonmissing_case (const struct comb_file *file, struct ccase *output)
848 {
849   for (size_t i = 0; i < subcase_get_n_fields (&file->src); i++)
850     {
851       const struct subcase_field *src_field = &file->src.fields[i];
852       const struct subcase_field *dst_field = &file->dst.fields[i];
853       const union value *src_value
854         = case_data_idx (file->data, src_field->case_index);
855       int width = src_field->width;
856
857       if (!mv_is_value_missing (file->mv[i], src_value)
858           && !(width > 0 && value_is_spaces (src_value, width)))
859         value_copy (case_data_rw_idx (output, dst_field->case_index),
860                     src_value, width);
861     }
862   mark_file_used (file, output);
863 }
864
865 /* Advances FILE to its next case.  If BY is nonnull, then FILE's is_minimal
866    member is updated based on whether the new case's BY values still match
867    those in BY. */
868 static void
869 advance_file (struct comb_file *file, union value by[])
870 {
871   case_unref (file->data);
872   file->data = casereader_read (file->reader);
873   if (by)
874     file->is_minimal = (file->data != NULL
875                         && subcase_equal_cx (&file->by_vars, file->data, by));
876 }
877
878 /* Writes OUTPUT, whose BY values has been extracted into BY, to
879    PROC's output file, first initializing any FIRST or LAST
880    variables in OUTPUT to the correct values. */
881 static void
882 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
883 {
884   if (proc->first == NULL && proc->last == NULL)
885     casewriter_write (proc->output, output);
886   else
887     {
888       /* It's harder with LAST, because we can't know whether
889          this case is the last in a group until we've prepared
890          the *next* case also.  Thus, we buffer the previous
891          output case until the next one is ready. */
892       bool new_BY;
893       if (proc->prev_BY != NULL)
894         {
895           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
896           if (proc->last != NULL)
897             *case_num_rw (proc->buffered_case, proc->last) = new_BY;
898           casewriter_write (proc->output, proc->buffered_case);
899         }
900       else
901         new_BY = true;
902
903       proc->buffered_case = output;
904       if (proc->first != NULL)
905         *case_num_rw (proc->buffered_case, proc->first) = new_BY;
906
907       if (new_BY)
908         {
909           size_t n_values = subcase_get_n_fields (&proc->by_vars);
910           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
911           if (proc->prev_BY == NULL)
912             {
913               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
914               caseproto_init_values (proto, proc->prev_BY);
915             }
916           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
917                           proc->prev_BY, by);
918         }
919     }
920 }
921
922 /* Writes a trailing buffered case to the output, if FIRST or
923    LAST is in use. */
924 static void
925 output_buffered_case (struct comb_proc *proc)
926 {
927   if (proc->prev_BY != NULL)
928     {
929       if (proc->last != NULL)
930         *case_num_rw (proc->buffered_case, proc->last) = 1.0;
931       casewriter_write (proc->output, proc->buffered_case);
932       proc->buffered_case = NULL;
933     }
934 }