4c7b4661f0ce52eb28d0c62f2b6426542159d257
[pspp] / src / language / commands / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/any-reader.h"
22 #include "data/case-matcher.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/subcase.h"
30 #include "data/variable.h"
31 #include "language/command.h"
32 #include "language/commands/file-handle.h"
33 #include "language/commands/trim.h"
34 #include "language/lexer/lexer.h"
35 #include "language/lexer/variable-parser.h"
36 #include "language/commands/sort-criteria.h"
37 #include "libpspp/assertion.h"
38 #include "libpspp/i18n.h"
39 #include "libpspp/message.h"
40 #include "libpspp/string-array.h"
41 #include "libpspp/taint.h"
42 #include "math/sort.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 #include "gettext.h"
48 #define _(msgid) gettext (msgid)
49
50 enum comb_command_type
51   {
52     COMB_ADD,
53     COMB_MATCH,
54     COMB_UPDATE
55   };
56
57 /* File types. */
58 enum comb_file_type
59   {
60     COMB_FILE,                  /* Specified on FILE= subcommand. */
61     COMB_TABLE                  /* Specified on TABLE= subcommand. */
62   };
63
64 /* One FILE or TABLE subcommand. */
65 struct comb_file
66   {
67     /* Basics. */
68     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
69     int start_ofs, end_ofs;     /* Lexer offsets. */
70
71     /* Variables. */
72     struct subcase by_vars;     /* BY variables in this input file. */
73     struct subcase src, dst;    /* Data to copy to output; where to put it. */
74     const struct missing_values **mv; /* Each variable's missing values. */
75
76     /* Input files. */
77     struct file_handle *handle; /* Input file handle. */
78     struct dictionary *dict;    /* Input file dictionary. */
79     struct casereader *reader;  /* Input data source. */
80     struct ccase *data;         /* The current input case. */
81     bool is_minimal;            /* Does 'data' have minimum BY values across
82                                    all input files? */
83     bool is_sorted;             /* Is file presorted on the BY variables? */
84
85     /* IN subcommand. */
86     char *in_name;
87     int in_ofs;
88     struct variable *in_var;
89   };
90
91 struct comb_proc
92   {
93     struct comb_file *files;    /* All the files being merged. */
94     size_t n_files;             /* Number of files. */
95
96     struct dictionary *dict;    /* Dictionary of output file. */
97     struct subcase by_vars;     /* BY variables in the output. */
98     struct casewriter *output;  /* Destination for output. */
99
100     size_t *var_sources;
101     size_t n_var_sources, allocated_var_sources;
102
103     struct case_matcher *matcher;
104
105     /* FIRST, LAST.
106        Only if "first" or "last" is nonnull are the remaining
107        members used. */
108     struct variable *first;     /* Variable specified on FIRST (if any). */
109     struct variable *last;      /* Variable specified on LAST (if any). */
110     struct ccase *buffered_case; /* Case ready for output except that we don't
111                                     know the value for the LAST var yet. */
112     union value *prev_BY;       /* Values of BY vars in buffered_case. */
113   };
114
115 static int combine_files (enum comb_command_type, struct lexer *,
116                           struct dataset *);
117 static void free_comb_proc (struct comb_proc *);
118
119 static void close_all_comb_files (struct comb_proc *);
120 static bool merge_dictionary (struct comb_proc *, struct lexer *,
121                               struct comb_file *);
122
123 static void execute_update (struct comb_proc *);
124 static void execute_match_files (struct comb_proc *);
125 static void execute_add_files (struct comb_proc *);
126
127 static bool create_flag_var (struct lexer *lexer, const char *subcommand_name,
128                              const char *var_name, int var_ofs,
129                              struct dictionary *, struct variable **);
130 static void output_case (struct comb_proc *, struct ccase *, union value *by);
131 static void output_buffered_case (struct comb_proc *);
132
133 int
134 cmd_add_files (struct lexer *lexer, struct dataset *ds)
135 {
136   return combine_files (COMB_ADD, lexer, ds);
137 }
138
139 int
140 cmd_match_files (struct lexer *lexer, struct dataset *ds)
141 {
142   return combine_files (COMB_MATCH, lexer, ds);
143 }
144
145 int
146 cmd_update (struct lexer *lexer, struct dataset *ds)
147 {
148   return combine_files (COMB_UPDATE, lexer, ds);
149 }
150
151 static int
152 combine_files (enum comb_command_type command,
153                struct lexer *lexer, struct dataset *ds)
154 {
155   struct comb_proc proc = {
156     .dict = dict_create (get_default_encoding ()),
157   };
158
159   bool saw_by = false;
160   bool saw_sort = false;
161   struct casereader *active_file = NULL;
162
163   char *first_name = NULL;
164   int first_ofs = 0;
165   char *last_name = NULL;
166   int last_ofs = 0;
167
168   struct taint *taint = NULL;
169
170   size_t table_idx = SIZE_MAX;
171   int sort_ofs = INT_MAX;
172   size_t allocated_files = 0;
173
174   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
175
176   lex_match (lexer, T_SLASH);
177   for (;;)
178     {
179       int start_ofs = lex_ofs (lexer);
180       enum comb_file_type type;
181       if (lex_match_id (lexer, "FILE"))
182         type = COMB_FILE;
183       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
184         {
185           type = COMB_TABLE;
186           table_idx = MIN (table_idx, proc.n_files);
187         }
188       else
189         break;
190       lex_match (lexer, T_EQUALS);
191
192       if (proc.n_files >= allocated_files)
193         proc.files = x2nrealloc (proc.files, &allocated_files,
194                                 sizeof *proc.files);
195       struct comb_file *file = &proc.files[proc.n_files++];
196       *file = (struct comb_file) {
197         .type = type,
198         .start_ofs = start_ofs,
199         .is_sorted = true,
200       };
201
202       if (lex_match (lexer, T_ASTERISK))
203         {
204           if (!dataset_has_source (ds))
205             {
206               lex_next_error (lexer, -1, -1,
207                               _("Cannot specify the active dataset since none "
208                                 "has been defined."));
209               goto error;
210             }
211
212           if (proc_make_temporary_transformations_permanent (ds))
213             lex_next_error (lexer, -1, -1,
214                             _("This command may not be used after TEMPORARY "
215                               "when the active dataset is an input source.  "
216                               "Temporary transformations will be made "
217                               "permanent."));
218
219           file->dict = dict_clone (dataset_dict (ds));
220         }
221       else
222         {
223           file->handle = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
224           if (file->handle == NULL)
225             goto error;
226
227           file->reader = any_reader_open_and_decode (file->handle, NULL,
228                                                      &file->dict, NULL);
229           if (file->reader == NULL)
230             goto error;
231         }
232       file->end_ofs = lex_ofs (lexer) - 1;
233
234       while (lex_match (lexer, T_SLASH))
235         if (lex_match_id (lexer, "RENAME"))
236           {
237             if (!parse_dict_rename (lexer, file->dict))
238               goto error;
239           }
240         else if (lex_match_id (lexer, "IN"))
241           {
242             lex_match (lexer, T_EQUALS);
243             if (!lex_force_id (lexer))
244               goto error;
245
246             if (file->in_name)
247               {
248                 lex_error (lexer, _("Multiple IN subcommands for a single FILE "
249                                     "or TABLE."));
250                 goto error;
251               }
252             file->in_name = xstrdup (lex_tokcstr (lexer));
253             file->in_ofs = lex_ofs (lexer);
254             lex_get (lexer);
255           }
256         else if (lex_match_id (lexer, "SORT"))
257           {
258             file->is_sorted = false;
259             saw_sort = true;
260             sort_ofs = MIN (sort_ofs, lex_ofs (lexer) - 1);
261           }
262
263       if (!merge_dictionary (&proc, lexer, file))
264         goto error;
265     }
266
267   while (lex_token (lexer) != T_ENDCMD)
268     {
269       if (lex_match (lexer, T_BY))
270         {
271           if (saw_by)
272             {
273               lex_sbc_only_once (lexer, "BY");
274               goto error;
275             }
276           saw_by = true;
277
278           lex_match (lexer, T_EQUALS);
279
280           const struct variable **by_vars;
281           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
282                                     &by_vars, NULL))
283             goto error;
284
285           bool ok = true;
286           for (size_t i = 0; i < proc.n_files; i++)
287             {
288               struct comb_file *file = &proc.files[i];
289               for (size_t j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
290                 {
291                   const char *name = var_get_name (by_vars[j]);
292                   struct variable *var = dict_lookup_var (file->dict, name);
293                   if (var != NULL)
294                     subcase_add_var (&file->by_vars, var,
295                                      subcase_get_direction (&proc.by_vars, j));
296                   else
297                     {
298                       const char *fn
299                         = file->handle ? fh_get_name (file->handle) : "*";
300                       lex_ofs_error (lexer, file->start_ofs, file->end_ofs,
301                                      _("File %s lacks BY variable %s."),
302                                      fn, name);
303                       ok = false;
304                     }
305                 }
306               assert (!ok || subcase_conformable (&file->by_vars,
307                                                   &proc.files[0].by_vars));
308             }
309           free (by_vars);
310
311           if (!ok)
312             goto error;
313         }
314       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
315         {
316           if (first_name != NULL)
317             {
318               lex_sbc_only_once (lexer, "FIRST");
319               goto error;
320             }
321
322           lex_match (lexer, T_EQUALS);
323           if (!lex_force_id (lexer))
324             goto error;
325           first_name = xstrdup (lex_tokcstr (lexer));
326           first_ofs = lex_ofs (lexer);
327           lex_get (lexer);
328         }
329       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
330         {
331           if (last_name != NULL)
332             {
333               lex_sbc_only_once (lexer, "LAST");
334               goto error;
335             }
336
337           lex_match (lexer, T_EQUALS);
338           if (!lex_force_id (lexer))
339             goto error;
340           last_name = xstrdup (lex_tokcstr (lexer));
341           last_ofs = lex_ofs (lexer);
342           lex_get (lexer);
343         }
344       else if (lex_match_id (lexer, "MAP"))
345         {
346           /* FIXME. */
347         }
348       else if (lex_match_id (lexer, "DROP"))
349         {
350           if (!parse_dict_drop (lexer, proc.dict))
351             goto error;
352         }
353       else if (lex_match_id (lexer, "KEEP"))
354         {
355           if (!parse_dict_keep (lexer, proc.dict))
356             goto error;
357         }
358       else
359         {
360           if (command == COMB_UPDATE)
361             lex_error_expecting (lexer, "BY", "MAP", "DROP", "KEEP");
362           else
363             lex_error_expecting (lexer, "BY", "FIRST", "LAST",
364                                  "MAP", "DROP", "KEEP");
365           goto error;
366         }
367
368       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
369         {
370           lex_end_of_command (lexer);
371           goto error;
372         }
373     }
374
375   if (!saw_by)
376     {
377       if (command == COMB_UPDATE)
378         {
379           lex_sbc_missing (lexer, "BY");
380           goto error;
381         }
382       if (table_idx != SIZE_MAX)
383         {
384           const struct comb_file *table = &proc.files[table_idx];
385           lex_ofs_error (lexer, table->start_ofs, table->end_ofs,
386                          _("BY is required when %s is specified."), "TABLE");
387           goto error;
388         }
389       if (saw_sort)
390         {
391           lex_ofs_error (lexer, sort_ofs, sort_ofs,
392                          _("BY is required when %s is specified."), "SORT");
393           goto error;
394         }
395     }
396
397   /* Add IN, FIRST, and LAST variables to master dictionary. */
398   for (size_t i = 0; i < proc.n_files; i++)
399     {
400       struct comb_file *file = &proc.files[i];
401       if (!create_flag_var (lexer, "IN", file->in_name, file->in_ofs,
402                             proc.dict, &file->in_var))
403         goto error;
404     }
405   if (!create_flag_var (lexer, "FIRST", first_name, first_ofs, proc.dict, &proc.first)
406       || !create_flag_var (lexer, "LAST", last_name, last_ofs, proc.dict, &proc.last))
407     goto error;
408
409   dict_delete_scratch_vars (proc.dict);
410   dict_compact_values (proc.dict);
411
412   /* Set up mapping from each file's variables to master
413      variables. */
414   for (size_t i = 0; i < proc.n_files; i++)
415     {
416       struct comb_file *file = &proc.files[i];
417       size_t src_n_vars = dict_get_n_vars (file->dict);
418
419       file->mv = xnmalloc (src_n_vars, sizeof *file->mv);
420       for (size_t j = 0; j < src_n_vars; j++)
421         {
422           struct variable *src_var = dict_get_var (file->dict, j);
423           struct variable *dst_var = dict_lookup_var (proc.dict,
424                                                       var_get_name (src_var));
425           if (dst_var != NULL)
426             {
427               size_t n = subcase_get_n_fields (&file->src);
428               file->mv[n] = var_get_missing_values (src_var);
429               subcase_add_var (&file->src, src_var, SC_ASCEND);
430               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
431             }
432         }
433     }
434
435   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
436   taint = taint_clone (casewriter_get_taint (proc.output));
437
438   /* Set up case matcher. */
439   proc.matcher = case_matcher_create ();
440   for (size_t i = 0; i < proc.n_files; i++)
441     {
442       struct comb_file *file = &proc.files[i];
443       if (file->reader == NULL)
444         {
445           if (active_file == NULL)
446             {
447               proc_discard_output (ds);
448               file->reader = active_file = proc_open_filtering (ds, false);
449             }
450           else
451             file->reader = casereader_clone (active_file);
452         }
453       if (!file->is_sorted)
454         file->reader = sort_execute (file->reader, &file->by_vars);
455       taint_propagate (casereader_get_taint (file->reader), taint);
456       file->data = casereader_read (file->reader);
457       if (file->type == COMB_FILE)
458         case_matcher_add_input (proc.matcher, &file->by_vars,
459                                 &file->data, &file->is_minimal);
460     }
461
462   if (command == COMB_ADD)
463     execute_add_files (&proc);
464   else if (command == COMB_MATCH)
465     execute_match_files (&proc);
466   else if (command == COMB_UPDATE)
467     execute_update (&proc);
468   else
469     NOT_REACHED ();
470
471   case_matcher_destroy (proc.matcher);
472   proc.matcher = NULL;
473   close_all_comb_files (&proc);
474   if (active_file != NULL)
475     proc_commit (ds);
476
477   dataset_set_dict (ds, proc.dict);
478   dataset_set_source (ds, casewriter_make_reader (proc.output));
479   proc.dict = NULL;
480   proc.output = NULL;
481
482   free_comb_proc (&proc);
483
484   free (first_name);
485   free (last_name);
486
487   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
488
489  error:
490   if (active_file != NULL)
491     proc_commit (ds);
492   free_comb_proc (&proc);
493   taint_destroy (taint);
494   free (first_name);
495   free (last_name);
496   return CMD_CASCADING_FAILURE;
497 }
498
499 /* Merge the dictionary for file F into master dictionary for PROC. */
500 static bool
501 merge_dictionary (struct comb_proc *proc, struct lexer *lexer,
502                   struct comb_file *f)
503 {
504   struct dictionary *m = proc->dict;
505   struct dictionary *d = f->dict;
506
507   if (dict_get_label (m) == NULL)
508     dict_set_label (m, dict_get_label (d));
509
510   /* FIXME: If the input files have different encodings, then
511      the result is undefined.
512      The correct thing to do would be to convert to an encoding
513      which can cope with all the input files (eg UTF-8).
514    */
515   if (strcmp (dict_get_encoding (f->dict), dict_get_encoding (m)))
516     msg (MW, _("Combining files with incompatible encodings. String data may "
517                "not be represented correctly."));
518
519   const struct string_array *d_docs = dict_get_documents (d);
520   const struct string_array *m_docs = dict_get_documents (m);
521   if (d_docs)
522     {
523       if (!m_docs)
524         dict_set_documents (m, d_docs);
525       else
526         {
527           size_t n = m_docs->n + d_docs->n;
528           struct string_array new_docs = {
529             .strings = xmalloc (n * sizeof *new_docs.strings),
530           };
531           for (size_t i = 0; i < m_docs->n; i++)
532             new_docs.strings[new_docs.n++] = m_docs->strings[i];
533           for (size_t i = 0; i < d_docs->n; i++)
534             new_docs.strings[new_docs.n++] = d_docs->strings[i];
535
536           dict_set_documents (m, &new_docs);
537
538           free (new_docs.strings);
539         }
540     }
541
542   for (size_t i = 0; i < dict_get_n_vars (d); i++)
543     {
544       struct variable *dv = dict_get_var (d, i);
545       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
546
547       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
548         continue;
549
550       if (!mv)
551         {
552           mv = dict_clone_var_assert (m, dv);
553           if (proc->n_var_sources >= proc->allocated_var_sources)
554             proc->var_sources = x2nrealloc (proc->var_sources,
555                                             &proc->allocated_var_sources,
556                                             sizeof *proc->var_sources);
557           proc->var_sources[proc->n_var_sources++] = f - proc->files;
558         }
559       else
560         {
561           if (var_get_width (mv) != var_get_width (dv))
562             {
563               const char *var_name = var_get_name (dv);
564               msg (SE, _("Variable %s has different type or width in different "
565                          "files."), var_name);
566
567               for (size_t j = 0; j < 2; j++)
568                 {
569                   const struct variable *ev = !j ? mv : dv;
570                   const struct comb_file *ef
571                     = !j ? &proc->files[proc->var_sources[var_get_dict_index (mv)]] : f;
572                   const char *fn = ef->handle ? fh_get_name (ef->handle) : "*";
573
574                   if (var_is_numeric (ev))
575                     lex_ofs_msg (lexer, SN, ef->start_ofs, ef->end_ofs,
576                                  _("In file %s, %s is numeric."),
577                                  fn, var_name);
578                   else
579                     lex_ofs_msg (lexer, SN, ef->start_ofs, ef->end_ofs,
580                                  _("In file %s, %s is a string with width %d."),
581                                  fn, var_name, var_get_width (ev));
582                 }
583
584               return false;
585             }
586
587           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
588             var_set_value_labels (mv, var_get_value_labels (dv));
589           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
590             var_set_missing_values (mv, var_get_missing_values (dv));
591           if (var_get_label (dv) && !var_get_label (mv))
592             var_set_label (mv, var_get_label (dv));
593         }
594     }
595
596   return true;
597 }
598
599 /* If VAR_NAME is non-NULL, attempts to create a
600    variable named VAR_NAME, with format F1.0, in DICT, and stores
601    a pointer to the variable in *VAR.  Returns true if
602    successful, false if the variable name is a duplicate (in
603    which case a message saying that the variable specified on the
604    given SUBCOMMAND is a duplicate is emitted).
605
606    Does nothing and returns true if VAR_NAME is null. */
607 static bool
608 create_flag_var (struct lexer *lexer, const char *subcommand,
609                  const char *var_name, int var_ofs,
610                  struct dictionary *dict, struct variable **var)
611 {
612   if (var_name != NULL)
613     {
614       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
615       *var = dict_create_var (dict, var_name, 0);
616       if (*var == NULL)
617         {
618           lex_ofs_error (lexer, var_ofs, var_ofs,
619                          _("Variable name %s specified on %s subcommand "
620                            "duplicates an existing variable name."),
621                          var_name, subcommand);
622           return false;
623         }
624       var_set_both_formats (*var, &format);
625     }
626   else
627     *var = NULL;
628   return true;
629 }
630
631 /* Closes all the files in PROC and frees their associated data. */
632 static void
633 close_all_comb_files (struct comb_proc *proc)
634 {
635   for (size_t i = 0; i < proc->n_files; i++)
636     {
637       struct comb_file *file = &proc->files[i];
638       subcase_uninit (&file->by_vars);
639       subcase_uninit (&file->src);
640       subcase_uninit (&file->dst);
641       free (file->mv);
642       fh_unref (file->handle);
643       dict_unref (file->dict);
644       casereader_destroy (file->reader);
645       case_unref (file->data);
646       free (file->in_name);
647     }
648   free (proc->files);
649   proc->files = NULL;
650   proc->n_files = 0;
651 }
652
653 /* Frees all the data for the procedure. */
654 static void
655 free_comb_proc (struct comb_proc *proc)
656 {
657   close_all_comb_files (proc);
658   dict_unref (proc->dict);
659   casewriter_destroy (proc->output);
660   case_matcher_destroy (proc->matcher);
661   if (proc->prev_BY)
662     {
663       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
664                                 proc->prev_BY);
665       free (proc->prev_BY);
666     }
667   subcase_uninit (&proc->by_vars);
668   case_unref (proc->buffered_case);
669   free (proc->var_sources);
670 }
671 \f
672 static bool scan_table (struct comb_file *, union value by[]);
673 static struct ccase *create_output_case (const struct comb_proc *);
674 static void apply_case (const struct comb_file *, struct ccase *);
675 static void apply_nonmissing_case (const struct comb_file *, struct ccase *);
676 static void advance_file (struct comb_file *, union value by[]);
677 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
678 static void output_buffered_case (struct comb_proc *);
679
680 /* Executes the ADD FILES command. */
681 static void
682 execute_add_files (struct comb_proc *proc)
683 {
684   union value *by;
685
686   while (case_matcher_match (proc->matcher, &by))
687     for (size_t i = 0; i < proc->n_files; i++)
688       {
689         struct comb_file *file = &proc->files[i];
690         while (file->is_minimal)
691           {
692             struct ccase *output = create_output_case (proc);
693             apply_case (file, output);
694             advance_file (file, by);
695             output_case (proc, output, by);
696           }
697       }
698   output_buffered_case (proc);
699 }
700
701 /* Executes the MATCH FILES command. */
702 static void
703 execute_match_files (struct comb_proc *proc)
704 {
705   union value *by;
706
707   while (case_matcher_match (proc->matcher, &by))
708     {
709       struct ccase *output = create_output_case (proc);
710       for (size_t i = proc->n_files; i-- > 0;)
711         {
712           struct comb_file *file = &proc->files[i];
713           if (file->type == COMB_FILE)
714             {
715               if (file->is_minimal)
716                 {
717                   apply_case (file, output);
718                   advance_file (file, NULL);
719                 }
720             }
721           else
722             {
723               if (scan_table (file, by))
724                 apply_case (file, output);
725             }
726         }
727       output_case (proc, output, by);
728     }
729   output_buffered_case (proc);
730 }
731
732 /* Executes the UPDATE command. */
733 static void
734 execute_update (struct comb_proc *proc)
735 {
736   union value *by;
737   size_t n_duplicates = 0;
738
739   while (case_matcher_match (proc->matcher, &by))
740     {
741       struct comb_file *first, *file;
742       struct ccase *output;
743
744       /* Find first nonnull case in array and make an output case
745          from it. */
746       output = create_output_case (proc);
747       for (first = &proc->files[0]; ; first++)
748         if (first->is_minimal)
749           break;
750       apply_case (first, output);
751       advance_file (first, by);
752
753       /* Read additional cases and update the output case from
754          them.  (Don't update the output case from any duplicate
755          cases in the master file.) */
756       for (file = first + (first == proc->files);
757            file < &proc->files[proc->n_files]; file++)
758         {
759           while (file->is_minimal)
760             {
761               apply_nonmissing_case (file, output);
762               advance_file (file, by);
763             }
764         }
765       casewriter_write (proc->output, output);
766
767       /* Write duplicate cases in the master file directly to the
768          output.  */
769       if (first == proc->files && first->is_minimal)
770         {
771           n_duplicates++;
772           while (first->is_minimal)
773             {
774               output = create_output_case (proc);
775               apply_case (first, output);
776               advance_file (first, by);
777               casewriter_write (proc->output, output);
778             }
779         }
780     }
781
782   if (n_duplicates)
783     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
784          n_duplicates);
785 }
786
787 /* Reads FILE, which must be of type COMB_TABLE, until it
788    encounters a case with BY or greater for its BY variables.
789    Returns true if a case with exactly BY for its BY variables
790    was found, otherwise false. */
791 static bool
792 scan_table (struct comb_file *file, union value by[])
793 {
794   while (file->data != NULL)
795     {
796       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
797       if (cmp > 0)
798         {
799           case_unref (file->data);
800           file->data = casereader_read (file->reader);
801         }
802       else
803         return cmp == 0;
804     }
805   return false;
806 }
807
808 /* Creates and returns an output case for PROC, initializing each
809    of its values to system-missing or blanks, except that the
810    values of IN variables are set to 0. */
811 static struct ccase *
812 create_output_case (const struct comb_proc *proc)
813 {
814   size_t n_vars = dict_get_n_vars (proc->dict);
815   struct ccase *output = case_create (dict_get_proto (proc->dict));
816   for (size_t i = 0; i < n_vars; i++)
817     {
818       struct variable *v = dict_get_var (proc->dict, i);
819       value_set_missing (case_data_rw (output, v), var_get_width (v));
820     }
821   for (size_t i = 0; i < proc->n_files; i++)
822     {
823       struct comb_file *file = &proc->files[i];
824       if (file->in_var != NULL)
825         *case_num_rw (output, file->in_var) = false;
826     }
827   return output;
828 }
829
830 static void
831 mark_file_used (const struct comb_file *file, struct ccase *output)
832 {
833   if (file->in_var != NULL)
834     *case_num_rw (output, file->in_var) = true;
835 }
836
837 /* Copies the data from FILE's case into output case OUTPUT.
838    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
839 static void
840 apply_case (const struct comb_file *file, struct ccase *output)
841 {
842   subcase_copy (&file->src, file->data, &file->dst, output);
843   mark_file_used (file, output);
844 }
845
846 /* Copies the data from FILE's case into output case OUTPUT,
847    skipping values that are missing or all spaces.
848
849    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
850 static void
851 apply_nonmissing_case (const struct comb_file *file, struct ccase *output)
852 {
853   for (size_t i = 0; i < subcase_get_n_fields (&file->src); i++)
854     {
855       const struct subcase_field *src_field = &file->src.fields[i];
856       const struct subcase_field *dst_field = &file->dst.fields[i];
857       const union value *src_value
858         = case_data_idx (file->data, src_field->case_index);
859       int width = src_field->width;
860
861       if (!mv_is_value_missing (file->mv[i], src_value)
862           && !(width > 0 && value_is_spaces (src_value, width)))
863         value_copy (case_data_rw_idx (output, dst_field->case_index),
864                     src_value, width);
865     }
866   mark_file_used (file, output);
867 }
868
869 /* Advances FILE to its next case.  If BY is nonnull, then FILE's is_minimal
870    member is updated based on whether the new case's BY values still match
871    those in BY. */
872 static void
873 advance_file (struct comb_file *file, union value by[])
874 {
875   case_unref (file->data);
876   file->data = casereader_read (file->reader);
877   if (by)
878     file->is_minimal = (file->data != NULL
879                         && subcase_equal_cx (&file->by_vars, file->data, by));
880 }
881
882 /* Writes OUTPUT, whose BY values has been extracted into BY, to
883    PROC's output file, first initializing any FIRST or LAST
884    variables in OUTPUT to the correct values. */
885 static void
886 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
887 {
888   if (proc->first == NULL && proc->last == NULL)
889     casewriter_write (proc->output, output);
890   else
891     {
892       /* It's harder with LAST, because we can't know whether
893          this case is the last in a group until we've prepared
894          the *next* case also.  Thus, we buffer the previous
895          output case until the next one is ready. */
896       bool new_BY;
897       if (proc->prev_BY != NULL)
898         {
899           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
900           if (proc->last != NULL)
901             *case_num_rw (proc->buffered_case, proc->last) = new_BY;
902           casewriter_write (proc->output, proc->buffered_case);
903         }
904       else
905         new_BY = true;
906
907       proc->buffered_case = output;
908       if (proc->first != NULL)
909         *case_num_rw (proc->buffered_case, proc->first) = new_BY;
910
911       if (new_BY)
912         {
913           size_t n_values = subcase_get_n_fields (&proc->by_vars);
914           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
915           if (proc->prev_BY == NULL)
916             {
917               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
918               caseproto_init_values (proto, proc->prev_BY);
919             }
920           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
921                           proc->prev_BY, by);
922         }
923     }
924 }
925
926 /* Writes a trailing buffered case to the output, if FIRST or
927    LAST is in use. */
928 static void
929 output_buffered_case (struct comb_proc *proc)
930 {
931   if (proc->prev_BY != NULL)
932     {
933       if (proc->last != NULL)
934         *case_num_rw (proc->buffered_case, proc->last) = 1.0;
935       casewriter_write (proc->output, proc->buffered_case);
936       proc->buffered_case = NULL;
937     }
938 }