combine-files: Break apart apply_file_case_and_advance().
[pspp] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/any-reader.h"
22 #include "data/case-matcher.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/subcase.h"
30 #include "data/variable.h"
31 #include "language/command.h"
32 #include "language/data-io/file-handle.h"
33 #include "language/data-io/trim.h"
34 #include "language/lexer/lexer.h"
35 #include "language/lexer/variable-parser.h"
36 #include "language/stats/sort-criteria.h"
37 #include "libpspp/assertion.h"
38 #include "libpspp/i18n.h"
39 #include "libpspp/message.h"
40 #include "libpspp/string-array.h"
41 #include "libpspp/taint.h"
42 #include "math/sort.h"
43
44 #include "gl/xalloc.h"
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 enum comb_command_type
50   {
51     COMB_ADD,
52     COMB_MATCH,
53     COMB_UPDATE
54   };
55
56 /* File types. */
57 enum comb_file_type
58   {
59     COMB_FILE,                  /* Specified on FILE= subcommand. */
60     COMB_TABLE                  /* Specified on TABLE= subcommand. */
61   };
62
63 /* One FILE or TABLE subcommand. */
64 struct comb_file
65   {
66     /* Basics. */
67     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
68
69     /* Variables. */
70     struct subcase by_vars;     /* BY variables in this input file. */
71     struct subcase src, dst;    /* Data to copy to output; where to put it. */
72
73     /* Input files. */
74     struct file_handle *handle; /* Input file handle. */
75     struct dictionary *dict;    /* Input file dictionary. */
76     struct casereader *reader;  /* Input data source. */
77     struct ccase *data;         /* The current input case. */
78     bool is_minimal;            /* Does 'data' have minimum BY values across
79                                    all input files? */
80     bool is_sorted;             /* Is file presorted on the BY variables? */
81
82     /* IN subcommand. */
83     char *in_name;
84     struct variable *in_var;
85   };
86
87 struct comb_proc
88   {
89     struct comb_file *files;    /* All the files being merged. */
90     size_t n_files;             /* Number of files. */
91
92     struct dictionary *dict;    /* Dictionary of output file. */
93     struct subcase by_vars;     /* BY variables in the output. */
94     struct casewriter *output;  /* Destination for output. */
95
96     struct case_matcher *matcher;
97
98     /* FIRST, LAST.
99        Only if "first" or "last" is nonnull are the remaining
100        members used. */
101     struct variable *first;     /* Variable specified on FIRST (if any). */
102     struct variable *last;      /* Variable specified on LAST (if any). */
103     struct ccase *buffered_case; /* Case ready for output except that we don't
104                                     know the value for the LAST var yet. */
105     union value *prev_BY;       /* Values of BY vars in buffered_case. */
106   };
107
108 static int combine_files (enum comb_command_type, struct lexer *,
109                           struct dataset *);
110 static void free_comb_proc (struct comb_proc *);
111
112 static void close_all_comb_files (struct comb_proc *);
113 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
114
115 static void execute_update (struct comb_proc *);
116 static void execute_match_files (struct comb_proc *);
117 static void execute_add_files (struct comb_proc *);
118
119 static bool create_flag_var (const char *subcommand_name, const char *var_name,
120                              struct dictionary *, struct variable **);
121 static void output_case (struct comb_proc *, struct ccase *, union value *by);
122 static void output_buffered_case (struct comb_proc *);
123
124 int
125 cmd_add_files (struct lexer *lexer, struct dataset *ds)
126 {
127   return combine_files (COMB_ADD, lexer, ds);
128 }
129
130 int
131 cmd_match_files (struct lexer *lexer, struct dataset *ds)
132 {
133   return combine_files (COMB_MATCH, lexer, ds);
134 }
135
136 int
137 cmd_update (struct lexer *lexer, struct dataset *ds)
138 {
139   return combine_files (COMB_UPDATE, lexer, ds);
140 }
141
142 static int
143 combine_files (enum comb_command_type command,
144                struct lexer *lexer, struct dataset *ds)
145 {
146   struct comb_proc proc;
147
148   bool saw_by = false;
149   bool saw_sort = false;
150   struct casereader *active_file = NULL;
151
152   char *first_name = NULL;
153   char *last_name = NULL;
154
155   struct taint *taint = NULL;
156
157   size_t n_tables = 0;
158   size_t allocated_files = 0;
159
160   size_t i;
161
162   proc.files = NULL;
163   proc.n_files = 0;
164   proc.dict = dict_create (get_default_encoding ());
165   proc.output = NULL;
166   proc.matcher = NULL;
167   subcase_init_empty (&proc.by_vars);
168   proc.first = NULL;
169   proc.last = NULL;
170   proc.buffered_case = NULL;
171   proc.prev_BY = NULL;
172
173   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
174
175   lex_match (lexer, T_SLASH);
176   for (;;)
177     {
178       struct comb_file *file;
179       enum comb_file_type type;
180
181       if (lex_match_id (lexer, "FILE"))
182         type = COMB_FILE;
183       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
184         {
185           type = COMB_TABLE;
186           n_tables++;
187         }
188       else
189         break;
190       lex_match (lexer, T_EQUALS);
191
192       if (proc.n_files >= allocated_files)
193         proc.files = x2nrealloc (proc.files, &allocated_files,
194                                 sizeof *proc.files);
195       file = &proc.files[proc.n_files++];
196       file->type = type;
197       subcase_init_empty (&file->by_vars);
198       subcase_init_empty (&file->src);
199       subcase_init_empty (&file->dst);
200       file->handle = NULL;
201       file->dict = NULL;
202       file->reader = NULL;
203       file->data = NULL;
204       file->is_sorted = true;
205       file->in_name = NULL;
206       file->in_var = NULL;
207
208       if (lex_match (lexer, T_ASTERISK))
209         {
210           if (!dataset_has_source (ds))
211             {
212               msg (SE, _("Cannot specify the active dataset since none "
213                          "has been defined."));
214               goto error;
215             }
216
217           if (proc_make_temporary_transformations_permanent (ds))
218             msg (SE, _("This command may not be used after TEMPORARY when "
219                        "the active dataset is an input source.  "
220                        "Temporary transformations will be made permanent."));
221
222           file->dict = dict_clone (dataset_dict (ds));
223         }
224       else
225         {
226           file->handle = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
227           if (file->handle == NULL)
228             goto error;
229
230           file->reader = any_reader_open (file->handle, NULL, &file->dict);
231           if (file->reader == NULL)
232             goto error;
233         }
234
235       while (lex_match (lexer, T_SLASH))
236         if (lex_match_id (lexer, "RENAME"))
237           {
238             if (!parse_dict_rename (lexer, file->dict))
239               goto error;
240           }
241         else if (lex_match_id (lexer, "IN"))
242           {
243             lex_match (lexer, T_EQUALS);
244             if (lex_token (lexer) != T_ID)
245               {
246                 lex_error (lexer, NULL);
247                 goto error;
248               }
249
250             if (file->in_name)
251               {
252                 msg (SE, _("Multiple IN subcommands for a single FILE or "
253                            "TABLE."));
254                 goto error;
255               }
256             file->in_name = xstrdup (lex_tokcstr (lexer));
257             lex_get (lexer);
258           }
259         else if (lex_match_id (lexer, "SORT"))
260           {
261             file->is_sorted = false;
262             saw_sort = true;
263           }
264
265       merge_dictionary (proc.dict, file);
266     }
267
268   while (lex_token (lexer) != T_ENDCMD)
269     {
270       if (lex_match (lexer, T_BY))
271         {
272           const struct variable **by_vars;
273           size_t i;
274           bool ok;
275
276           if (saw_by)
277             {
278               lex_sbc_only_once ("BY");
279               goto error;
280             }
281           saw_by = true;
282
283           lex_match (lexer, T_EQUALS);
284           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
285                                     &by_vars, NULL))
286             goto error;
287
288           ok = true;
289           for (i = 0; i < proc.n_files; i++)
290             {
291               struct comb_file *file = &proc.files[i];
292               size_t j;
293
294               for (j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
295                 {
296                   const char *name = var_get_name (by_vars[j]);
297                   struct variable *var = dict_lookup_var (file->dict, name);
298                   if (var != NULL)
299                     subcase_add_var (&file->by_vars, var,
300                                      subcase_get_direction (&proc.by_vars, j));
301                   else
302                     {
303                       if (file->handle != NULL)
304                         msg (SE, _("File %s lacks BY variable %s."),
305                              fh_get_name (file->handle), name);
306                       else
307                         msg (SE, _("Active dataset lacks BY variable %s."),
308                              name);
309                       ok = false;
310                     }
311                 }
312               assert (!ok || subcase_conformable (&file->by_vars,
313                                                   &proc.files[0].by_vars));
314             }
315           free (by_vars);
316
317           if (!ok)
318             goto error;
319         }
320       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
321         {
322           if (first_name != NULL)
323             {
324               lex_sbc_only_once ("FIRST");
325               goto error;
326             }
327
328           lex_match (lexer, T_EQUALS);
329           if (!lex_force_id (lexer))
330             goto error;
331           first_name = xstrdup (lex_tokcstr (lexer));
332           lex_get (lexer);
333         }
334       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
335         {
336           if (last_name != NULL)
337             {
338               lex_sbc_only_once ("LAST");
339               goto error;
340             }
341
342           lex_match (lexer, T_EQUALS);
343           if (!lex_force_id (lexer))
344             goto error;
345           last_name = xstrdup (lex_tokcstr (lexer));
346           lex_get (lexer);
347         }
348       else if (lex_match_id (lexer, "MAP"))
349         {
350           /* FIXME. */
351         }
352       else if (lex_match_id (lexer, "DROP"))
353         {
354           if (!parse_dict_drop (lexer, proc.dict))
355             goto error;
356         }
357       else if (lex_match_id (lexer, "KEEP"))
358         {
359           if (!parse_dict_keep (lexer, proc.dict))
360             goto error;
361         }
362       else
363         {
364           lex_error (lexer, NULL);
365           goto error;
366         }
367
368       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
369         {
370           lex_end_of_command (lexer);
371           goto error;
372         }
373     }
374
375   if (!saw_by)
376     {
377       if (command == COMB_UPDATE)
378         {
379           lex_sbc_missing ("BY");
380           goto error;
381         }
382       if (n_tables)
383         {
384           msg (SE, _("BY is required when %s is specified."), "TABLE");
385           goto error;
386         }
387       if (saw_sort)
388         {
389           msg (SE, _("BY is required when %s is specified."), "SORT");
390           goto error;
391         }
392     }
393
394   /* Add IN, FIRST, and LAST variables to master dictionary. */
395   for (i = 0; i < proc.n_files; i++)
396     {
397       struct comb_file *file = &proc.files[i];
398       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
399         goto error;
400     }
401   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
402       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
403     goto error;
404
405   dict_delete_scratch_vars (proc.dict);
406   dict_compact_values (proc.dict);
407
408   /* Set up mapping from each file's variables to master
409      variables. */
410   for (i = 0; i < proc.n_files; i++)
411     {
412       struct comb_file *file = &proc.files[i];
413       size_t src_var_cnt = dict_get_var_cnt (file->dict);
414       size_t j;
415
416       for (j = 0; j < src_var_cnt; j++)
417         {
418           struct variable *src_var = dict_get_var (file->dict, j);
419           struct variable *dst_var = dict_lookup_var (proc.dict,
420                                                       var_get_name (src_var));
421           if (dst_var != NULL)
422             {
423               subcase_add_var (&file->src, src_var, SC_ASCEND);
424               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
425             }
426         }
427     }
428
429   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
430   taint = taint_clone (casewriter_get_taint (proc.output));
431
432   /* Set up case matcher. */
433   proc.matcher = case_matcher_create ();
434   for (i = 0; i < proc.n_files; i++)
435     {
436       struct comb_file *file = &proc.files[i];
437       if (file->reader == NULL)
438         {
439           if (active_file == NULL)
440             {
441               proc_discard_output (ds);
442               file->reader = active_file = proc_open (ds);
443             }
444           else
445             file->reader = casereader_clone (active_file);
446         }
447       if (!file->is_sorted)
448         file->reader = sort_execute (file->reader, &file->by_vars);
449       taint_propagate (casereader_get_taint (file->reader), taint);
450       file->data = casereader_read (file->reader);
451       if (file->type == COMB_FILE)
452         case_matcher_add_input (proc.matcher, &file->by_vars,
453                                 &file->data, &file->is_minimal);
454     }
455
456   if (command == COMB_ADD)
457     execute_add_files (&proc);
458   else if (command == COMB_MATCH)
459     execute_match_files (&proc);
460   else if (command == COMB_UPDATE)
461     execute_update (&proc);
462   else
463     NOT_REACHED ();
464
465   case_matcher_destroy (proc.matcher);
466   proc.matcher = NULL;
467   close_all_comb_files (&proc);
468   if (active_file != NULL)
469     proc_commit (ds);
470
471   dataset_set_dict (ds, proc.dict);
472   dataset_set_source (ds, casewriter_make_reader (proc.output));
473   proc.dict = NULL;
474   proc.output = NULL;
475
476   free_comb_proc (&proc);
477
478   free (first_name);
479   free (last_name);
480
481   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
482
483  error:
484   if (active_file != NULL)
485     proc_commit (ds);
486   free_comb_proc (&proc);
487   taint_destroy (taint);
488   free (first_name);
489   free (last_name);
490   return CMD_CASCADING_FAILURE;
491 }
492
493 /* Merge the dictionary for file F into master dictionary M. */
494 static bool
495 merge_dictionary (struct dictionary *const m, struct comb_file *f)
496 {
497   struct dictionary *d = f->dict;
498   const struct string_array *d_docs, *m_docs;
499   int i;
500
501   if (dict_get_label (m) == NULL)
502     dict_set_label (m, dict_get_label (d));
503
504   d_docs = dict_get_documents (d);
505   m_docs = dict_get_documents (m);
506
507
508   /* FIXME: If the input files have different encodings, then
509      the result is undefined.
510      The correct thing to do would be to convert to an encoding
511      which can cope with all the input files (eg UTF-8).
512    */
513   if ( 0 != strcmp (dict_get_encoding (f->dict), dict_get_encoding (m)))
514     msg (MW, _("Combining files with incompatible encodings. String data may "
515                "not be represented correctly."));
516
517   if (d_docs != NULL)
518     {
519       if (m_docs == NULL)
520         dict_set_documents (m, d_docs);
521       else
522         {
523           struct string_array new_docs;
524           size_t i;
525
526           new_docs.n = m_docs->n + d_docs->n;
527           new_docs.strings = xmalloc (new_docs.n * sizeof *new_docs.strings);
528           for (i = 0; i < m_docs->n; i++)
529             new_docs.strings[i] = m_docs->strings[i];
530           for (i = 0; i < d_docs->n; i++)
531             new_docs.strings[m_docs->n + i] = d_docs->strings[i];
532
533           dict_set_documents (m, &new_docs);
534
535           free (new_docs.strings);
536         }
537     }
538
539   for (i = 0; i < dict_get_var_cnt (d); i++)
540     {
541       struct variable *dv = dict_get_var (d, i);
542       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
543
544       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
545         continue;
546
547       if (mv != NULL)
548         {
549           if (var_get_width (mv) != var_get_width (dv))
550             {
551               const char *var_name = var_get_name (dv);
552               const char *file_name = fh_get_name (f->handle);
553               struct string s = DS_EMPTY_INITIALIZER;
554               ds_put_format (&s,
555                              _("Variable %s in file %s has different "
556                                "type or width from the same variable in "
557                                "earlier file."),
558                              var_name, file_name);
559               ds_put_cstr (&s, "  ");
560               if (var_is_numeric (dv))
561                 ds_put_format (&s, _("In file %s, %s is numeric."),
562                                file_name, var_name);
563               else
564                 ds_put_format (&s, _("In file %s, %s is a string variable "
565                                      "with width %d."),
566                                file_name, var_name, var_get_width (dv));
567               ds_put_cstr (&s, "  ");
568               if (var_is_numeric (mv))
569                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
570                                var_name);
571               else
572                 ds_put_format (&s, _("In an earlier file, %s was a string "
573                                      "variable with width %d."),
574                                var_name, var_get_width (mv));
575               msg (SE, "%s", ds_cstr (&s));
576               ds_destroy (&s);
577               return false;
578             }
579
580           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
581             var_set_value_labels (mv, var_get_value_labels (dv));
582           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
583             var_set_missing_values (mv, var_get_missing_values (dv));
584           if (var_get_label (dv) && !var_get_label (mv))
585             var_set_label (mv, var_get_label (dv), false);
586         }
587       else
588         mv = dict_clone_var_assert (m, dv);
589     }
590
591   return true;
592 }
593
594 /* If VAR_NAME is non-NULL, attempts to create a
595    variable named VAR_NAME, with format F1.0, in DICT, and stores
596    a pointer to the variable in *VAR.  Returns true if
597    successful, false if the variable name is a duplicate (in
598    which case a message saying that the variable specified on the
599    given SUBCOMMAND is a duplicate is emitted).
600
601    Does nothing and returns true if VAR_NAME is null. */
602 static bool
603 create_flag_var (const char *subcommand, const char *var_name,
604                  struct dictionary *dict, struct variable **var)
605 {
606   if (var_name != NULL)
607     {
608       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
609       *var = dict_create_var (dict, var_name, 0);
610       if (*var == NULL)
611         {
612           msg (SE, _("Variable name %s specified on %s subcommand "
613                      "duplicates an existing variable name."),
614                subcommand, var_name);
615           return false;
616         }
617       var_set_both_formats (*var, &format);
618     }
619   else
620     *var = NULL;
621   return true;
622 }
623
624 /* Closes all the files in PROC and frees their associated data. */
625 static void
626 close_all_comb_files (struct comb_proc *proc)
627 {
628   size_t i;
629
630   for (i = 0; i < proc->n_files; i++)
631     {
632       struct comb_file *file = &proc->files[i];
633       subcase_destroy (&file->by_vars);
634       subcase_destroy (&file->src);
635       subcase_destroy (&file->dst);
636       fh_unref (file->handle);
637       dict_destroy (file->dict);
638       casereader_destroy (file->reader);
639       case_unref (file->data);
640       free (file->in_name);
641     }
642   free (proc->files);
643   proc->files = NULL;
644   proc->n_files = 0;
645 }
646
647 /* Frees all the data for the procedure. */
648 static void
649 free_comb_proc (struct comb_proc *proc)
650 {
651   close_all_comb_files (proc);
652   dict_destroy (proc->dict);
653   casewriter_destroy (proc->output);
654   case_matcher_destroy (proc->matcher);
655   if (proc->prev_BY)
656     {
657       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
658                                 proc->prev_BY);
659       free (proc->prev_BY);
660     }
661   subcase_destroy (&proc->by_vars);
662   case_unref (proc->buffered_case);
663 }
664 \f
665 static bool scan_table (struct comb_file *, union value by[]);
666 static struct ccase *create_output_case (const struct comb_proc *);
667 static void apply_case (const struct comb_file *, struct ccase *);
668 static void advance_file (struct comb_file *, union value by[]);
669 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
670 static void output_buffered_case (struct comb_proc *);
671
672 /* Executes the ADD FILES command. */
673 static void
674 execute_add_files (struct comb_proc *proc)
675 {
676   union value *by;
677
678   while (case_matcher_match (proc->matcher, &by))
679     {
680       size_t i;
681
682       for (i = 0; i < proc->n_files; i++)
683         {
684           struct comb_file *file = &proc->files[i];
685           while (file->is_minimal)
686             {
687               struct ccase *output = create_output_case (proc);
688               apply_case (file, output);
689               advance_file (file, by);
690               output_case (proc, output, by);
691             }
692         }
693     }
694   output_buffered_case (proc);
695 }
696
697 /* Executes the MATCH FILES command. */
698 static void
699 execute_match_files (struct comb_proc *proc)
700 {
701   union value *by;
702
703   while (case_matcher_match (proc->matcher, &by))
704     {
705       struct ccase *output;
706       size_t i;
707
708       output = create_output_case (proc);
709       for (i = proc->n_files; i-- > 0; )
710         {
711           struct comb_file *file = &proc->files[i];
712           if (file->type == COMB_FILE)
713             {
714               if (file->is_minimal)
715                 {
716                   apply_case (file, output);
717                   advance_file (file, NULL);
718                 }
719             }
720           else
721             {
722               if (scan_table (file, by))
723                 apply_case (file, output);
724             }
725         }
726       output_case (proc, output, by);
727     }
728   output_buffered_case (proc);
729 }
730
731 /* Executes the UPDATE command. */
732 static void
733 execute_update (struct comb_proc *proc)
734 {
735   union value *by;
736   size_t n_duplicates = 0;
737
738   while (case_matcher_match (proc->matcher, &by))
739     {
740       struct comb_file *first, *file;
741       struct ccase *output;
742
743       /* Find first nonnull case in array and make an output case
744          from it. */
745       output = create_output_case (proc);
746       for (first = &proc->files[0]; ; first++)
747         if (first->is_minimal)
748           break;
749       apply_case (first, output);
750       advance_file (first, by);
751
752       /* Read additional cases and update the output case from
753          them.  (Don't update the output case from any duplicate
754          cases in the master file.) */
755       for (file = first + (first == proc->files);
756            file < &proc->files[proc->n_files]; file++)
757         {
758           while (file->is_minimal)
759             {
760               apply_case (file, output);
761               advance_file (file, by);
762             }
763         }
764       casewriter_write (proc->output, output);
765
766       /* Write duplicate cases in the master file directly to the
767          output.  */
768       if (first == proc->files && first->is_minimal)
769         {
770           n_duplicates++;
771           while (first->is_minimal)
772             {
773               output = create_output_case (proc);
774               apply_case (first, output);
775               advance_file (first, by);
776               casewriter_write (proc->output, output);
777             }
778         }
779     }
780
781   if (n_duplicates)
782     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
783          n_duplicates);
784 }
785
786 /* Reads FILE, which must be of type COMB_TABLE, until it
787    encounters a case with BY or greater for its BY variables.
788    Returns true if a case with exactly BY for its BY variables
789    was found, otherwise false. */
790 static bool
791 scan_table (struct comb_file *file, union value by[])
792 {
793   while (file->data != NULL)
794     {
795       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
796       if (cmp > 0)
797         {
798           case_unref (file->data);
799           file->data = casereader_read (file->reader);
800         }
801       else
802         return cmp == 0;
803     }
804   return false;
805 }
806
807 /* Creates and returns an output case for PROC, initializing each
808    of its values to system-missing or blanks, except that the
809    values of IN variables are set to 0. */
810 static struct ccase *
811 create_output_case (const struct comb_proc *proc)
812 {
813   size_t n_vars = dict_get_var_cnt (proc->dict);
814   struct ccase *output;
815   size_t i;
816
817   output = case_create (dict_get_proto (proc->dict));
818   for (i = 0; i < n_vars; i++)
819     {
820       struct variable *v = dict_get_var (proc->dict, i);
821       value_set_missing (case_data_rw (output, v), var_get_width (v));
822     }
823   for (i = 0; i < proc->n_files; i++)
824     {
825       struct comb_file *file = &proc->files[i];
826       if (file->in_var != NULL)
827         case_data_rw (output, file->in_var)->f = false;
828     }
829   return output;
830 }
831
832 /* Copies the data from FILE's case into output case OUTPUT.
833    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
834 static void
835 apply_case (const struct comb_file *file, struct ccase *output)
836 {
837   subcase_copy (&file->src, file->data, &file->dst, output);
838   if (file->in_var != NULL)
839     case_data_rw (output, file->in_var)->f = true;
840 }
841
842 /* Advances FILE to its next case.  If BY is nonnull, then FILE's is_minimal
843    member is updated based on whether the new case's BY values still match
844    those in BY. */
845 static void
846 advance_file (struct comb_file *file, union value by[])
847 {
848   case_unref (file->data);
849   file->data = casereader_read (file->reader);
850   if (by)
851     file->is_minimal = (file->data != NULL
852                         && subcase_equal_cx (&file->by_vars, file->data, by));
853 }
854
855 /* Writes OUTPUT, whose BY values has been extracted into BY, to
856    PROC's output file, first initializing any FIRST or LAST
857    variables in OUTPUT to the correct values. */
858 static void
859 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
860 {
861   if (proc->first == NULL && proc->last == NULL)
862     casewriter_write (proc->output, output);
863   else
864     {
865       /* It's harder with LAST, because we can't know whether
866          this case is the last in a group until we've prepared
867          the *next* case also.  Thus, we buffer the previous
868          output case until the next one is ready. */
869       bool new_BY;
870       if (proc->prev_BY != NULL)
871         {
872           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
873           if (proc->last != NULL)
874             case_data_rw (proc->buffered_case, proc->last)->f = new_BY;
875           casewriter_write (proc->output, proc->buffered_case);
876         }
877       else
878         new_BY = true;
879
880       proc->buffered_case = output;
881       if (proc->first != NULL)
882         case_data_rw (proc->buffered_case, proc->first)->f = new_BY;
883
884       if (new_BY)
885         {
886           size_t n_values = subcase_get_n_fields (&proc->by_vars);
887           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
888           if (proc->prev_BY == NULL)
889             {
890               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
891               caseproto_init_values (proto, proc->prev_BY);
892             }
893           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
894                           proc->prev_BY, by);
895         }
896     }
897 }
898
899 /* Writes a trailing buffered case to the output, if FIRST or
900    LAST is in use. */
901 static void
902 output_buffered_case (struct comb_proc *proc)
903 {
904   if (proc->prev_BY != NULL)
905     {
906       if (proc->last != NULL)
907         case_data_rw (proc->buffered_case, proc->last)->f = 1.0;
908       casewriter_write (proc->output, proc->buffered_case);
909       proc->buffered_case = NULL;
910     }
911 }