0f09e735d9eb345a901920636d6850cc8a7071b0
[pspp-builds.git] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/any-reader.h"
22 #include "data/case-matcher.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/subcase.h"
30 #include "data/variable.h"
31 #include "language/command.h"
32 #include "language/data-io/file-handle.h"
33 #include "language/data-io/trim.h"
34 #include "language/lexer/lexer.h"
35 #include "language/lexer/variable-parser.h"
36 #include "language/stats/sort-criteria.h"
37 #include "libpspp/assertion.h"
38 #include "libpspp/message.h"
39 #include "libpspp/string-array.h"
40 #include "libpspp/taint.h"
41 #include "math/sort.h"
42
43 #include "gl/xalloc.h"
44
45 #include "gettext.h"
46 #define _(msgid) gettext (msgid)
47
48 enum comb_command_type
49   {
50     COMB_ADD,
51     COMB_MATCH,
52     COMB_UPDATE
53   };
54
55 /* File types. */
56 enum comb_file_type
57   {
58     COMB_FILE,                  /* Specified on FILE= subcommand. */
59     COMB_TABLE                  /* Specified on TABLE= subcommand. */
60   };
61
62 /* One FILE or TABLE subcommand. */
63 struct comb_file
64   {
65     /* Basics. */
66     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
67
68     /* Variables. */
69     struct subcase by_vars;     /* BY variables in this input file. */
70     struct subcase src, dst;    /* Data to copy to output; where to put it. */
71
72     /* Input files. */
73     struct file_handle *handle; /* Input file handle. */
74     struct dictionary *dict;    /* Input file dictionary. */
75     struct casereader *reader;  /* Input data source. */
76     struct ccase *data;         /* The current input case. */
77     bool is_minimal;            /* Does 'data' have minimum BY values across
78                                    all input files? */
79     bool is_sorted;             /* Is file presorted on the BY variables? */
80
81     /* IN subcommand. */
82     char *in_name;
83     struct variable *in_var;
84   };
85
86 struct comb_proc
87   {
88     struct comb_file *files;    /* All the files being merged. */
89     size_t n_files;             /* Number of files. */
90
91     struct dictionary *dict;    /* Dictionary of output file. */
92     struct subcase by_vars;     /* BY variables in the output. */
93     struct casewriter *output;  /* Destination for output. */
94
95     struct case_matcher *matcher;
96
97     /* FIRST, LAST.
98        Only if "first" or "last" is nonnull are the remaining
99        members used. */
100     struct variable *first;     /* Variable specified on FIRST (if any). */
101     struct variable *last;      /* Variable specified on LAST (if any). */
102     struct ccase *buffered_case; /* Case ready for output except that we don't
103                                     know the value for the LAST var yet. */
104     union value *prev_BY;       /* Values of BY vars in buffered_case. */
105   };
106
107 static int combine_files (enum comb_command_type, struct lexer *,
108                           struct dataset *);
109 static void free_comb_proc (struct comb_proc *);
110
111 static void close_all_comb_files (struct comb_proc *);
112 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
113
114 static void execute_update (struct comb_proc *);
115 static void execute_match_files (struct comb_proc *);
116 static void execute_add_files (struct comb_proc *);
117
118 static bool create_flag_var (const char *subcommand_name, const char *var_name,
119                              struct dictionary *, struct variable **);
120 static void output_case (struct comb_proc *, struct ccase *, union value *by);
121 static void output_buffered_case (struct comb_proc *);
122
123 int
124 cmd_add_files (struct lexer *lexer, struct dataset *ds)
125 {
126   return combine_files (COMB_ADD, lexer, ds);
127 }
128
129 int
130 cmd_match_files (struct lexer *lexer, struct dataset *ds)
131 {
132   return combine_files (COMB_MATCH, lexer, ds);
133 }
134
135 int
136 cmd_update (struct lexer *lexer, struct dataset *ds)
137 {
138   return combine_files (COMB_UPDATE, lexer, ds);
139 }
140
141 static int
142 combine_files (enum comb_command_type command,
143                struct lexer *lexer, struct dataset *ds)
144 {
145   struct comb_proc proc;
146
147   bool saw_by = false;
148   bool saw_sort = false;
149   struct casereader *active_file = NULL;
150
151   char *first_name = NULL;
152   char *last_name = NULL;
153
154   struct taint *taint = NULL;
155
156   size_t n_tables = 0;
157   size_t allocated_files = 0;
158
159   size_t i;
160
161   proc.files = NULL;
162   proc.n_files = 0;
163   proc.dict = dict_create ();
164   proc.output = NULL;
165   proc.matcher = NULL;
166   subcase_init_empty (&proc.by_vars);
167   proc.first = NULL;
168   proc.last = NULL;
169   proc.buffered_case = NULL;
170   proc.prev_BY = NULL;
171
172   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
173
174   lex_match (lexer, T_SLASH);
175   for (;;)
176     {
177       struct comb_file *file;
178       enum comb_file_type type;
179
180       if (lex_match_id (lexer, "FILE"))
181         type = COMB_FILE;
182       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
183         {
184           type = COMB_TABLE;
185           n_tables++;
186         }
187       else
188         break;
189       lex_match (lexer, T_EQUALS);
190
191       if (proc.n_files >= allocated_files)
192         proc.files = x2nrealloc (proc.files, &allocated_files,
193                                 sizeof *proc.files);
194       file = &proc.files[proc.n_files++];
195       file->type = type;
196       subcase_init_empty (&file->by_vars);
197       subcase_init_empty (&file->src);
198       subcase_init_empty (&file->dst);
199       file->handle = NULL;
200       file->dict = NULL;
201       file->reader = NULL;
202       file->data = NULL;
203       file->is_sorted = true;
204       file->in_name = NULL;
205       file->in_var = NULL;
206
207       if (lex_match (lexer, T_ASTERISK))
208         {
209           if (!dataset_has_source (ds))
210             {
211               msg (SE, _("Cannot specify the active dataset since none "
212                          "has been defined."));
213               goto error;
214             }
215
216           if (proc_make_temporary_transformations_permanent (ds))
217             msg (SE, _("This command may not be used after TEMPORARY when "
218                        "the active dataset is an input source.  "
219                        "Temporary transformations will be made permanent."));
220
221           file->dict = dict_clone (dataset_dict (ds));
222         }
223       else
224         {
225           file->handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
226           if (file->handle == NULL)
227             goto error;
228
229           file->reader = any_reader_open (file->handle, &file->dict);
230           if (file->reader == NULL)
231             goto error;
232         }
233
234       while (lex_match (lexer, T_SLASH))
235         if (lex_match_id (lexer, "RENAME"))
236           {
237             if (!parse_dict_rename (lexer, file->dict))
238               goto error;
239           }
240         else if (lex_match_id (lexer, "IN"))
241           {
242             lex_match (lexer, T_EQUALS);
243             if (lex_token (lexer) != T_ID)
244               {
245                 lex_error (lexer, NULL);
246                 goto error;
247               }
248
249             if (file->in_name)
250               {
251                 msg (SE, _("Multiple IN subcommands for a single FILE or "
252                            "TABLE."));
253                 goto error;
254               }
255             file->in_name = xstrdup (lex_tokcstr (lexer));
256             lex_get (lexer);
257           }
258         else if (lex_match_id (lexer, "SORT"))
259           {
260             file->is_sorted = false;
261             saw_sort = true;
262           }
263
264       merge_dictionary (proc.dict, file);
265     }
266
267   while (lex_token (lexer) != T_ENDCMD)
268     {
269       if (lex_match (lexer, T_BY))
270         {
271           const struct variable **by_vars;
272           size_t i;
273           bool ok;
274
275           if (saw_by)
276             {
277               lex_sbc_only_once ("BY");
278               goto error;
279             }
280           saw_by = true;
281
282           lex_match (lexer, T_EQUALS);
283           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
284                                     &by_vars, NULL))
285             goto error;
286
287           ok = true;
288           for (i = 0; i < proc.n_files; i++)
289             {
290               struct comb_file *file = &proc.files[i];
291               size_t j;
292
293               for (j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
294                 {
295                   const char *name = var_get_name (by_vars[j]);
296                   struct variable *var = dict_lookup_var (file->dict, name);
297                   if (var != NULL)
298                     subcase_add_var (&file->by_vars, var,
299                                      subcase_get_direction (&proc.by_vars, j));
300                   else
301                     {
302                       if (file->handle != NULL)
303                         msg (SE, _("File %s lacks BY variable %s."),
304                              fh_get_name (file->handle), name);
305                       else
306                         msg (SE, _("Active dataset lacks BY variable %s."),
307                              name);
308                       ok = false;
309                     }
310                 }
311               assert (!ok || subcase_conformable (&file->by_vars,
312                                                   &proc.files[0].by_vars));
313             }
314           free (by_vars);
315
316           if (!ok)
317             goto error;
318         }
319       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
320         {
321           if (first_name != NULL)
322             {
323               lex_sbc_only_once ("FIRST");
324               goto error;
325             }
326
327           lex_match (lexer, T_EQUALS);
328           if (!lex_force_id (lexer))
329             goto error;
330           first_name = xstrdup (lex_tokcstr (lexer));
331           lex_get (lexer);
332         }
333       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
334         {
335           if (last_name != NULL)
336             {
337               lex_sbc_only_once ("LAST");
338               goto error;
339             }
340
341           lex_match (lexer, T_EQUALS);
342           if (!lex_force_id (lexer))
343             goto error;
344           last_name = xstrdup (lex_tokcstr (lexer));
345           lex_get (lexer);
346         }
347       else if (lex_match_id (lexer, "MAP"))
348         {
349           /* FIXME. */
350         }
351       else if (lex_match_id (lexer, "DROP"))
352         {
353           if (!parse_dict_drop (lexer, proc.dict))
354             goto error;
355         }
356       else if (lex_match_id (lexer, "KEEP"))
357         {
358           if (!parse_dict_keep (lexer, proc.dict))
359             goto error;
360         }
361       else
362         {
363           lex_error (lexer, NULL);
364           goto error;
365         }
366
367       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
368         {
369           lex_end_of_command (lexer);
370           goto error;
371         }
372     }
373
374   if (!saw_by)
375     {
376       if (command == COMB_UPDATE)
377         {
378           msg (SE, _("The BY subcommand is required."));
379           goto error;
380         }
381       if (n_tables)
382         {
383           msg (SE, _("BY is required when %s is specified."), "TABLE");
384           goto error;
385         }
386       if (saw_sort)
387         {
388           msg (SE, _("BY is required when %s is specified."), "SORT");
389           goto error;
390         }
391     }
392
393   /* Add IN, FIRST, and LAST variables to master dictionary. */
394   for (i = 0; i < proc.n_files; i++)
395     {
396       struct comb_file *file = &proc.files[i];
397       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
398         goto error;
399     }
400   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
401       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
402     goto error;
403
404   dict_delete_scratch_vars (proc.dict);
405   dict_compact_values (proc.dict);
406
407   /* Set up mapping from each file's variables to master
408      variables. */
409   for (i = 0; i < proc.n_files; i++)
410     {
411       struct comb_file *file = &proc.files[i];
412       size_t src_var_cnt = dict_get_var_cnt (file->dict);
413       size_t j;
414
415       for (j = 0; j < src_var_cnt; j++)
416         {
417           struct variable *src_var = dict_get_var (file->dict, j);
418           struct variable *dst_var = dict_lookup_var (proc.dict,
419                                                       var_get_name (src_var));
420           if (dst_var != NULL)
421             {
422               subcase_add_var (&file->src, src_var, SC_ASCEND);
423               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
424             }
425         }
426     }
427
428   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
429   taint = taint_clone (casewriter_get_taint (proc.output));
430
431   /* Set up case matcher. */
432   proc.matcher = case_matcher_create ();
433   for (i = 0; i < proc.n_files; i++)
434     {
435       struct comb_file *file = &proc.files[i];
436       if (file->reader == NULL)
437         {
438           if (active_file == NULL)
439             {
440               proc_discard_output (ds);
441               file->reader = active_file = proc_open (ds);
442             }
443           else
444             file->reader = casereader_clone (active_file);
445         }
446       if (!file->is_sorted)
447         file->reader = sort_execute (file->reader, &file->by_vars);
448       taint_propagate (casereader_get_taint (file->reader), taint);
449       file->data = casereader_read (file->reader);
450       if (file->type == COMB_FILE)
451         case_matcher_add_input (proc.matcher, &file->by_vars,
452                                 &file->data, &file->is_minimal);
453     }
454
455   if (command == COMB_ADD)
456     execute_add_files (&proc);
457   else if (command == COMB_MATCH)
458     execute_match_files (&proc);
459   else if (command == COMB_UPDATE)
460     execute_update (&proc);
461   else
462     NOT_REACHED ();
463
464   case_matcher_destroy (proc.matcher);
465   proc.matcher = NULL;
466   close_all_comb_files (&proc);
467   if (active_file != NULL)
468     proc_commit (ds);
469
470   dataset_set_dict (ds, proc.dict);
471   dataset_set_source (ds, casewriter_make_reader (proc.output));
472   proc.dict = NULL;
473   proc.output = NULL;
474
475   free_comb_proc (&proc);
476
477   free (first_name);
478   free (last_name);
479
480   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
481
482  error:
483   if (active_file != NULL)
484     proc_commit (ds);
485   free_comb_proc (&proc);
486   taint_destroy (taint);
487   free (first_name);
488   free (last_name);
489   return CMD_CASCADING_FAILURE;
490 }
491
492 /* Merge the dictionary for file F into master dictionary M. */
493 static bool
494 merge_dictionary (struct dictionary *const m, struct comb_file *f)
495 {
496   struct dictionary *d = f->dict;
497   const struct string_array *d_docs, *m_docs;
498   int i;
499   const char *file_encoding;
500
501   if (dict_get_label (m) == NULL)
502     dict_set_label (m, dict_get_label (d));
503
504   d_docs = dict_get_documents (d);
505   m_docs = dict_get_documents (m);
506
507
508   /* FIXME: If the input files have different encodings, then
509      the result is undefined.
510      The correct thing to do would be to convert to an encoding
511      which can cope with all the input files (eg UTF-8).
512    */
513   file_encoding = dict_get_encoding (f->dict);
514   if ( file_encoding != NULL)
515     {
516       if ( dict_get_encoding (m) == NULL)
517         dict_set_encoding (m, file_encoding);
518       else if ( 0 != strcmp (file_encoding, dict_get_encoding (m)))
519         {
520           msg (MW,
521                _("Combining files with incompatible encodings. String data may not be represented correctly."));
522         }
523     }
524
525   if (d_docs != NULL)
526     {
527       if (m_docs == NULL)
528         dict_set_documents (m, d_docs);
529       else
530         {
531           struct string_array new_docs;
532           size_t i;
533
534           new_docs.n = m_docs->n + d_docs->n;
535           new_docs.strings = xmalloc (new_docs.n * sizeof *new_docs.strings);
536           for (i = 0; i < m_docs->n; i++)
537             new_docs.strings[i] = m_docs->strings[i];
538           for (i = 0; i < d_docs->n; i++)
539             new_docs.strings[m_docs->n + i] = d_docs->strings[i];
540
541           dict_set_documents (m, &new_docs);
542
543           free (new_docs.strings);
544         }
545     }
546
547   for (i = 0; i < dict_get_var_cnt (d); i++)
548     {
549       struct variable *dv = dict_get_var (d, i);
550       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
551
552       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
553         continue;
554
555       if (mv != NULL)
556         {
557           if (var_get_width (mv) != var_get_width (dv))
558             {
559               const char *var_name = var_get_name (dv);
560               const char *file_name = fh_get_name (f->handle);
561               struct string s = DS_EMPTY_INITIALIZER;
562               ds_put_format (&s,
563                              _("Variable %s in file %s has different "
564                                "type or width from the same variable in "
565                                "earlier file."),
566                              var_name, file_name);
567               ds_put_cstr (&s, "  ");
568               if (var_is_numeric (dv))
569                 ds_put_format (&s, _("In file %s, %s is numeric."),
570                                file_name, var_name);
571               else
572                 ds_put_format (&s, _("In file %s, %s is a string variable "
573                                      "with width %d."),
574                                file_name, var_name, var_get_width (dv));
575               ds_put_cstr (&s, "  ");
576               if (var_is_numeric (mv))
577                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
578                                var_name);
579               else
580                 ds_put_format (&s, _("In an earlier file, %s was a string "
581                                      "variable with width %d."),
582                                var_name, var_get_width (mv));
583               msg (SE, "%s", ds_cstr (&s));
584               ds_destroy (&s);
585               return false;
586             }
587
588           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
589             var_set_value_labels (mv, var_get_value_labels (dv));
590           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
591             var_set_missing_values (mv, var_get_missing_values (dv));
592           if (var_get_label (dv) && !var_get_label (mv))
593             var_set_label (mv, var_get_label (dv), file_encoding, false);
594         }
595       else
596         mv = dict_clone_var_assert (m, dv);
597     }
598
599   return true;
600 }
601
602 /* If VAR_NAME is non-NULL, attempts to create a
603    variable named VAR_NAME, with format F1.0, in DICT, and stores
604    a pointer to the variable in *VAR.  Returns true if
605    successful, false if the variable name is a duplicate (in
606    which case a message saying that the variable specified on the
607    given SUBCOMMAND is a duplicate is emitted).
608
609    Does nothing and returns true if VAR_NAME is null. */
610 static bool
611 create_flag_var (const char *subcommand, const char *var_name,
612                  struct dictionary *dict, struct variable **var)
613 {
614   if (var_name != NULL)
615     {
616       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
617       *var = dict_create_var (dict, var_name, 0);
618       if (*var == NULL)
619         {
620           msg (SE, _("Variable name %s specified on %s subcommand "
621                      "duplicates an existing variable name."),
622                subcommand, var_name);
623           return false;
624         }
625       var_set_both_formats (*var, &format);
626     }
627   else
628     *var = NULL;
629   return true;
630 }
631
632 /* Closes all the files in PROC and frees their associated data. */
633 static void
634 close_all_comb_files (struct comb_proc *proc)
635 {
636   size_t i;
637
638   for (i = 0; i < proc->n_files; i++)
639     {
640       struct comb_file *file = &proc->files[i];
641       subcase_destroy (&file->by_vars);
642       subcase_destroy (&file->src);
643       subcase_destroy (&file->dst);
644       fh_unref (file->handle);
645       dict_destroy (file->dict);
646       casereader_destroy (file->reader);
647       case_unref (file->data);
648       free (file->in_name);
649     }
650   free (proc->files);
651   proc->files = NULL;
652   proc->n_files = 0;
653 }
654
655 /* Frees all the data for the procedure. */
656 static void
657 free_comb_proc (struct comb_proc *proc)
658 {
659   close_all_comb_files (proc);
660   dict_destroy (proc->dict);
661   casewriter_destroy (proc->output);
662   case_matcher_destroy (proc->matcher);
663   if (proc->prev_BY)
664     {
665       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
666                                 proc->prev_BY);
667       free (proc->prev_BY);
668     }
669   subcase_destroy (&proc->by_vars);
670   case_unref (proc->buffered_case);
671 }
672 \f
673 static bool scan_table (struct comb_file *, union value by[]);
674 static struct ccase *create_output_case (const struct comb_proc *);
675 static void apply_case (const struct comb_file *, struct ccase *);
676 static void apply_file_case_and_advance (struct comb_file *, struct ccase *,
677                                          union value by[]);
678 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
679 static void output_buffered_case (struct comb_proc *);
680
681 /* Executes the ADD FILES command. */
682 static void
683 execute_add_files (struct comb_proc *proc)
684 {
685   union value *by;
686
687   while (case_matcher_match (proc->matcher, &by))
688     {
689       size_t i;
690
691       for (i = 0; i < proc->n_files; i++)
692         {
693           struct comb_file *file = &proc->files[i];
694           while (file->is_minimal)
695             {
696               struct ccase *output = create_output_case (proc);
697               apply_file_case_and_advance (file, output, by);
698               output_case (proc, output, by);
699             }
700         }
701     }
702   output_buffered_case (proc);
703 }
704
705 /* Executes the MATCH FILES command. */
706 static void
707 execute_match_files (struct comb_proc *proc)
708 {
709   union value *by;
710
711   while (case_matcher_match (proc->matcher, &by))
712     {
713       struct ccase *output;
714       size_t i;
715
716       output = create_output_case (proc);
717       for (i = proc->n_files; i-- > 0; )
718         {
719           struct comb_file *file = &proc->files[i];
720           if (file->type == COMB_FILE)
721             {
722               if (file->is_minimal)
723                 apply_file_case_and_advance (file, output, NULL);
724             }
725           else
726             {
727               if (scan_table (file, by))
728                 apply_case (file, output);
729             }
730         }
731       output_case (proc, output, by);
732     }
733   output_buffered_case (proc);
734 }
735
736 /* Executes the UPDATE command. */
737 static void
738 execute_update (struct comb_proc *proc)
739 {
740   union value *by;
741   size_t n_duplicates = 0;
742
743   while (case_matcher_match (proc->matcher, &by))
744     {
745       struct comb_file *first, *file;
746       struct ccase *output;
747
748       /* Find first nonnull case in array and make an output case
749          from it. */
750       output = create_output_case (proc);
751       for (first = &proc->files[0]; ; first++)
752         if (first->is_minimal)
753           break;
754       apply_file_case_and_advance (first, output, by);
755
756       /* Read additional cases and update the output case from
757          them.  (Don't update the output case from any duplicate
758          cases in the master file.) */
759       for (file = first + (first == proc->files);
760            file < &proc->files[proc->n_files]; file++)
761         {
762           while (file->is_minimal)
763             apply_file_case_and_advance (file, output, by);
764         }
765       casewriter_write (proc->output, output);
766
767       /* Write duplicate cases in the master file directly to the
768          output.  */
769       if (first == proc->files && first->is_minimal)
770         {
771           n_duplicates++;
772           while (first->is_minimal)
773             {
774               output = create_output_case (proc);
775               apply_file_case_and_advance (first, output, by);
776               casewriter_write (proc->output, output);
777             }
778         }
779     }
780
781   if (n_duplicates)
782     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
783          n_duplicates);
784 }
785
786 /* Reads FILE, which must be of type COMB_TABLE, until it
787    encounters a case with BY or greater for its BY variables.
788    Returns true if a case with exactly BY for its BY variables
789    was found, otherwise false. */
790 static bool
791 scan_table (struct comb_file *file, union value by[])
792 {
793   while (file->data != NULL)
794     {
795       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
796       if (cmp > 0)
797         {
798           case_unref (file->data);
799           file->data = casereader_read (file->reader);
800         }
801       else
802         return cmp == 0;
803     }
804   return false;
805 }
806
807 /* Creates and returns an output case for PROC, initializing each
808    of its values to system-missing or blanks, except that the
809    values of IN variables are set to 0. */
810 static struct ccase *
811 create_output_case (const struct comb_proc *proc)
812 {
813   size_t n_vars = dict_get_var_cnt (proc->dict);
814   struct ccase *output;
815   size_t i;
816
817   output = case_create (dict_get_proto (proc->dict));
818   for (i = 0; i < n_vars; i++)
819     {
820       struct variable *v = dict_get_var (proc->dict, i);
821       value_set_missing (case_data_rw (output, v), var_get_width (v));
822     }
823   for (i = 0; i < proc->n_files; i++)
824     {
825       struct comb_file *file = &proc->files[i];
826       if (file->in_var != NULL)
827         case_data_rw (output, file->in_var)->f = false;
828     }
829   return output;
830 }
831
832 /* Copies the data from FILE's case into output case OUTPUT.
833    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
834 static void
835 apply_case (const struct comb_file *file, struct ccase *output)
836 {
837   subcase_copy (&file->src, file->data, &file->dst, output);
838   if (file->in_var != NULL)
839     case_data_rw (output, file->in_var)->f = true;
840 }
841
842 /* Like apply_case() above, but also advances FILE to its next
843    case.  Also, if BY is nonnull, then FILE's is_minimal member
844    is updated based on whether the new case's BY values still
845    match those in BY. */
846 static void
847 apply_file_case_and_advance (struct comb_file *file, struct ccase *output,
848                              union value by[])
849 {
850   apply_case (file, output);
851   case_unref (file->data);
852   file->data = casereader_read (file->reader);
853   if (by)
854     file->is_minimal = (file->data != NULL
855                         && subcase_equal_cx (&file->by_vars, file->data, by));
856 }
857
858 /* Writes OUTPUT, whose BY values has been extracted into BY, to
859    PROC's output file, first initializing any FIRST or LAST
860    variables in OUTPUT to the correct values. */
861 static void
862 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
863 {
864   if (proc->first == NULL && proc->last == NULL)
865     casewriter_write (proc->output, output);
866   else
867     {
868       /* It's harder with LAST, because we can't know whether
869          this case is the last in a group until we've prepared
870          the *next* case also.  Thus, we buffer the previous
871          output case until the next one is ready. */
872       bool new_BY;
873       if (proc->prev_BY != NULL)
874         {
875           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
876           if (proc->last != NULL)
877             case_data_rw (proc->buffered_case, proc->last)->f = new_BY;
878           casewriter_write (proc->output, proc->buffered_case);
879         }
880       else
881         new_BY = true;
882
883       proc->buffered_case = output;
884       if (proc->first != NULL)
885         case_data_rw (proc->buffered_case, proc->first)->f = new_BY;
886
887       if (new_BY)
888         {
889           size_t n_values = subcase_get_n_fields (&proc->by_vars);
890           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
891           if (proc->prev_BY == NULL)
892             {
893               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
894               caseproto_init_values (proto, proc->prev_BY);
895             }
896           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
897                           proc->prev_BY, by);
898         }
899     }
900 }
901
902 /* Writes a trailing buffered case to the output, if FIRST or
903    LAST is in use. */
904 static void
905 output_buffered_case (struct comb_proc *proc)
906 {
907   if (proc->prev_BY != NULL)
908     {
909       if (proc->last != NULL)
910         case_data_rw (proc->buffered_case, proc->last)->f = 1.0;
911       casewriter_write (proc->output, proc->buffered_case);
912       proc->buffered_case = NULL;
913     }
914 }