Rename procedure.[ch] to dataset.[ch].
[pspp-builds.git] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/any-reader.h"
22 #include "data/case-matcher.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/subcase.h"
30 #include "data/variable.h"
31 #include "language/command.h"
32 #include "language/data-io/file-handle.h"
33 #include "language/data-io/trim.h"
34 #include "language/lexer/lexer.h"
35 #include "language/lexer/variable-parser.h"
36 #include "language/stats/sort-criteria.h"
37 #include "libpspp/assertion.h"
38 #include "libpspp/message.h"
39 #include "libpspp/string-array.h"
40 #include "libpspp/taint.h"
41 #include "math/sort.h"
42
43 #include "gl/xalloc.h"
44
45 #include "gettext.h"
46 #define _(msgid) gettext (msgid)
47
48 enum comb_command_type
49   {
50     COMB_ADD,
51     COMB_MATCH,
52     COMB_UPDATE
53   };
54
55 /* File types. */
56 enum comb_file_type
57   {
58     COMB_FILE,                  /* Specified on FILE= subcommand. */
59     COMB_TABLE                  /* Specified on TABLE= subcommand. */
60   };
61
62 /* One FILE or TABLE subcommand. */
63 struct comb_file
64   {
65     /* Basics. */
66     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
67
68     /* Variables. */
69     struct subcase by_vars;     /* BY variables in this input file. */
70     struct subcase src, dst;    /* Data to copy to output; where to put it. */
71
72     /* Input files. */
73     struct file_handle *handle; /* Input file handle. */
74     struct dictionary *dict;    /* Input file dictionary. */
75     struct casereader *reader;  /* Input data source. */
76     struct ccase *data;         /* The current input case. */
77     bool is_minimal;            /* Does 'data' have minimum BY values across
78                                    all input files? */
79     bool is_sorted;             /* Is file presorted on the BY variables? */
80
81     /* IN subcommand. */
82     char *in_name;
83     struct variable *in_var;
84   };
85
86 struct comb_proc
87   {
88     struct comb_file *files;    /* All the files being merged. */
89     size_t n_files;             /* Number of files. */
90
91     struct dictionary *dict;    /* Dictionary of output file. */
92     struct subcase by_vars;     /* BY variables in the output. */
93     struct casewriter *output;  /* Destination for output. */
94
95     struct case_matcher *matcher;
96
97     /* FIRST, LAST.
98        Only if "first" or "last" is nonnull are the remaining
99        members used. */
100     struct variable *first;     /* Variable specified on FIRST (if any). */
101     struct variable *last;      /* Variable specified on LAST (if any). */
102     struct ccase *buffered_case; /* Case ready for output except that we don't
103                                     know the value for the LAST var yet. */
104     union value *prev_BY;       /* Values of BY vars in buffered_case. */
105   };
106
107 static int combine_files (enum comb_command_type, struct lexer *,
108                           struct dataset *);
109 static void free_comb_proc (struct comb_proc *);
110
111 static void close_all_comb_files (struct comb_proc *);
112 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
113
114 static void execute_update (struct comb_proc *);
115 static void execute_match_files (struct comb_proc *);
116 static void execute_add_files (struct comb_proc *);
117
118 static bool create_flag_var (const char *subcommand_name, const char *var_name,
119                              struct dictionary *, struct variable **);
120 static void output_case (struct comb_proc *, struct ccase *, union value *by);
121 static void output_buffered_case (struct comb_proc *);
122
123 int
124 cmd_add_files (struct lexer *lexer, struct dataset *ds)
125 {
126   return combine_files (COMB_ADD, lexer, ds);
127 }
128
129 int
130 cmd_match_files (struct lexer *lexer, struct dataset *ds)
131 {
132   return combine_files (COMB_MATCH, lexer, ds);
133 }
134
135 int
136 cmd_update (struct lexer *lexer, struct dataset *ds)
137 {
138   return combine_files (COMB_UPDATE, lexer, ds);
139 }
140
141 static int
142 combine_files (enum comb_command_type command,
143                struct lexer *lexer, struct dataset *ds)
144 {
145   struct comb_proc proc;
146
147   bool saw_by = false;
148   bool saw_sort = false;
149   struct casereader *active_file = NULL;
150
151   char *first_name = NULL;
152   char *last_name = NULL;
153
154   struct taint *taint = NULL;
155
156   size_t n_tables = 0;
157   size_t allocated_files = 0;
158
159   size_t i;
160
161   proc.files = NULL;
162   proc.n_files = 0;
163   proc.dict = dict_create ();
164   proc.output = NULL;
165   proc.matcher = NULL;
166   subcase_init_empty (&proc.by_vars);
167   proc.first = NULL;
168   proc.last = NULL;
169   proc.buffered_case = NULL;
170   proc.prev_BY = NULL;
171
172   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
173
174   lex_match (lexer, T_SLASH);
175   for (;;)
176     {
177       struct comb_file *file;
178       enum comb_file_type type;
179
180       if (lex_match_id (lexer, "FILE"))
181         type = COMB_FILE;
182       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
183         {
184           type = COMB_TABLE;
185           n_tables++;
186         }
187       else
188         break;
189       lex_match (lexer, T_EQUALS);
190
191       if (proc.n_files >= allocated_files)
192         proc.files = x2nrealloc (proc.files, &allocated_files,
193                                 sizeof *proc.files);
194       file = &proc.files[proc.n_files++];
195       file->type = type;
196       subcase_init_empty (&file->by_vars);
197       subcase_init_empty (&file->src);
198       subcase_init_empty (&file->dst);
199       file->handle = NULL;
200       file->dict = NULL;
201       file->reader = NULL;
202       file->data = NULL;
203       file->is_sorted = true;
204       file->in_name = NULL;
205       file->in_var = NULL;
206
207       if (lex_match (lexer, T_ASTERISK))
208         {
209           if (!proc_has_active_file (ds))
210             {
211               msg (SE, _("Cannot specify the active file since no active "
212                          "file has been defined."));
213               goto error;
214             }
215
216           if (proc_make_temporary_transformations_permanent (ds))
217             msg (SE, _("This command may not be used after TEMPORARY when "
218                        "the active file is an input source.  "
219                        "Temporary transformations will be made permanent."));
220
221           file->dict = dict_clone (dataset_dict (ds));
222         }
223       else
224         {
225           file->handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
226           if (file->handle == NULL)
227             goto error;
228
229           file->reader = any_reader_open (file->handle, &file->dict);
230           if (file->reader == NULL)
231             goto error;
232         }
233
234       while (lex_match (lexer, T_SLASH))
235         if (lex_match_id (lexer, "RENAME"))
236           {
237             if (!parse_dict_rename (lexer, file->dict))
238               goto error;
239           }
240         else if (lex_match_id (lexer, "IN"))
241           {
242             lex_match (lexer, T_EQUALS);
243             if (lex_token (lexer) != T_ID)
244               {
245                 lex_error (lexer, NULL);
246                 goto error;
247               }
248
249             if (file->in_name)
250               {
251                 msg (SE, _("Multiple IN subcommands for a single FILE or "
252                            "TABLE."));
253                 goto error;
254               }
255             file->in_name = xstrdup (lex_tokcstr (lexer));
256             lex_get (lexer);
257           }
258         else if (lex_match_id (lexer, "SORT"))
259           {
260             file->is_sorted = false;
261             saw_sort = true;
262           }
263
264       merge_dictionary (proc.dict, file);
265     }
266
267   while (lex_token (lexer) != T_ENDCMD)
268     {
269       if (lex_match (lexer, T_BY))
270         {
271           const struct variable **by_vars;
272           size_t i;
273           bool ok;
274
275           if (saw_by)
276             {
277               lex_sbc_only_once ("BY");
278               goto error;
279             }
280           saw_by = true;
281
282           lex_match (lexer, T_EQUALS);
283           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
284                                     &by_vars, NULL))
285             goto error;
286
287           ok = true;
288           for (i = 0; i < proc.n_files; i++)
289             {
290               struct comb_file *file = &proc.files[i];
291               size_t j;
292
293               for (j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
294                 {
295                   const char *name = var_get_name (by_vars[j]);
296                   struct variable *var = dict_lookup_var (file->dict, name);
297                   if (var != NULL)
298                     subcase_add_var (&file->by_vars, var,
299                                      subcase_get_direction (&proc.by_vars, j));
300                   else
301                     {
302                       if (file->handle != NULL)
303                         msg (SE, _("File %s lacks BY variable %s."),
304                              fh_get_name (file->handle), name);
305                       else
306                         msg (SE, _("Active file lacks BY variable %s."), name);
307                       ok = false;
308                     }
309                 }
310               assert (!ok || subcase_conformable (&file->by_vars,
311                                                   &proc.files[0].by_vars));
312             }
313           free (by_vars);
314
315           if (!ok)
316             goto error;
317         }
318       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
319         {
320           if (first_name != NULL)
321             {
322               lex_sbc_only_once ("FIRST");
323               goto error;
324             }
325
326           lex_match (lexer, T_EQUALS);
327           if (!lex_force_id (lexer))
328             goto error;
329           first_name = xstrdup (lex_tokcstr (lexer));
330           lex_get (lexer);
331         }
332       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
333         {
334           if (last_name != NULL)
335             {
336               lex_sbc_only_once ("LAST");
337               goto error;
338             }
339
340           lex_match (lexer, T_EQUALS);
341           if (!lex_force_id (lexer))
342             goto error;
343           last_name = xstrdup (lex_tokcstr (lexer));
344           lex_get (lexer);
345         }
346       else if (lex_match_id (lexer, "MAP"))
347         {
348           /* FIXME. */
349         }
350       else if (lex_match_id (lexer, "DROP"))
351         {
352           if (!parse_dict_drop (lexer, proc.dict))
353             goto error;
354         }
355       else if (lex_match_id (lexer, "KEEP"))
356         {
357           if (!parse_dict_keep (lexer, proc.dict))
358             goto error;
359         }
360       else
361         {
362           lex_error (lexer, NULL);
363           goto error;
364         }
365
366       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
367         {
368           lex_end_of_command (lexer);
369           goto error;
370         }
371     }
372
373   if (!saw_by)
374     {
375       if (command == COMB_UPDATE)
376         {
377           msg (SE, _("The BY subcommand is required."));
378           goto error;
379         }
380       if (n_tables)
381         {
382           msg (SE, _("BY is required when %s is specified."), "TABLE");
383           goto error;
384         }
385       if (saw_sort)
386         {
387           msg (SE, _("BY is required when %s is specified."), "SORT");
388           goto error;
389         }
390     }
391
392   /* Add IN, FIRST, and LAST variables to master dictionary. */
393   for (i = 0; i < proc.n_files; i++)
394     {
395       struct comb_file *file = &proc.files[i];
396       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
397         goto error;
398     }
399   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
400       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
401     goto error;
402
403   dict_delete_scratch_vars (proc.dict);
404   dict_compact_values (proc.dict);
405
406   /* Set up mapping from each file's variables to master
407      variables. */
408   for (i = 0; i < proc.n_files; i++)
409     {
410       struct comb_file *file = &proc.files[i];
411       size_t src_var_cnt = dict_get_var_cnt (file->dict);
412       size_t j;
413
414       for (j = 0; j < src_var_cnt; j++)
415         {
416           struct variable *src_var = dict_get_var (file->dict, j);
417           struct variable *dst_var = dict_lookup_var (proc.dict,
418                                                       var_get_name (src_var));
419           if (dst_var != NULL)
420             {
421               subcase_add_var (&file->src, src_var, SC_ASCEND);
422               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
423             }
424         }
425     }
426
427   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
428   taint = taint_clone (casewriter_get_taint (proc.output));
429
430   /* Set up case matcher. */
431   proc.matcher = case_matcher_create ();
432   for (i = 0; i < proc.n_files; i++)
433     {
434       struct comb_file *file = &proc.files[i];
435       if (file->reader == NULL)
436         {
437           if (active_file == NULL)
438             {
439               proc_discard_output (ds);
440               file->reader = active_file = proc_open (ds);
441             }
442           else
443             file->reader = casereader_clone (active_file);
444         }
445       if (!file->is_sorted)
446         file->reader = sort_execute (file->reader, &file->by_vars);
447       taint_propagate (casereader_get_taint (file->reader), taint);
448       file->data = casereader_read (file->reader);
449       if (file->type == COMB_FILE)
450         case_matcher_add_input (proc.matcher, &file->by_vars,
451                                 &file->data, &file->is_minimal);
452     }
453
454   if (command == COMB_ADD)
455     execute_add_files (&proc);
456   else if (command == COMB_MATCH)
457     execute_match_files (&proc);
458   else if (command == COMB_UPDATE)
459     execute_update (&proc);
460   else
461     NOT_REACHED ();
462
463   case_matcher_destroy (proc.matcher);
464   proc.matcher = NULL;
465   close_all_comb_files (&proc);
466   if (active_file != NULL)
467     proc_commit (ds);
468
469   proc_set_active_file (ds, casewriter_make_reader (proc.output), proc.dict);
470   proc.dict = NULL;
471   proc.output = NULL;
472
473   free_comb_proc (&proc);
474
475   free (first_name);
476   free (last_name);
477
478   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
479
480  error:
481   if (active_file != NULL)
482     proc_commit (ds);
483   free_comb_proc (&proc);
484   taint_destroy (taint);
485   free (first_name);
486   free (last_name);
487   return CMD_CASCADING_FAILURE;
488 }
489
490 /* Merge the dictionary for file F into master dictionary M. */
491 static bool
492 merge_dictionary (struct dictionary *const m, struct comb_file *f)
493 {
494   struct dictionary *d = f->dict;
495   const struct string_array *d_docs, *m_docs;
496   int i;
497   const char *file_encoding;
498
499   if (dict_get_label (m) == NULL)
500     dict_set_label (m, dict_get_label (d));
501
502   d_docs = dict_get_documents (d);
503   m_docs = dict_get_documents (m);
504
505
506   /* FIXME: If the input files have different encodings, then
507      the result is undefined.
508      The correct thing to do would be to convert to an encoding
509      which can cope with all the input files (eg UTF-8).
510    */
511   file_encoding = dict_get_encoding (f->dict);
512   if ( file_encoding != NULL)
513     {
514       if ( dict_get_encoding (m) == NULL)
515         dict_set_encoding (m, file_encoding);
516       else if ( 0 != strcmp (file_encoding, dict_get_encoding (m)))
517         {
518           msg (MW,
519                _("Combining files with incompatible encodings. String data may not be represented correctly."));
520         }
521     }
522
523   if (d_docs != NULL)
524     {
525       if (m_docs == NULL)
526         dict_set_documents (m, d_docs);
527       else
528         {
529           struct string_array new_docs;
530           size_t i;
531
532           new_docs.n = m_docs->n + d_docs->n;
533           new_docs.strings = xmalloc (new_docs.n * sizeof *new_docs.strings);
534           for (i = 0; i < m_docs->n; i++)
535             new_docs.strings[i] = m_docs->strings[i];
536           for (i = 0; i < d_docs->n; i++)
537             new_docs.strings[m_docs->n + i] = d_docs->strings[i];
538
539           dict_set_documents (m, &new_docs);
540
541           free (new_docs.strings);
542         }
543     }
544
545   for (i = 0; i < dict_get_var_cnt (d); i++)
546     {
547       struct variable *dv = dict_get_var (d, i);
548       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
549
550       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
551         continue;
552
553       if (mv != NULL)
554         {
555           if (var_get_width (mv) != var_get_width (dv))
556             {
557               const char *var_name = var_get_name (dv);
558               const char *file_name = fh_get_name (f->handle);
559               struct string s = DS_EMPTY_INITIALIZER;
560               ds_put_format (&s,
561                              _("Variable %s in file %s has different "
562                                "type or width from the same variable in "
563                                "earlier file."),
564                              var_name, file_name);
565               ds_put_cstr (&s, "  ");
566               if (var_is_numeric (dv))
567                 ds_put_format (&s, _("In file %s, %s is numeric."),
568                                file_name, var_name);
569               else
570                 ds_put_format (&s, _("In file %s, %s is a string variable "
571                                      "with width %d."),
572                                file_name, var_name, var_get_width (dv));
573               ds_put_cstr (&s, "  ");
574               if (var_is_numeric (mv))
575                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
576                                var_name);
577               else
578                 ds_put_format (&s, _("In an earlier file, %s was a string "
579                                      "variable with width %d."),
580                                var_name, var_get_width (mv));
581               msg (SE, "%s", ds_cstr (&s));
582               ds_destroy (&s);
583               return false;
584             }
585
586           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
587             var_set_value_labels (mv, var_get_value_labels (dv));
588           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
589             var_set_missing_values (mv, var_get_missing_values (dv));
590           if (var_get_label (dv) && !var_get_label (mv))
591             var_set_label (mv, var_get_label (dv), file_encoding, false);
592         }
593       else
594         mv = dict_clone_var_assert (m, dv);
595     }
596
597   return true;
598 }
599
600 /* If VAR_NAME is non-NULL, attempts to create a
601    variable named VAR_NAME, with format F1.0, in DICT, and stores
602    a pointer to the variable in *VAR.  Returns true if
603    successful, false if the variable name is a duplicate (in
604    which case a message saying that the variable specified on the
605    given SUBCOMMAND is a duplicate is emitted).
606
607    Does nothing and returns true if VAR_NAME is null. */
608 static bool
609 create_flag_var (const char *subcommand, const char *var_name,
610                  struct dictionary *dict, struct variable **var)
611 {
612   if (var_name != NULL)
613     {
614       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
615       *var = dict_create_var (dict, var_name, 0);
616       if (*var == NULL)
617         {
618           msg (SE, _("Variable name %s specified on %s subcommand "
619                      "duplicates an existing variable name."),
620                subcommand, var_name);
621           return false;
622         }
623       var_set_both_formats (*var, &format);
624     }
625   else
626     *var = NULL;
627   return true;
628 }
629
630 /* Closes all the files in PROC and frees their associated data. */
631 static void
632 close_all_comb_files (struct comb_proc *proc)
633 {
634   size_t i;
635
636   for (i = 0; i < proc->n_files; i++)
637     {
638       struct comb_file *file = &proc->files[i];
639       subcase_destroy (&file->by_vars);
640       subcase_destroy (&file->src);
641       subcase_destroy (&file->dst);
642       fh_unref (file->handle);
643       dict_destroy (file->dict);
644       casereader_destroy (file->reader);
645       case_unref (file->data);
646       free (file->in_name);
647     }
648   free (proc->files);
649   proc->files = NULL;
650   proc->n_files = 0;
651 }
652
653 /* Frees all the data for the procedure. */
654 static void
655 free_comb_proc (struct comb_proc *proc)
656 {
657   close_all_comb_files (proc);
658   dict_destroy (proc->dict);
659   casewriter_destroy (proc->output);
660   case_matcher_destroy (proc->matcher);
661   if (proc->prev_BY)
662     {
663       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
664                                 proc->prev_BY);
665       free (proc->prev_BY);
666     }
667   subcase_destroy (&proc->by_vars);
668   case_unref (proc->buffered_case);
669 }
670 \f
671 static bool scan_table (struct comb_file *, union value by[]);
672 static struct ccase *create_output_case (const struct comb_proc *);
673 static void apply_case (const struct comb_file *, struct ccase *);
674 static void apply_file_case_and_advance (struct comb_file *, struct ccase *,
675                                          union value by[]);
676 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
677 static void output_buffered_case (struct comb_proc *);
678
679 /* Executes the ADD FILES command. */
680 static void
681 execute_add_files (struct comb_proc *proc)
682 {
683   union value *by;
684
685   while (case_matcher_match (proc->matcher, &by))
686     {
687       size_t i;
688
689       for (i = 0; i < proc->n_files; i++)
690         {
691           struct comb_file *file = &proc->files[i];
692           while (file->is_minimal)
693             {
694               struct ccase *output = create_output_case (proc);
695               apply_file_case_and_advance (file, output, by);
696               output_case (proc, output, by);
697             }
698         }
699     }
700   output_buffered_case (proc);
701 }
702
703 /* Executes the MATCH FILES command. */
704 static void
705 execute_match_files (struct comb_proc *proc)
706 {
707   union value *by;
708
709   while (case_matcher_match (proc->matcher, &by))
710     {
711       struct ccase *output;
712       size_t i;
713
714       output = create_output_case (proc);
715       for (i = proc->n_files; i-- > 0; )
716         {
717           struct comb_file *file = &proc->files[i];
718           if (file->type == COMB_FILE)
719             {
720               if (file->is_minimal)
721                 apply_file_case_and_advance (file, output, NULL);
722             }
723           else
724             {
725               if (scan_table (file, by))
726                 apply_case (file, output);
727             }
728         }
729       output_case (proc, output, by);
730     }
731   output_buffered_case (proc);
732 }
733
734 /* Executes the UPDATE command. */
735 static void
736 execute_update (struct comb_proc *proc)
737 {
738   union value *by;
739   size_t n_duplicates = 0;
740
741   while (case_matcher_match (proc->matcher, &by))
742     {
743       struct comb_file *first, *file;
744       struct ccase *output;
745
746       /* Find first nonnull case in array and make an output case
747          from it. */
748       output = create_output_case (proc);
749       for (first = &proc->files[0]; ; first++)
750         if (first->is_minimal)
751           break;
752       apply_file_case_and_advance (first, output, by);
753
754       /* Read additional cases and update the output case from
755          them.  (Don't update the output case from any duplicate
756          cases in the master file.) */
757       for (file = first + (first == proc->files);
758            file < &proc->files[proc->n_files]; file++)
759         {
760           while (file->is_minimal)
761             apply_file_case_and_advance (file, output, by);
762         }
763       casewriter_write (proc->output, output);
764
765       /* Write duplicate cases in the master file directly to the
766          output.  */
767       if (first == proc->files && first->is_minimal)
768         {
769           n_duplicates++;
770           while (first->is_minimal)
771             {
772               output = create_output_case (proc);
773               apply_file_case_and_advance (first, output, by);
774               casewriter_write (proc->output, output);
775             }
776         }
777     }
778
779   if (n_duplicates)
780     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
781          n_duplicates);
782 }
783
784 /* Reads FILE, which must be of type COMB_TABLE, until it
785    encounters a case with BY or greater for its BY variables.
786    Returns true if a case with exactly BY for its BY variables
787    was found, otherwise false. */
788 static bool
789 scan_table (struct comb_file *file, union value by[])
790 {
791   while (file->data != NULL)
792     {
793       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
794       if (cmp > 0)
795         {
796           case_unref (file->data);
797           file->data = casereader_read (file->reader);
798         }
799       else
800         return cmp == 0;
801     }
802   return false;
803 }
804
805 /* Creates and returns an output case for PROC, initializing each
806    of its values to system-missing or blanks, except that the
807    values of IN variables are set to 0. */
808 static struct ccase *
809 create_output_case (const struct comb_proc *proc)
810 {
811   size_t n_vars = dict_get_var_cnt (proc->dict);
812   struct ccase *output;
813   size_t i;
814
815   output = case_create (dict_get_proto (proc->dict));
816   for (i = 0; i < n_vars; i++)
817     {
818       struct variable *v = dict_get_var (proc->dict, i);
819       value_set_missing (case_data_rw (output, v), var_get_width (v));
820     }
821   for (i = 0; i < proc->n_files; i++)
822     {
823       struct comb_file *file = &proc->files[i];
824       if (file->in_var != NULL)
825         case_data_rw (output, file->in_var)->f = false;
826     }
827   return output;
828 }
829
830 /* Copies the data from FILE's case into output case OUTPUT.
831    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
832 static void
833 apply_case (const struct comb_file *file, struct ccase *output)
834 {
835   subcase_copy (&file->src, file->data, &file->dst, output);
836   if (file->in_var != NULL)
837     case_data_rw (output, file->in_var)->f = true;
838 }
839
840 /* Like apply_case() above, but also advances FILE to its next
841    case.  Also, if BY is nonnull, then FILE's is_minimal member
842    is updated based on whether the new case's BY values still
843    match those in BY. */
844 static void
845 apply_file_case_and_advance (struct comb_file *file, struct ccase *output,
846                              union value by[])
847 {
848   apply_case (file, output);
849   case_unref (file->data);
850   file->data = casereader_read (file->reader);
851   if (by)
852     file->is_minimal = (file->data != NULL
853                         && subcase_equal_cx (&file->by_vars, file->data, by));
854 }
855
856 /* Writes OUTPUT, whose BY values has been extracted into BY, to
857    PROC's output file, first initializing any FIRST or LAST
858    variables in OUTPUT to the correct values. */
859 static void
860 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
861 {
862   if (proc->first == NULL && proc->last == NULL)
863     casewriter_write (proc->output, output);
864   else
865     {
866       /* It's harder with LAST, because we can't know whether
867          this case is the last in a group until we've prepared
868          the *next* case also.  Thus, we buffer the previous
869          output case until the next one is ready. */
870       bool new_BY;
871       if (proc->prev_BY != NULL)
872         {
873           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
874           if (proc->last != NULL)
875             case_data_rw (proc->buffered_case, proc->last)->f = new_BY;
876           casewriter_write (proc->output, proc->buffered_case);
877         }
878       else
879         new_BY = true;
880
881       proc->buffered_case = output;
882       if (proc->first != NULL)
883         case_data_rw (proc->buffered_case, proc->first)->f = new_BY;
884
885       if (new_BY)
886         {
887           size_t n_values = subcase_get_n_fields (&proc->by_vars);
888           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
889           if (proc->prev_BY == NULL)
890             {
891               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
892               caseproto_init_values (proto, proc->prev_BY);
893             }
894           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
895                           proc->prev_BY, by);
896         }
897     }
898 }
899
900 /* Writes a trailing buffered case to the output, if FIRST or
901    LAST is in use. */
902 static void
903 output_buffered_case (struct comb_proc *proc)
904 {
905   if (proc->prev_BY != NULL)
906     {
907       if (proc->last != NULL)
908         case_data_rw (proc->buffered_case, proc->last)->f = 1.0;
909       casewriter_write (proc->output, proc->buffered_case);
910       proc->buffered_case = NULL;
911     }
912 }