combine-files: Eliminate VAR_NAME_LEN restriction from combine_files().
[pspp-builds.git] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include <data/any-reader.h>
22 #include <data/case-matcher.h>
23 #include <data/case.h>
24 #include <data/casereader.h>
25 #include <data/casewriter.h>
26 #include <data/dictionary.h>
27 #include <data/format.h>
28 #include <data/procedure.h>
29 #include <data/subcase.h>
30 #include <data/variable.h>
31 #include <language/command.h>
32 #include <language/data-io/file-handle.h>
33 #include <language/data-io/trim.h>
34 #include <language/lexer/lexer.h>
35 #include <language/lexer/variable-parser.h>
36 #include <language/stats/sort-criteria.h>
37 #include <libpspp/assertion.h>
38 #include <libpspp/message.h>
39 #include <libpspp/taint.h>
40 #include <math/sort.h>
41
42 #include "xalloc.h"
43
44 #include "gettext.h"
45 #define _(msgid) gettext (msgid)
46
47 enum comb_command_type
48   {
49     COMB_ADD,
50     COMB_MATCH,
51     COMB_UPDATE
52   };
53
54 /* File types. */
55 enum comb_file_type
56   {
57     COMB_FILE,                  /* Specified on FILE= subcommand. */
58     COMB_TABLE                  /* Specified on TABLE= subcommand. */
59   };
60
61 /* One FILE or TABLE subcommand. */
62 struct comb_file
63   {
64     /* Basics. */
65     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
66
67     /* Variables. */
68     struct subcase by_vars;     /* BY variables in this input file. */
69     struct subcase src, dst;    /* Data to copy to output; where to put it. */
70
71     /* Input files. */
72     struct file_handle *handle; /* Input file handle. */
73     struct dictionary *dict;    /* Input file dictionary. */
74     struct casereader *reader;  /* Input data source. */
75     struct ccase *data;         /* The current input case. */
76     bool is_minimal;            /* Does 'data' have minimum BY values across
77                                    all input files? */
78     bool is_sorted;             /* Is file presorted on the BY variables? */
79
80     /* IN subcommand. */
81     char *in_name;
82     struct variable *in_var;
83   };
84
85 struct comb_proc
86   {
87     struct comb_file *files;    /* All the files being merged. */
88     size_t n_files;             /* Number of files. */
89
90     struct dictionary *dict;    /* Dictionary of output file. */
91     struct subcase by_vars;     /* BY variables in the output. */
92     struct casewriter *output;  /* Destination for output. */
93
94     struct case_matcher *matcher;
95
96     /* FIRST, LAST.
97        Only if "first" or "last" is nonnull are the remaining
98        members used. */
99     struct variable *first;     /* Variable specified on FIRST (if any). */
100     struct variable *last;      /* Variable specified on LAST (if any). */
101     struct ccase *buffered_case; /* Case ready for output except that we don't
102                                     know the value for the LAST var yet. */
103     union value *prev_BY;       /* Values of BY vars in buffered_case. */
104   };
105
106 static int combine_files (enum comb_command_type, struct lexer *,
107                           struct dataset *);
108 static void free_comb_proc (struct comb_proc *);
109
110 static void close_all_comb_files (struct comb_proc *);
111 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
112
113 static void execute_update (struct comb_proc *);
114 static void execute_match_files (struct comb_proc *);
115 static void execute_add_files (struct comb_proc *);
116
117 static bool create_flag_var (const char *subcommand_name, const char *var_name,
118                              struct dictionary *, struct variable **);
119 static void output_case (struct comb_proc *, struct ccase *, union value *by);
120 static void output_buffered_case (struct comb_proc *);
121
122 int
123 cmd_add_files (struct lexer *lexer, struct dataset *ds)
124 {
125   return combine_files (COMB_ADD, lexer, ds);
126 }
127
128 int
129 cmd_match_files (struct lexer *lexer, struct dataset *ds)
130 {
131   return combine_files (COMB_MATCH, lexer, ds);
132 }
133
134 int
135 cmd_update (struct lexer *lexer, struct dataset *ds)
136 {
137   return combine_files (COMB_UPDATE, lexer, ds);
138 }
139
140 static int
141 combine_files (enum comb_command_type command,
142                struct lexer *lexer, struct dataset *ds)
143 {
144   struct comb_proc proc;
145
146   bool saw_by = false;
147   bool saw_sort = false;
148   struct casereader *active_file = NULL;
149
150   char *first_name = NULL;
151   char *last_name = NULL;
152
153   struct taint *taint = NULL;
154
155   size_t n_tables = 0;
156   size_t allocated_files = 0;
157
158   size_t i;
159
160   proc.files = NULL;
161   proc.n_files = 0;
162   proc.dict = dict_create ();
163   proc.output = NULL;
164   proc.matcher = NULL;
165   subcase_init_empty (&proc.by_vars);
166   proc.first = NULL;
167   proc.last = NULL;
168   proc.buffered_case = NULL;
169   proc.prev_BY = NULL;
170
171   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
172
173   lex_match (lexer, T_SLASH);
174   for (;;)
175     {
176       struct comb_file *file;
177       enum comb_file_type type;
178
179       if (lex_match_id (lexer, "FILE"))
180         type = COMB_FILE;
181       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
182         {
183           type = COMB_TABLE;
184           n_tables++;
185         }
186       else
187         break;
188       lex_match (lexer, T_EQUALS);
189
190       if (proc.n_files >= allocated_files)
191         proc.files = x2nrealloc (proc.files, &allocated_files,
192                                 sizeof *proc.files);
193       file = &proc.files[proc.n_files++];
194       file->type = type;
195       subcase_init_empty (&file->by_vars);
196       subcase_init_empty (&file->src);
197       subcase_init_empty (&file->dst);
198       file->handle = NULL;
199       file->dict = NULL;
200       file->reader = NULL;
201       file->data = NULL;
202       file->is_sorted = true;
203       file->in_name = NULL;
204       file->in_var = NULL;
205
206       if (lex_match (lexer, T_ASTERISK))
207         {
208           if (!proc_has_active_file (ds))
209             {
210               msg (SE, _("Cannot specify the active file since no active "
211                          "file has been defined."));
212               goto error;
213             }
214
215           if (proc_make_temporary_transformations_permanent (ds))
216             msg (SE, _("This command may not be used after TEMPORARY when "
217                        "the active file is an input source.  "
218                        "Temporary transformations will be made permanent."));
219
220           file->dict = dict_clone (dataset_dict (ds));
221         }
222       else
223         {
224           file->handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
225           if (file->handle == NULL)
226             goto error;
227
228           file->reader = any_reader_open (file->handle, &file->dict);
229           if (file->reader == NULL)
230             goto error;
231         }
232
233       while (lex_match (lexer, T_SLASH))
234         if (lex_match_id (lexer, "RENAME"))
235           {
236             if (!parse_dict_rename (lexer, file->dict))
237               goto error;
238           }
239         else if (lex_match_id (lexer, "IN"))
240           {
241             lex_match (lexer, T_EQUALS);
242             if (lex_token (lexer) != T_ID)
243               {
244                 lex_error (lexer, NULL);
245                 goto error;
246               }
247
248             if (file->in_name)
249               {
250                 msg (SE, _("Multiple IN subcommands for a single FILE or "
251                            "TABLE."));
252                 goto error;
253               }
254             file->in_name = xstrdup (lex_tokcstr (lexer));
255             lex_get (lexer);
256           }
257         else if (lex_match_id (lexer, "SORT"))
258           {
259             file->is_sorted = false;
260             saw_sort = true;
261           }
262
263       merge_dictionary (proc.dict, file);
264     }
265
266   while (lex_token (lexer) != T_ENDCMD)
267     {
268       if (lex_match (lexer, T_BY))
269         {
270           const struct variable **by_vars;
271           size_t i;
272           bool ok;
273
274           if (saw_by)
275             {
276               lex_sbc_only_once ("BY");
277               goto error;
278             }
279           saw_by = true;
280
281           lex_match (lexer, T_EQUALS);
282           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
283                                     &by_vars, NULL))
284             goto error;
285
286           ok = true;
287           for (i = 0; i < proc.n_files; i++)
288             {
289               struct comb_file *file = &proc.files[i];
290               size_t j;
291
292               for (j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
293                 {
294                   const char *name = var_get_name (by_vars[j]);
295                   struct variable *var = dict_lookup_var (file->dict, name);
296                   if (var != NULL)
297                     subcase_add_var (&file->by_vars, var,
298                                      subcase_get_direction (&proc.by_vars, j));
299                   else
300                     {
301                       if (file->handle != NULL)
302                         msg (SE, _("File %s lacks BY variable %s."),
303                              fh_get_name (file->handle), name);
304                       else
305                         msg (SE, _("Active file lacks BY variable %s."), name);
306                       ok = false;
307                     }
308                 }
309               assert (!ok || subcase_conformable (&file->by_vars,
310                                                   &proc.files[0].by_vars));
311             }
312           free (by_vars);
313
314           if (!ok)
315             goto error;
316         }
317       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
318         {
319           if (first_name != NULL)
320             {
321               lex_sbc_only_once ("FIRST");
322               goto error;
323             }
324
325           lex_match (lexer, T_EQUALS);
326           if (!lex_force_id (lexer))
327             goto error;
328           first_name = xstrdup (lex_tokcstr (lexer));
329           lex_get (lexer);
330         }
331       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
332         {
333           if (last_name != NULL)
334             {
335               lex_sbc_only_once ("LAST");
336               goto error;
337             }
338
339           lex_match (lexer, T_EQUALS);
340           if (!lex_force_id (lexer))
341             goto error;
342           last_name = xstrdup (lex_tokcstr (lexer));
343           lex_get (lexer);
344         }
345       else if (lex_match_id (lexer, "MAP"))
346         {
347           /* FIXME. */
348         }
349       else if (lex_match_id (lexer, "DROP"))
350         {
351           if (!parse_dict_drop (lexer, proc.dict))
352             goto error;
353         }
354       else if (lex_match_id (lexer, "KEEP"))
355         {
356           if (!parse_dict_keep (lexer, proc.dict))
357             goto error;
358         }
359       else
360         {
361           lex_error (lexer, NULL);
362           goto error;
363         }
364
365       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
366         {
367           lex_end_of_command (lexer);
368           goto error;
369         }
370     }
371
372   if (!saw_by)
373     {
374       if (command == COMB_UPDATE)
375         {
376           msg (SE, _("The BY subcommand is required."));
377           goto error;
378         }
379       if (n_tables)
380         {
381           msg (SE, _("BY is required when %s is specified."), "TABLE");
382           goto error;
383         }
384       if (saw_sort)
385         {
386           msg (SE, _("BY is required when %s is specified."), "SORT");
387           goto error;
388         }
389     }
390
391   /* Add IN, FIRST, and LAST variables to master dictionary. */
392   for (i = 0; i < proc.n_files; i++)
393     {
394       struct comb_file *file = &proc.files[i];
395       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
396         goto error;
397     }
398   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
399       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
400     goto error;
401
402   dict_delete_scratch_vars (proc.dict);
403   dict_compact_values (proc.dict);
404
405   /* Set up mapping from each file's variables to master
406      variables. */
407   for (i = 0; i < proc.n_files; i++)
408     {
409       struct comb_file *file = &proc.files[i];
410       size_t src_var_cnt = dict_get_var_cnt (file->dict);
411       size_t j;
412
413       for (j = 0; j < src_var_cnt; j++)
414         {
415           struct variable *src_var = dict_get_var (file->dict, j);
416           struct variable *dst_var = dict_lookup_var (proc.dict,
417                                                       var_get_name (src_var));
418           if (dst_var != NULL)
419             {
420               subcase_add_var (&file->src, src_var, SC_ASCEND);
421               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
422             }
423         }
424     }
425
426   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
427   taint = taint_clone (casewriter_get_taint (proc.output));
428
429   /* Set up case matcher. */
430   proc.matcher = case_matcher_create ();
431   for (i = 0; i < proc.n_files; i++)
432     {
433       struct comb_file *file = &proc.files[i];
434       if (file->reader == NULL)
435         {
436           if (active_file == NULL)
437             {
438               proc_discard_output (ds);
439               file->reader = active_file = proc_open (ds);
440             }
441           else
442             file->reader = casereader_clone (active_file);
443         }
444       if (!file->is_sorted)
445         file->reader = sort_execute (file->reader, &file->by_vars);
446       taint_propagate (casereader_get_taint (file->reader), taint);
447       file->data = casereader_read (file->reader);
448       if (file->type == COMB_FILE)
449         case_matcher_add_input (proc.matcher, &file->by_vars,
450                                 &file->data, &file->is_minimal);
451     }
452
453   if (command == COMB_ADD)
454     execute_add_files (&proc);
455   else if (command == COMB_MATCH)
456     execute_match_files (&proc);
457   else if (command == COMB_UPDATE)
458     execute_update (&proc);
459   else
460     NOT_REACHED ();
461
462   case_matcher_destroy (proc.matcher);
463   proc.matcher = NULL;
464   close_all_comb_files (&proc);
465   if (active_file != NULL)
466     proc_commit (ds);
467
468   proc_set_active_file (ds, casewriter_make_reader (proc.output), proc.dict);
469   proc.dict = NULL;
470   proc.output = NULL;
471
472   free_comb_proc (&proc);
473
474   free (first_name);
475   free (last_name);
476
477   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
478
479  error:
480   if (active_file != NULL)
481     proc_commit (ds);
482   free_comb_proc (&proc);
483   taint_destroy (taint);
484   free (first_name);
485   free (last_name);
486   return CMD_CASCADING_FAILURE;
487 }
488
489 /* Merge the dictionary for file F into master dictionary M. */
490 static bool
491 merge_dictionary (struct dictionary *const m, struct comb_file *f)
492 {
493   struct dictionary *d = f->dict;
494   const char *d_docs, *m_docs;
495   int i;
496   const char *file_encoding;
497
498   if (dict_get_label (m) == NULL)
499     dict_set_label (m, dict_get_label (d));
500
501   d_docs = dict_get_documents (d);
502   m_docs = dict_get_documents (m);
503
504
505   /* FIXME: If the input files have different encodings, then
506      the result is undefined.
507      The correct thing to do would be to convert to an encoding
508      which can cope with all the input files (eg UTF-8).
509    */
510   file_encoding = dict_get_encoding (f->dict);
511   if ( file_encoding != NULL)
512     {
513       if ( dict_get_encoding (m) == NULL)
514         dict_set_encoding (m, file_encoding);
515       else if ( 0 != strcmp (file_encoding, dict_get_encoding (m)))
516         {
517           msg (MW,
518                _("Combining files with incompatible encodings. String data may not be represented correctly."));
519         }
520     }
521
522   if (d_docs != NULL)
523     {
524       if (m_docs == NULL)
525         dict_set_documents (m, d_docs);
526       else
527         {
528           char *new_docs = xasprintf ("%s%s", m_docs, d_docs);
529           dict_set_documents (m, new_docs);
530           free (new_docs);
531         }
532     }
533
534   for (i = 0; i < dict_get_var_cnt (d); i++)
535     {
536       struct variable *dv = dict_get_var (d, i);
537       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
538
539       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
540         continue;
541
542       if (mv != NULL)
543         {
544           if (var_get_width (mv) != var_get_width (dv))
545             {
546               const char *var_name = var_get_name (dv);
547               const char *file_name = fh_get_name (f->handle);
548               struct string s = DS_EMPTY_INITIALIZER;
549               ds_put_format (&s,
550                              _("Variable %s in file %s has different "
551                                "type or width from the same variable in "
552                                "earlier file."),
553                              var_name, file_name);
554               ds_put_cstr (&s, "  ");
555               if (var_is_numeric (dv))
556                 ds_put_format (&s, _("In file %s, %s is numeric."),
557                                file_name, var_name);
558               else
559                 ds_put_format (&s, _("In file %s, %s is a string variable "
560                                      "with width %d."),
561                                file_name, var_name, var_get_width (dv));
562               ds_put_cstr (&s, "  ");
563               if (var_is_numeric (mv))
564                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
565                                var_name);
566               else
567                 ds_put_format (&s, _("In an earlier file, %s was a string "
568                                      "variable with width %d."),
569                                var_name, var_get_width (mv));
570               msg (SE, "%s", ds_cstr (&s));
571               ds_destroy (&s);
572               return false;
573             }
574
575           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
576             var_set_value_labels (mv, var_get_value_labels (dv));
577           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
578             var_set_missing_values (mv, var_get_missing_values (dv));
579           if (var_get_label (dv) && !var_get_label (mv))
580             var_set_label (mv, var_get_label (dv));
581         }
582       else
583         mv = dict_clone_var_assert (m, dv);
584     }
585
586   return true;
587 }
588
589 /* If VAR_NAME is non-NULL, attempts to create a
590    variable named VAR_NAME, with format F1.0, in DICT, and stores
591    a pointer to the variable in *VAR.  Returns true if
592    successful, false if the variable name is a duplicate (in
593    which case a message saying that the variable specified on the
594    given SUBCOMMAND is a duplicate is emitted).
595
596    Does nothing and returns true if VAR_NAME is null. */
597 static bool
598 create_flag_var (const char *subcommand, const char *var_name,
599                  struct dictionary *dict, struct variable **var)
600 {
601   if (var_name != NULL)
602     {
603       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
604       *var = dict_create_var (dict, var_name, 0);
605       if (*var == NULL)
606         {
607           msg (SE, _("Variable name %s specified on %s subcommand "
608                      "duplicates an existing variable name."),
609                subcommand, var_name);
610           return false;
611         }
612       var_set_both_formats (*var, &format);
613     }
614   else
615     *var = NULL;
616   return true;
617 }
618
619 /* Closes all the files in PROC and frees their associated data. */
620 static void
621 close_all_comb_files (struct comb_proc *proc)
622 {
623   size_t i;
624
625   for (i = 0; i < proc->n_files; i++)
626     {
627       struct comb_file *file = &proc->files[i];
628       subcase_destroy (&file->by_vars);
629       subcase_destroy (&file->src);
630       subcase_destroy (&file->dst);
631       fh_unref (file->handle);
632       dict_destroy (file->dict);
633       casereader_destroy (file->reader);
634       case_unref (file->data);
635       free (file->in_name);
636     }
637   free (proc->files);
638   proc->files = NULL;
639   proc->n_files = 0;
640 }
641
642 /* Frees all the data for the procedure. */
643 static void
644 free_comb_proc (struct comb_proc *proc)
645 {
646   close_all_comb_files (proc);
647   dict_destroy (proc->dict);
648   casewriter_destroy (proc->output);
649   case_matcher_destroy (proc->matcher);
650   if (proc->prev_BY)
651     {
652       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
653                                 proc->prev_BY);
654       free (proc->prev_BY);
655     }
656   subcase_destroy (&proc->by_vars);
657   case_unref (proc->buffered_case);
658 }
659 \f
660 static bool scan_table (struct comb_file *, union value by[]);
661 static struct ccase *create_output_case (const struct comb_proc *);
662 static void apply_case (const struct comb_file *, struct ccase *);
663 static void apply_file_case_and_advance (struct comb_file *, struct ccase *,
664                                          union value by[]);
665 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
666 static void output_buffered_case (struct comb_proc *);
667
668 /* Executes the ADD FILES command. */
669 static void
670 execute_add_files (struct comb_proc *proc)
671 {
672   union value *by;
673
674   while (case_matcher_match (proc->matcher, &by))
675     {
676       size_t i;
677
678       for (i = 0; i < proc->n_files; i++)
679         {
680           struct comb_file *file = &proc->files[i];
681           while (file->is_minimal)
682             {
683               struct ccase *output = create_output_case (proc);
684               apply_file_case_and_advance (file, output, by);
685               output_case (proc, output, by);
686             }
687         }
688     }
689   output_buffered_case (proc);
690 }
691
692 /* Executes the MATCH FILES command. */
693 static void
694 execute_match_files (struct comb_proc *proc)
695 {
696   union value *by;
697
698   while (case_matcher_match (proc->matcher, &by))
699     {
700       struct ccase *output;
701       size_t i;
702
703       output = create_output_case (proc);
704       for (i = proc->n_files; i-- > 0; )
705         {
706           struct comb_file *file = &proc->files[i];
707           if (file->type == COMB_FILE)
708             {
709               if (file->is_minimal)
710                 apply_file_case_and_advance (file, output, NULL);
711             }
712           else
713             {
714               if (scan_table (file, by))
715                 apply_case (file, output);
716             }
717         }
718       output_case (proc, output, by);
719     }
720   output_buffered_case (proc);
721 }
722
723 /* Executes the UPDATE command. */
724 static void
725 execute_update (struct comb_proc *proc)
726 {
727   union value *by;
728   size_t n_duplicates = 0;
729
730   while (case_matcher_match (proc->matcher, &by))
731     {
732       struct comb_file *first, *file;
733       struct ccase *output;
734
735       /* Find first nonnull case in array and make an output case
736          from it. */
737       output = create_output_case (proc);
738       for (first = &proc->files[0]; ; first++)
739         if (first->is_minimal)
740           break;
741       apply_file_case_and_advance (first, output, by);
742
743       /* Read additional cases and update the output case from
744          them.  (Don't update the output case from any duplicate
745          cases in the master file.) */
746       for (file = first + (first == proc->files);
747            file < &proc->files[proc->n_files]; file++)
748         {
749           while (file->is_minimal)
750             apply_file_case_and_advance (file, output, by);
751         }
752       casewriter_write (proc->output, output);
753
754       /* Write duplicate cases in the master file directly to the
755          output.  */
756       if (first == proc->files && first->is_minimal)
757         {
758           n_duplicates++;
759           while (first->is_minimal)
760             {
761               output = create_output_case (proc);
762               apply_file_case_and_advance (first, output, by);
763               casewriter_write (proc->output, output);
764             }
765         }
766     }
767
768   if (n_duplicates)
769     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
770          n_duplicates);
771 }
772
773 /* Reads FILE, which must be of type COMB_TABLE, until it
774    encounters a case with BY or greater for its BY variables.
775    Returns true if a case with exactly BY for its BY variables
776    was found, otherwise false. */
777 static bool
778 scan_table (struct comb_file *file, union value by[])
779 {
780   while (file->data != NULL)
781     {
782       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
783       if (cmp > 0)
784         {
785           case_unref (file->data);
786           file->data = casereader_read (file->reader);
787         }
788       else
789         return cmp == 0;
790     }
791   return false;
792 }
793
794 /* Creates and returns an output case for PROC, initializing each
795    of its values to system-missing or blanks, except that the
796    values of IN variables are set to 0. */
797 static struct ccase *
798 create_output_case (const struct comb_proc *proc)
799 {
800   size_t n_vars = dict_get_var_cnt (proc->dict);
801   struct ccase *output;
802   size_t i;
803
804   output = case_create (dict_get_proto (proc->dict));
805   for (i = 0; i < n_vars; i++)
806     {
807       struct variable *v = dict_get_var (proc->dict, i);
808       value_set_missing (case_data_rw (output, v), var_get_width (v));
809     }
810   for (i = 0; i < proc->n_files; i++)
811     {
812       struct comb_file *file = &proc->files[i];
813       if (file->in_var != NULL)
814         case_data_rw (output, file->in_var)->f = false;
815     }
816   return output;
817 }
818
819 /* Copies the data from FILE's case into output case OUTPUT.
820    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
821 static void
822 apply_case (const struct comb_file *file, struct ccase *output)
823 {
824   subcase_copy (&file->src, file->data, &file->dst, output);
825   if (file->in_var != NULL)
826     case_data_rw (output, file->in_var)->f = true;
827 }
828
829 /* Like apply_case() above, but also advances FILE to its next
830    case.  Also, if BY is nonnull, then FILE's is_minimal member
831    is updated based on whether the new case's BY values still
832    match those in BY. */
833 static void
834 apply_file_case_and_advance (struct comb_file *file, struct ccase *output,
835                              union value by[])
836 {
837   apply_case (file, output);
838   case_unref (file->data);
839   file->data = casereader_read (file->reader);
840   if (by)
841     file->is_minimal = (file->data != NULL
842                         && subcase_equal_cx (&file->by_vars, file->data, by));
843 }
844
845 /* Writes OUTPUT, whose BY values has been extracted into BY, to
846    PROC's output file, first initializing any FIRST or LAST
847    variables in OUTPUT to the correct values. */
848 static void
849 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
850 {
851   if (proc->first == NULL && proc->last == NULL)
852     casewriter_write (proc->output, output);
853   else
854     {
855       /* It's harder with LAST, because we can't know whether
856          this case is the last in a group until we've prepared
857          the *next* case also.  Thus, we buffer the previous
858          output case until the next one is ready. */
859       bool new_BY;
860       if (proc->prev_BY != NULL)
861         {
862           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
863           if (proc->last != NULL)
864             case_data_rw (proc->buffered_case, proc->last)->f = new_BY;
865           casewriter_write (proc->output, proc->buffered_case);
866         }
867       else
868         new_BY = true;
869
870       proc->buffered_case = output;
871       if (proc->first != NULL)
872         case_data_rw (proc->buffered_case, proc->first)->f = new_BY;
873
874       if (new_BY)
875         {
876           size_t n_values = subcase_get_n_fields (&proc->by_vars);
877           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
878           if (proc->prev_BY == NULL)
879             {
880               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
881               caseproto_init_values (proto, proc->prev_BY);
882             }
883           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
884                           proc->prev_BY, by);
885         }
886     }
887 }
888
889 /* Writes a trailing buffered case to the output, if FIRST or
890    LAST is in use. */
891 static void
892 output_buffered_case (struct comb_proc *proc)
893 {
894   if (proc->prev_BY != NULL)
895     {
896       if (proc->last != NULL)
897         case_data_rw (proc->buffered_case, proc->last)->f = 1.0;
898       casewriter_write (proc->output, proc->buffered_case);
899       proc->buffered_case = NULL;
900     }
901 }