dataset: Rename functions with "dataset_" prefix.
[pspp-builds.git] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/any-reader.h"
22 #include "data/case-matcher.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/subcase.h"
30 #include "data/variable.h"
31 #include "language/command.h"
32 #include "language/data-io/file-handle.h"
33 #include "language/data-io/trim.h"
34 #include "language/lexer/lexer.h"
35 #include "language/lexer/variable-parser.h"
36 #include "language/stats/sort-criteria.h"
37 #include "libpspp/assertion.h"
38 #include "libpspp/message.h"
39 #include "libpspp/string-array.h"
40 #include "libpspp/taint.h"
41 #include "math/sort.h"
42
43 #include "gl/xalloc.h"
44
45 #include "gettext.h"
46 #define _(msgid) gettext (msgid)
47
48 enum comb_command_type
49   {
50     COMB_ADD,
51     COMB_MATCH,
52     COMB_UPDATE
53   };
54
55 /* File types. */
56 enum comb_file_type
57   {
58     COMB_FILE,                  /* Specified on FILE= subcommand. */
59     COMB_TABLE                  /* Specified on TABLE= subcommand. */
60   };
61
62 /* One FILE or TABLE subcommand. */
63 struct comb_file
64   {
65     /* Basics. */
66     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
67
68     /* Variables. */
69     struct subcase by_vars;     /* BY variables in this input file. */
70     struct subcase src, dst;    /* Data to copy to output; where to put it. */
71
72     /* Input files. */
73     struct file_handle *handle; /* Input file handle. */
74     struct dictionary *dict;    /* Input file dictionary. */
75     struct casereader *reader;  /* Input data source. */
76     struct ccase *data;         /* The current input case. */
77     bool is_minimal;            /* Does 'data' have minimum BY values across
78                                    all input files? */
79     bool is_sorted;             /* Is file presorted on the BY variables? */
80
81     /* IN subcommand. */
82     char *in_name;
83     struct variable *in_var;
84   };
85
86 struct comb_proc
87   {
88     struct comb_file *files;    /* All the files being merged. */
89     size_t n_files;             /* Number of files. */
90
91     struct dictionary *dict;    /* Dictionary of output file. */
92     struct subcase by_vars;     /* BY variables in the output. */
93     struct casewriter *output;  /* Destination for output. */
94
95     struct case_matcher *matcher;
96
97     /* FIRST, LAST.
98        Only if "first" or "last" is nonnull are the remaining
99        members used. */
100     struct variable *first;     /* Variable specified on FIRST (if any). */
101     struct variable *last;      /* Variable specified on LAST (if any). */
102     struct ccase *buffered_case; /* Case ready for output except that we don't
103                                     know the value for the LAST var yet. */
104     union value *prev_BY;       /* Values of BY vars in buffered_case. */
105   };
106
107 static int combine_files (enum comb_command_type, struct lexer *,
108                           struct dataset *);
109 static void free_comb_proc (struct comb_proc *);
110
111 static void close_all_comb_files (struct comb_proc *);
112 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
113
114 static void execute_update (struct comb_proc *);
115 static void execute_match_files (struct comb_proc *);
116 static void execute_add_files (struct comb_proc *);
117
118 static bool create_flag_var (const char *subcommand_name, const char *var_name,
119                              struct dictionary *, struct variable **);
120 static void output_case (struct comb_proc *, struct ccase *, union value *by);
121 static void output_buffered_case (struct comb_proc *);
122
123 int
124 cmd_add_files (struct lexer *lexer, struct dataset *ds)
125 {
126   return combine_files (COMB_ADD, lexer, ds);
127 }
128
129 int
130 cmd_match_files (struct lexer *lexer, struct dataset *ds)
131 {
132   return combine_files (COMB_MATCH, lexer, ds);
133 }
134
135 int
136 cmd_update (struct lexer *lexer, struct dataset *ds)
137 {
138   return combine_files (COMB_UPDATE, lexer, ds);
139 }
140
141 static int
142 combine_files (enum comb_command_type command,
143                struct lexer *lexer, struct dataset *ds)
144 {
145   struct comb_proc proc;
146
147   bool saw_by = false;
148   bool saw_sort = false;
149   struct casereader *active_file = NULL;
150
151   char *first_name = NULL;
152   char *last_name = NULL;
153
154   struct taint *taint = NULL;
155
156   size_t n_tables = 0;
157   size_t allocated_files = 0;
158
159   size_t i;
160
161   proc.files = NULL;
162   proc.n_files = 0;
163   proc.dict = dict_create ();
164   proc.output = NULL;
165   proc.matcher = NULL;
166   subcase_init_empty (&proc.by_vars);
167   proc.first = NULL;
168   proc.last = NULL;
169   proc.buffered_case = NULL;
170   proc.prev_BY = NULL;
171
172   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
173
174   lex_match (lexer, T_SLASH);
175   for (;;)
176     {
177       struct comb_file *file;
178       enum comb_file_type type;
179
180       if (lex_match_id (lexer, "FILE"))
181         type = COMB_FILE;
182       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
183         {
184           type = COMB_TABLE;
185           n_tables++;
186         }
187       else
188         break;
189       lex_match (lexer, T_EQUALS);
190
191       if (proc.n_files >= allocated_files)
192         proc.files = x2nrealloc (proc.files, &allocated_files,
193                                 sizeof *proc.files);
194       file = &proc.files[proc.n_files++];
195       file->type = type;
196       subcase_init_empty (&file->by_vars);
197       subcase_init_empty (&file->src);
198       subcase_init_empty (&file->dst);
199       file->handle = NULL;
200       file->dict = NULL;
201       file->reader = NULL;
202       file->data = NULL;
203       file->is_sorted = true;
204       file->in_name = NULL;
205       file->in_var = NULL;
206
207       if (lex_match (lexer, T_ASTERISK))
208         {
209           if (!dataset_has_source (ds))
210             {
211               msg (SE, _("Cannot specify the active file since no active "
212                          "file has been defined."));
213               goto error;
214             }
215
216           if (proc_make_temporary_transformations_permanent (ds))
217             msg (SE, _("This command may not be used after TEMPORARY when "
218                        "the active file is an input source.  "
219                        "Temporary transformations will be made permanent."));
220
221           file->dict = dict_clone (dataset_dict (ds));
222         }
223       else
224         {
225           file->handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
226           if (file->handle == NULL)
227             goto error;
228
229           file->reader = any_reader_open (file->handle, &file->dict);
230           if (file->reader == NULL)
231             goto error;
232         }
233
234       while (lex_match (lexer, T_SLASH))
235         if (lex_match_id (lexer, "RENAME"))
236           {
237             if (!parse_dict_rename (lexer, file->dict))
238               goto error;
239           }
240         else if (lex_match_id (lexer, "IN"))
241           {
242             lex_match (lexer, T_EQUALS);
243             if (lex_token (lexer) != T_ID)
244               {
245                 lex_error (lexer, NULL);
246                 goto error;
247               }
248
249             if (file->in_name)
250               {
251                 msg (SE, _("Multiple IN subcommands for a single FILE or "
252                            "TABLE."));
253                 goto error;
254               }
255             file->in_name = xstrdup (lex_tokcstr (lexer));
256             lex_get (lexer);
257           }
258         else if (lex_match_id (lexer, "SORT"))
259           {
260             file->is_sorted = false;
261             saw_sort = true;
262           }
263
264       merge_dictionary (proc.dict, file);
265     }
266
267   while (lex_token (lexer) != T_ENDCMD)
268     {
269       if (lex_match (lexer, T_BY))
270         {
271           const struct variable **by_vars;
272           size_t i;
273           bool ok;
274
275           if (saw_by)
276             {
277               lex_sbc_only_once ("BY");
278               goto error;
279             }
280           saw_by = true;
281
282           lex_match (lexer, T_EQUALS);
283           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
284                                     &by_vars, NULL))
285             goto error;
286
287           ok = true;
288           for (i = 0; i < proc.n_files; i++)
289             {
290               struct comb_file *file = &proc.files[i];
291               size_t j;
292
293               for (j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
294                 {
295                   const char *name = var_get_name (by_vars[j]);
296                   struct variable *var = dict_lookup_var (file->dict, name);
297                   if (var != NULL)
298                     subcase_add_var (&file->by_vars, var,
299                                      subcase_get_direction (&proc.by_vars, j));
300                   else
301                     {
302                       if (file->handle != NULL)
303                         msg (SE, _("File %s lacks BY variable %s."),
304                              fh_get_name (file->handle), name);
305                       else
306                         msg (SE, _("Active file lacks BY variable %s."), name);
307                       ok = false;
308                     }
309                 }
310               assert (!ok || subcase_conformable (&file->by_vars,
311                                                   &proc.files[0].by_vars));
312             }
313           free (by_vars);
314
315           if (!ok)
316             goto error;
317         }
318       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
319         {
320           if (first_name != NULL)
321             {
322               lex_sbc_only_once ("FIRST");
323               goto error;
324             }
325
326           lex_match (lexer, T_EQUALS);
327           if (!lex_force_id (lexer))
328             goto error;
329           first_name = xstrdup (lex_tokcstr (lexer));
330           lex_get (lexer);
331         }
332       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
333         {
334           if (last_name != NULL)
335             {
336               lex_sbc_only_once ("LAST");
337               goto error;
338             }
339
340           lex_match (lexer, T_EQUALS);
341           if (!lex_force_id (lexer))
342             goto error;
343           last_name = xstrdup (lex_tokcstr (lexer));
344           lex_get (lexer);
345         }
346       else if (lex_match_id (lexer, "MAP"))
347         {
348           /* FIXME. */
349         }
350       else if (lex_match_id (lexer, "DROP"))
351         {
352           if (!parse_dict_drop (lexer, proc.dict))
353             goto error;
354         }
355       else if (lex_match_id (lexer, "KEEP"))
356         {
357           if (!parse_dict_keep (lexer, proc.dict))
358             goto error;
359         }
360       else
361         {
362           lex_error (lexer, NULL);
363           goto error;
364         }
365
366       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
367         {
368           lex_end_of_command (lexer);
369           goto error;
370         }
371     }
372
373   if (!saw_by)
374     {
375       if (command == COMB_UPDATE)
376         {
377           msg (SE, _("The BY subcommand is required."));
378           goto error;
379         }
380       if (n_tables)
381         {
382           msg (SE, _("BY is required when %s is specified."), "TABLE");
383           goto error;
384         }
385       if (saw_sort)
386         {
387           msg (SE, _("BY is required when %s is specified."), "SORT");
388           goto error;
389         }
390     }
391
392   /* Add IN, FIRST, and LAST variables to master dictionary. */
393   for (i = 0; i < proc.n_files; i++)
394     {
395       struct comb_file *file = &proc.files[i];
396       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
397         goto error;
398     }
399   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
400       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
401     goto error;
402
403   dict_delete_scratch_vars (proc.dict);
404   dict_compact_values (proc.dict);
405
406   /* Set up mapping from each file's variables to master
407      variables. */
408   for (i = 0; i < proc.n_files; i++)
409     {
410       struct comb_file *file = &proc.files[i];
411       size_t src_var_cnt = dict_get_var_cnt (file->dict);
412       size_t j;
413
414       for (j = 0; j < src_var_cnt; j++)
415         {
416           struct variable *src_var = dict_get_var (file->dict, j);
417           struct variable *dst_var = dict_lookup_var (proc.dict,
418                                                       var_get_name (src_var));
419           if (dst_var != NULL)
420             {
421               subcase_add_var (&file->src, src_var, SC_ASCEND);
422               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
423             }
424         }
425     }
426
427   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
428   taint = taint_clone (casewriter_get_taint (proc.output));
429
430   /* Set up case matcher. */
431   proc.matcher = case_matcher_create ();
432   for (i = 0; i < proc.n_files; i++)
433     {
434       struct comb_file *file = &proc.files[i];
435       if (file->reader == NULL)
436         {
437           if (active_file == NULL)
438             {
439               proc_discard_output (ds);
440               file->reader = active_file = proc_open (ds);
441             }
442           else
443             file->reader = casereader_clone (active_file);
444         }
445       if (!file->is_sorted)
446         file->reader = sort_execute (file->reader, &file->by_vars);
447       taint_propagate (casereader_get_taint (file->reader), taint);
448       file->data = casereader_read (file->reader);
449       if (file->type == COMB_FILE)
450         case_matcher_add_input (proc.matcher, &file->by_vars,
451                                 &file->data, &file->is_minimal);
452     }
453
454   if (command == COMB_ADD)
455     execute_add_files (&proc);
456   else if (command == COMB_MATCH)
457     execute_match_files (&proc);
458   else if (command == COMB_UPDATE)
459     execute_update (&proc);
460   else
461     NOT_REACHED ();
462
463   case_matcher_destroy (proc.matcher);
464   proc.matcher = NULL;
465   close_all_comb_files (&proc);
466   if (active_file != NULL)
467     proc_commit (ds);
468
469   dataset_set_dict (ds, proc.dict);
470   dataset_set_source (ds, casewriter_make_reader (proc.output));
471   proc.dict = NULL;
472   proc.output = NULL;
473
474   free_comb_proc (&proc);
475
476   free (first_name);
477   free (last_name);
478
479   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
480
481  error:
482   if (active_file != NULL)
483     proc_commit (ds);
484   free_comb_proc (&proc);
485   taint_destroy (taint);
486   free (first_name);
487   free (last_name);
488   return CMD_CASCADING_FAILURE;
489 }
490
491 /* Merge the dictionary for file F into master dictionary M. */
492 static bool
493 merge_dictionary (struct dictionary *const m, struct comb_file *f)
494 {
495   struct dictionary *d = f->dict;
496   const struct string_array *d_docs, *m_docs;
497   int i;
498   const char *file_encoding;
499
500   if (dict_get_label (m) == NULL)
501     dict_set_label (m, dict_get_label (d));
502
503   d_docs = dict_get_documents (d);
504   m_docs = dict_get_documents (m);
505
506
507   /* FIXME: If the input files have different encodings, then
508      the result is undefined.
509      The correct thing to do would be to convert to an encoding
510      which can cope with all the input files (eg UTF-8).
511    */
512   file_encoding = dict_get_encoding (f->dict);
513   if ( file_encoding != NULL)
514     {
515       if ( dict_get_encoding (m) == NULL)
516         dict_set_encoding (m, file_encoding);
517       else if ( 0 != strcmp (file_encoding, dict_get_encoding (m)))
518         {
519           msg (MW,
520                _("Combining files with incompatible encodings. String data may not be represented correctly."));
521         }
522     }
523
524   if (d_docs != NULL)
525     {
526       if (m_docs == NULL)
527         dict_set_documents (m, d_docs);
528       else
529         {
530           struct string_array new_docs;
531           size_t i;
532
533           new_docs.n = m_docs->n + d_docs->n;
534           new_docs.strings = xmalloc (new_docs.n * sizeof *new_docs.strings);
535           for (i = 0; i < m_docs->n; i++)
536             new_docs.strings[i] = m_docs->strings[i];
537           for (i = 0; i < d_docs->n; i++)
538             new_docs.strings[m_docs->n + i] = d_docs->strings[i];
539
540           dict_set_documents (m, &new_docs);
541
542           free (new_docs.strings);
543         }
544     }
545
546   for (i = 0; i < dict_get_var_cnt (d); i++)
547     {
548       struct variable *dv = dict_get_var (d, i);
549       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
550
551       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
552         continue;
553
554       if (mv != NULL)
555         {
556           if (var_get_width (mv) != var_get_width (dv))
557             {
558               const char *var_name = var_get_name (dv);
559               const char *file_name = fh_get_name (f->handle);
560               struct string s = DS_EMPTY_INITIALIZER;
561               ds_put_format (&s,
562                              _("Variable %s in file %s has different "
563                                "type or width from the same variable in "
564                                "earlier file."),
565                              var_name, file_name);
566               ds_put_cstr (&s, "  ");
567               if (var_is_numeric (dv))
568                 ds_put_format (&s, _("In file %s, %s is numeric."),
569                                file_name, var_name);
570               else
571                 ds_put_format (&s, _("In file %s, %s is a string variable "
572                                      "with width %d."),
573                                file_name, var_name, var_get_width (dv));
574               ds_put_cstr (&s, "  ");
575               if (var_is_numeric (mv))
576                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
577                                var_name);
578               else
579                 ds_put_format (&s, _("In an earlier file, %s was a string "
580                                      "variable with width %d."),
581                                var_name, var_get_width (mv));
582               msg (SE, "%s", ds_cstr (&s));
583               ds_destroy (&s);
584               return false;
585             }
586
587           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
588             var_set_value_labels (mv, var_get_value_labels (dv));
589           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
590             var_set_missing_values (mv, var_get_missing_values (dv));
591           if (var_get_label (dv) && !var_get_label (mv))
592             var_set_label (mv, var_get_label (dv), file_encoding, false);
593         }
594       else
595         mv = dict_clone_var_assert (m, dv);
596     }
597
598   return true;
599 }
600
601 /* If VAR_NAME is non-NULL, attempts to create a
602    variable named VAR_NAME, with format F1.0, in DICT, and stores
603    a pointer to the variable in *VAR.  Returns true if
604    successful, false if the variable name is a duplicate (in
605    which case a message saying that the variable specified on the
606    given SUBCOMMAND is a duplicate is emitted).
607
608    Does nothing and returns true if VAR_NAME is null. */
609 static bool
610 create_flag_var (const char *subcommand, const char *var_name,
611                  struct dictionary *dict, struct variable **var)
612 {
613   if (var_name != NULL)
614     {
615       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
616       *var = dict_create_var (dict, var_name, 0);
617       if (*var == NULL)
618         {
619           msg (SE, _("Variable name %s specified on %s subcommand "
620                      "duplicates an existing variable name."),
621                subcommand, var_name);
622           return false;
623         }
624       var_set_both_formats (*var, &format);
625     }
626   else
627     *var = NULL;
628   return true;
629 }
630
631 /* Closes all the files in PROC and frees their associated data. */
632 static void
633 close_all_comb_files (struct comb_proc *proc)
634 {
635   size_t i;
636
637   for (i = 0; i < proc->n_files; i++)
638     {
639       struct comb_file *file = &proc->files[i];
640       subcase_destroy (&file->by_vars);
641       subcase_destroy (&file->src);
642       subcase_destroy (&file->dst);
643       fh_unref (file->handle);
644       dict_destroy (file->dict);
645       casereader_destroy (file->reader);
646       case_unref (file->data);
647       free (file->in_name);
648     }
649   free (proc->files);
650   proc->files = NULL;
651   proc->n_files = 0;
652 }
653
654 /* Frees all the data for the procedure. */
655 static void
656 free_comb_proc (struct comb_proc *proc)
657 {
658   close_all_comb_files (proc);
659   dict_destroy (proc->dict);
660   casewriter_destroy (proc->output);
661   case_matcher_destroy (proc->matcher);
662   if (proc->prev_BY)
663     {
664       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
665                                 proc->prev_BY);
666       free (proc->prev_BY);
667     }
668   subcase_destroy (&proc->by_vars);
669   case_unref (proc->buffered_case);
670 }
671 \f
672 static bool scan_table (struct comb_file *, union value by[]);
673 static struct ccase *create_output_case (const struct comb_proc *);
674 static void apply_case (const struct comb_file *, struct ccase *);
675 static void apply_file_case_and_advance (struct comb_file *, struct ccase *,
676                                          union value by[]);
677 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
678 static void output_buffered_case (struct comb_proc *);
679
680 /* Executes the ADD FILES command. */
681 static void
682 execute_add_files (struct comb_proc *proc)
683 {
684   union value *by;
685
686   while (case_matcher_match (proc->matcher, &by))
687     {
688       size_t i;
689
690       for (i = 0; i < proc->n_files; i++)
691         {
692           struct comb_file *file = &proc->files[i];
693           while (file->is_minimal)
694             {
695               struct ccase *output = create_output_case (proc);
696               apply_file_case_and_advance (file, output, by);
697               output_case (proc, output, by);
698             }
699         }
700     }
701   output_buffered_case (proc);
702 }
703
704 /* Executes the MATCH FILES command. */
705 static void
706 execute_match_files (struct comb_proc *proc)
707 {
708   union value *by;
709
710   while (case_matcher_match (proc->matcher, &by))
711     {
712       struct ccase *output;
713       size_t i;
714
715       output = create_output_case (proc);
716       for (i = proc->n_files; i-- > 0; )
717         {
718           struct comb_file *file = &proc->files[i];
719           if (file->type == COMB_FILE)
720             {
721               if (file->is_minimal)
722                 apply_file_case_and_advance (file, output, NULL);
723             }
724           else
725             {
726               if (scan_table (file, by))
727                 apply_case (file, output);
728             }
729         }
730       output_case (proc, output, by);
731     }
732   output_buffered_case (proc);
733 }
734
735 /* Executes the UPDATE command. */
736 static void
737 execute_update (struct comb_proc *proc)
738 {
739   union value *by;
740   size_t n_duplicates = 0;
741
742   while (case_matcher_match (proc->matcher, &by))
743     {
744       struct comb_file *first, *file;
745       struct ccase *output;
746
747       /* Find first nonnull case in array and make an output case
748          from it. */
749       output = create_output_case (proc);
750       for (first = &proc->files[0]; ; first++)
751         if (first->is_minimal)
752           break;
753       apply_file_case_and_advance (first, output, by);
754
755       /* Read additional cases and update the output case from
756          them.  (Don't update the output case from any duplicate
757          cases in the master file.) */
758       for (file = first + (first == proc->files);
759            file < &proc->files[proc->n_files]; file++)
760         {
761           while (file->is_minimal)
762             apply_file_case_and_advance (file, output, by);
763         }
764       casewriter_write (proc->output, output);
765
766       /* Write duplicate cases in the master file directly to the
767          output.  */
768       if (first == proc->files && first->is_minimal)
769         {
770           n_duplicates++;
771           while (first->is_minimal)
772             {
773               output = create_output_case (proc);
774               apply_file_case_and_advance (first, output, by);
775               casewriter_write (proc->output, output);
776             }
777         }
778     }
779
780   if (n_duplicates)
781     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
782          n_duplicates);
783 }
784
785 /* Reads FILE, which must be of type COMB_TABLE, until it
786    encounters a case with BY or greater for its BY variables.
787    Returns true if a case with exactly BY for its BY variables
788    was found, otherwise false. */
789 static bool
790 scan_table (struct comb_file *file, union value by[])
791 {
792   while (file->data != NULL)
793     {
794       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
795       if (cmp > 0)
796         {
797           case_unref (file->data);
798           file->data = casereader_read (file->reader);
799         }
800       else
801         return cmp == 0;
802     }
803   return false;
804 }
805
806 /* Creates and returns an output case for PROC, initializing each
807    of its values to system-missing or blanks, except that the
808    values of IN variables are set to 0. */
809 static struct ccase *
810 create_output_case (const struct comb_proc *proc)
811 {
812   size_t n_vars = dict_get_var_cnt (proc->dict);
813   struct ccase *output;
814   size_t i;
815
816   output = case_create (dict_get_proto (proc->dict));
817   for (i = 0; i < n_vars; i++)
818     {
819       struct variable *v = dict_get_var (proc->dict, i);
820       value_set_missing (case_data_rw (output, v), var_get_width (v));
821     }
822   for (i = 0; i < proc->n_files; i++)
823     {
824       struct comb_file *file = &proc->files[i];
825       if (file->in_var != NULL)
826         case_data_rw (output, file->in_var)->f = false;
827     }
828   return output;
829 }
830
831 /* Copies the data from FILE's case into output case OUTPUT.
832    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
833 static void
834 apply_case (const struct comb_file *file, struct ccase *output)
835 {
836   subcase_copy (&file->src, file->data, &file->dst, output);
837   if (file->in_var != NULL)
838     case_data_rw (output, file->in_var)->f = true;
839 }
840
841 /* Like apply_case() above, but also advances FILE to its next
842    case.  Also, if BY is nonnull, then FILE's is_minimal member
843    is updated based on whether the new case's BY values still
844    match those in BY. */
845 static void
846 apply_file_case_and_advance (struct comb_file *file, struct ccase *output,
847                              union value by[])
848 {
849   apply_case (file, output);
850   case_unref (file->data);
851   file->data = casereader_read (file->reader);
852   if (by)
853     file->is_minimal = (file->data != NULL
854                         && subcase_equal_cx (&file->by_vars, file->data, by));
855 }
856
857 /* Writes OUTPUT, whose BY values has been extracted into BY, to
858    PROC's output file, first initializing any FIRST or LAST
859    variables in OUTPUT to the correct values. */
860 static void
861 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
862 {
863   if (proc->first == NULL && proc->last == NULL)
864     casewriter_write (proc->output, output);
865   else
866     {
867       /* It's harder with LAST, because we can't know whether
868          this case is the last in a group until we've prepared
869          the *next* case also.  Thus, we buffer the previous
870          output case until the next one is ready. */
871       bool new_BY;
872       if (proc->prev_BY != NULL)
873         {
874           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
875           if (proc->last != NULL)
876             case_data_rw (proc->buffered_case, proc->last)->f = new_BY;
877           casewriter_write (proc->output, proc->buffered_case);
878         }
879       else
880         new_BY = true;
881
882       proc->buffered_case = output;
883       if (proc->first != NULL)
884         case_data_rw (proc->buffered_case, proc->first)->f = new_BY;
885
886       if (new_BY)
887         {
888           size_t n_values = subcase_get_n_fields (&proc->by_vars);
889           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
890           if (proc->prev_BY == NULL)
891             {
892               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
893               caseproto_init_values (proto, proc->prev_BY);
894             }
895           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
896                           proc->prev_BY, by);
897         }
898     }
899 }
900
901 /* Writes a trailing buffered case to the output, if FIRST or
902    LAST is in use. */
903 static void
904 output_buffered_case (struct comb_proc *proc)
905 {
906   if (proc->prev_BY != NULL)
907     {
908       if (proc->last != NULL)
909         case_data_rw (proc->buffered_case, proc->last)->f = 1.0;
910       casewriter_write (proc->output, proc->buffered_case);
911       proc->buffered_case = NULL;
912     }
913 }