b4f06655ca1ff2de8addf70e915ab051f4a0fa03
[pspp-builds.git] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/any-reader.h"
22 #include "data/case-matcher.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/subcase.h"
30 #include "data/variable.h"
31 #include "language/command.h"
32 #include "language/data-io/file-handle.h"
33 #include "language/data-io/trim.h"
34 #include "language/lexer/lexer.h"
35 #include "language/lexer/variable-parser.h"
36 #include "language/stats/sort-criteria.h"
37 #include "libpspp/assertion.h"
38 #include "libpspp/i18n.h"
39 #include "libpspp/message.h"
40 #include "libpspp/string-array.h"
41 #include "libpspp/taint.h"
42 #include "math/sort.h"
43
44 #include "gl/xalloc.h"
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 enum comb_command_type
50   {
51     COMB_ADD,
52     COMB_MATCH,
53     COMB_UPDATE
54   };
55
56 /* File types. */
57 enum comb_file_type
58   {
59     COMB_FILE,                  /* Specified on FILE= subcommand. */
60     COMB_TABLE                  /* Specified on TABLE= subcommand. */
61   };
62
63 /* One FILE or TABLE subcommand. */
64 struct comb_file
65   {
66     /* Basics. */
67     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
68
69     /* Variables. */
70     struct subcase by_vars;     /* BY variables in this input file. */
71     struct subcase src, dst;    /* Data to copy to output; where to put it. */
72
73     /* Input files. */
74     struct file_handle *handle; /* Input file handle. */
75     struct dictionary *dict;    /* Input file dictionary. */
76     struct casereader *reader;  /* Input data source. */
77     struct ccase *data;         /* The current input case. */
78     bool is_minimal;            /* Does 'data' have minimum BY values across
79                                    all input files? */
80     bool is_sorted;             /* Is file presorted on the BY variables? */
81
82     /* IN subcommand. */
83     char *in_name;
84     struct variable *in_var;
85   };
86
87 struct comb_proc
88   {
89     struct comb_file *files;    /* All the files being merged. */
90     size_t n_files;             /* Number of files. */
91
92     struct dictionary *dict;    /* Dictionary of output file. */
93     struct subcase by_vars;     /* BY variables in the output. */
94     struct casewriter *output;  /* Destination for output. */
95
96     struct case_matcher *matcher;
97
98     /* FIRST, LAST.
99        Only if "first" or "last" is nonnull are the remaining
100        members used. */
101     struct variable *first;     /* Variable specified on FIRST (if any). */
102     struct variable *last;      /* Variable specified on LAST (if any). */
103     struct ccase *buffered_case; /* Case ready for output except that we don't
104                                     know the value for the LAST var yet. */
105     union value *prev_BY;       /* Values of BY vars in buffered_case. */
106   };
107
108 static int combine_files (enum comb_command_type, struct lexer *,
109                           struct dataset *);
110 static void free_comb_proc (struct comb_proc *);
111
112 static void close_all_comb_files (struct comb_proc *);
113 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
114
115 static void execute_update (struct comb_proc *);
116 static void execute_match_files (struct comb_proc *);
117 static void execute_add_files (struct comb_proc *);
118
119 static bool create_flag_var (const char *subcommand_name, const char *var_name,
120                              struct dictionary *, struct variable **);
121 static void output_case (struct comb_proc *, struct ccase *, union value *by);
122 static void output_buffered_case (struct comb_proc *);
123
124 int
125 cmd_add_files (struct lexer *lexer, struct dataset *ds)
126 {
127   return combine_files (COMB_ADD, lexer, ds);
128 }
129
130 int
131 cmd_match_files (struct lexer *lexer, struct dataset *ds)
132 {
133   return combine_files (COMB_MATCH, lexer, ds);
134 }
135
136 int
137 cmd_update (struct lexer *lexer, struct dataset *ds)
138 {
139   return combine_files (COMB_UPDATE, lexer, ds);
140 }
141
142 static int
143 combine_files (enum comb_command_type command,
144                struct lexer *lexer, struct dataset *ds)
145 {
146   struct comb_proc proc;
147
148   bool saw_by = false;
149   bool saw_sort = false;
150   struct casereader *active_file = NULL;
151
152   char *first_name = NULL;
153   char *last_name = NULL;
154
155   struct taint *taint = NULL;
156
157   size_t n_tables = 0;
158   size_t allocated_files = 0;
159
160   size_t i;
161
162   proc.files = NULL;
163   proc.n_files = 0;
164   proc.dict = dict_create (get_default_encoding ());
165   proc.output = NULL;
166   proc.matcher = NULL;
167   subcase_init_empty (&proc.by_vars);
168   proc.first = NULL;
169   proc.last = NULL;
170   proc.buffered_case = NULL;
171   proc.prev_BY = NULL;
172
173   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
174
175   lex_match (lexer, T_SLASH);
176   for (;;)
177     {
178       struct comb_file *file;
179       enum comb_file_type type;
180
181       if (lex_match_id (lexer, "FILE"))
182         type = COMB_FILE;
183       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
184         {
185           type = COMB_TABLE;
186           n_tables++;
187         }
188       else
189         break;
190       lex_match (lexer, T_EQUALS);
191
192       if (proc.n_files >= allocated_files)
193         proc.files = x2nrealloc (proc.files, &allocated_files,
194                                 sizeof *proc.files);
195       file = &proc.files[proc.n_files++];
196       file->type = type;
197       subcase_init_empty (&file->by_vars);
198       subcase_init_empty (&file->src);
199       subcase_init_empty (&file->dst);
200       file->handle = NULL;
201       file->dict = NULL;
202       file->reader = NULL;
203       file->data = NULL;
204       file->is_sorted = true;
205       file->in_name = NULL;
206       file->in_var = NULL;
207
208       if (lex_match (lexer, T_ASTERISK))
209         {
210           if (!dataset_has_source (ds))
211             {
212               msg (SE, _("Cannot specify the active dataset since none "
213                          "has been defined."));
214               goto error;
215             }
216
217           if (proc_make_temporary_transformations_permanent (ds))
218             msg (SE, _("This command may not be used after TEMPORARY when "
219                        "the active dataset is an input source.  "
220                        "Temporary transformations will be made permanent."));
221
222           file->dict = dict_clone (dataset_dict (ds));
223         }
224       else
225         {
226           file->handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
227           if (file->handle == NULL)
228             goto error;
229
230           file->reader = any_reader_open (file->handle, &file->dict);
231           if (file->reader == NULL)
232             goto error;
233         }
234
235       while (lex_match (lexer, T_SLASH))
236         if (lex_match_id (lexer, "RENAME"))
237           {
238             if (!parse_dict_rename (lexer, file->dict))
239               goto error;
240           }
241         else if (lex_match_id (lexer, "IN"))
242           {
243             lex_match (lexer, T_EQUALS);
244             if (lex_token (lexer) != T_ID)
245               {
246                 lex_error (lexer, NULL);
247                 goto error;
248               }
249
250             if (file->in_name)
251               {
252                 msg (SE, _("Multiple IN subcommands for a single FILE or "
253                            "TABLE."));
254                 goto error;
255               }
256             file->in_name = xstrdup (lex_tokcstr (lexer));
257             lex_get (lexer);
258           }
259         else if (lex_match_id (lexer, "SORT"))
260           {
261             file->is_sorted = false;
262             saw_sort = true;
263           }
264
265       merge_dictionary (proc.dict, file);
266     }
267
268   while (lex_token (lexer) != T_ENDCMD)
269     {
270       if (lex_match (lexer, T_BY))
271         {
272           const struct variable **by_vars;
273           size_t i;
274           bool ok;
275
276           if (saw_by)
277             {
278               lex_sbc_only_once ("BY");
279               goto error;
280             }
281           saw_by = true;
282
283           lex_match (lexer, T_EQUALS);
284           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
285                                     &by_vars, NULL))
286             goto error;
287
288           ok = true;
289           for (i = 0; i < proc.n_files; i++)
290             {
291               struct comb_file *file = &proc.files[i];
292               size_t j;
293
294               for (j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
295                 {
296                   const char *name = var_get_name (by_vars[j]);
297                   struct variable *var = dict_lookup_var (file->dict, name);
298                   if (var != NULL)
299                     subcase_add_var (&file->by_vars, var,
300                                      subcase_get_direction (&proc.by_vars, j));
301                   else
302                     {
303                       if (file->handle != NULL)
304                         msg (SE, _("File %s lacks BY variable %s."),
305                              fh_get_name (file->handle), name);
306                       else
307                         msg (SE, _("Active dataset lacks BY variable %s."),
308                              name);
309                       ok = false;
310                     }
311                 }
312               assert (!ok || subcase_conformable (&file->by_vars,
313                                                   &proc.files[0].by_vars));
314             }
315           free (by_vars);
316
317           if (!ok)
318             goto error;
319         }
320       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
321         {
322           if (first_name != NULL)
323             {
324               lex_sbc_only_once ("FIRST");
325               goto error;
326             }
327
328           lex_match (lexer, T_EQUALS);
329           if (!lex_force_id (lexer))
330             goto error;
331           first_name = xstrdup (lex_tokcstr (lexer));
332           lex_get (lexer);
333         }
334       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
335         {
336           if (last_name != NULL)
337             {
338               lex_sbc_only_once ("LAST");
339               goto error;
340             }
341
342           lex_match (lexer, T_EQUALS);
343           if (!lex_force_id (lexer))
344             goto error;
345           last_name = xstrdup (lex_tokcstr (lexer));
346           lex_get (lexer);
347         }
348       else if (lex_match_id (lexer, "MAP"))
349         {
350           /* FIXME. */
351         }
352       else if (lex_match_id (lexer, "DROP"))
353         {
354           if (!parse_dict_drop (lexer, proc.dict))
355             goto error;
356         }
357       else if (lex_match_id (lexer, "KEEP"))
358         {
359           if (!parse_dict_keep (lexer, proc.dict))
360             goto error;
361         }
362       else
363         {
364           lex_error (lexer, NULL);
365           goto error;
366         }
367
368       if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
369         {
370           lex_end_of_command (lexer);
371           goto error;
372         }
373     }
374
375   if (!saw_by)
376     {
377       if (command == COMB_UPDATE)
378         {
379           msg (SE, _("The BY subcommand is required."));
380           goto error;
381         }
382       if (n_tables)
383         {
384           msg (SE, _("BY is required when %s is specified."), "TABLE");
385           goto error;
386         }
387       if (saw_sort)
388         {
389           msg (SE, _("BY is required when %s is specified."), "SORT");
390           goto error;
391         }
392     }
393
394   /* Add IN, FIRST, and LAST variables to master dictionary. */
395   for (i = 0; i < proc.n_files; i++)
396     {
397       struct comb_file *file = &proc.files[i];
398       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
399         goto error;
400     }
401   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
402       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
403     goto error;
404
405   dict_delete_scratch_vars (proc.dict);
406   dict_compact_values (proc.dict);
407
408   /* Set up mapping from each file's variables to master
409      variables. */
410   for (i = 0; i < proc.n_files; i++)
411     {
412       struct comb_file *file = &proc.files[i];
413       size_t src_var_cnt = dict_get_var_cnt (file->dict);
414       size_t j;
415
416       for (j = 0; j < src_var_cnt; j++)
417         {
418           struct variable *src_var = dict_get_var (file->dict, j);
419           struct variable *dst_var = dict_lookup_var (proc.dict,
420                                                       var_get_name (src_var));
421           if (dst_var != NULL)
422             {
423               subcase_add_var (&file->src, src_var, SC_ASCEND);
424               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
425             }
426         }
427     }
428
429   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
430   taint = taint_clone (casewriter_get_taint (proc.output));
431
432   /* Set up case matcher. */
433   proc.matcher = case_matcher_create ();
434   for (i = 0; i < proc.n_files; i++)
435     {
436       struct comb_file *file = &proc.files[i];
437       if (file->reader == NULL)
438         {
439           if (active_file == NULL)
440             {
441               proc_discard_output (ds);
442               file->reader = active_file = proc_open (ds);
443             }
444           else
445             file->reader = casereader_clone (active_file);
446         }
447       if (!file->is_sorted)
448         file->reader = sort_execute (file->reader, &file->by_vars);
449       taint_propagate (casereader_get_taint (file->reader), taint);
450       file->data = casereader_read (file->reader);
451       if (file->type == COMB_FILE)
452         case_matcher_add_input (proc.matcher, &file->by_vars,
453                                 &file->data, &file->is_minimal);
454     }
455
456   if (command == COMB_ADD)
457     execute_add_files (&proc);
458   else if (command == COMB_MATCH)
459     execute_match_files (&proc);
460   else if (command == COMB_UPDATE)
461     execute_update (&proc);
462   else
463     NOT_REACHED ();
464
465   case_matcher_destroy (proc.matcher);
466   proc.matcher = NULL;
467   close_all_comb_files (&proc);
468   if (active_file != NULL)
469     proc_commit (ds);
470
471   dataset_set_dict (ds, proc.dict);
472   dataset_set_source (ds, casewriter_make_reader (proc.output));
473   proc.dict = NULL;
474   proc.output = NULL;
475
476   free_comb_proc (&proc);
477
478   free (first_name);
479   free (last_name);
480
481   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
482
483  error:
484   if (active_file != NULL)
485     proc_commit (ds);
486   free_comb_proc (&proc);
487   taint_destroy (taint);
488   free (first_name);
489   free (last_name);
490   return CMD_CASCADING_FAILURE;
491 }
492
493 /* Merge the dictionary for file F into master dictionary M. */
494 static bool
495 merge_dictionary (struct dictionary *const m, struct comb_file *f)
496 {
497   struct dictionary *d = f->dict;
498   const struct string_array *d_docs, *m_docs;
499   int i;
500
501   if (dict_get_label (m) == NULL)
502     dict_set_label (m, dict_get_label (d));
503
504   d_docs = dict_get_documents (d);
505   m_docs = dict_get_documents (m);
506
507
508   /* FIXME: If the input files have different encodings, then
509      the result is undefined.
510      The correct thing to do would be to convert to an encoding
511      which can cope with all the input files (eg UTF-8).
512    */
513   if ( 0 != strcmp (dict_get_encoding (f->dict), dict_get_encoding (m)))
514     msg (MW, _("Combining files with incompatible encodings. String data may "
515                "not be represented correctly."));
516
517   if (d_docs != NULL)
518     {
519       if (m_docs == NULL)
520         dict_set_documents (m, d_docs);
521       else
522         {
523           struct string_array new_docs;
524           size_t i;
525
526           new_docs.n = m_docs->n + d_docs->n;
527           new_docs.strings = xmalloc (new_docs.n * sizeof *new_docs.strings);
528           for (i = 0; i < m_docs->n; i++)
529             new_docs.strings[i] = m_docs->strings[i];
530           for (i = 0; i < d_docs->n; i++)
531             new_docs.strings[m_docs->n + i] = d_docs->strings[i];
532
533           dict_set_documents (m, &new_docs);
534
535           free (new_docs.strings);
536         }
537     }
538
539   for (i = 0; i < dict_get_var_cnt (d); i++)
540     {
541       struct variable *dv = dict_get_var (d, i);
542       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
543
544       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
545         continue;
546
547       if (mv != NULL)
548         {
549           if (var_get_width (mv) != var_get_width (dv))
550             {
551               const char *var_name = var_get_name (dv);
552               const char *file_name = fh_get_name (f->handle);
553               struct string s = DS_EMPTY_INITIALIZER;
554               ds_put_format (&s,
555                              _("Variable %s in file %s has different "
556                                "type or width from the same variable in "
557                                "earlier file."),
558                              var_name, file_name);
559               ds_put_cstr (&s, "  ");
560               if (var_is_numeric (dv))
561                 ds_put_format (&s, _("In file %s, %s is numeric."),
562                                file_name, var_name);
563               else
564                 ds_put_format (&s, _("In file %s, %s is a string variable "
565                                      "with width %d."),
566                                file_name, var_name, var_get_width (dv));
567               ds_put_cstr (&s, "  ");
568               if (var_is_numeric (mv))
569                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
570                                var_name);
571               else
572                 ds_put_format (&s, _("In an earlier file, %s was a string "
573                                      "variable with width %d."),
574                                var_name, var_get_width (mv));
575               msg (SE, "%s", ds_cstr (&s));
576               ds_destroy (&s);
577               return false;
578             }
579
580           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
581             var_set_value_labels (mv, var_get_value_labels (dv));
582           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
583             var_set_missing_values (mv, var_get_missing_values (dv));
584           if (var_get_label (dv) && !var_get_label (mv))
585             var_set_label (mv, var_get_label (dv), file_encoding, false);
586         }
587       else
588         mv = dict_clone_var_assert (m, dv);
589     }
590
591   return true;
592 }
593
594 /* If VAR_NAME is non-NULL, attempts to create a
595    variable named VAR_NAME, with format F1.0, in DICT, and stores
596    a pointer to the variable in *VAR.  Returns true if
597    successful, false if the variable name is a duplicate (in
598    which case a message saying that the variable specified on the
599    given SUBCOMMAND is a duplicate is emitted).
600
601    Does nothing and returns true if VAR_NAME is null. */
602 static bool
603 create_flag_var (const char *subcommand, const char *var_name,
604                  struct dictionary *dict, struct variable **var)
605 {
606   if (var_name != NULL)
607     {
608       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
609       *var = dict_create_var (dict, var_name, 0);
610       if (*var == NULL)
611         {
612           msg (SE, _("Variable name %s specified on %s subcommand "
613                      "duplicates an existing variable name."),
614                subcommand, var_name);
615           return false;
616         }
617       var_set_both_formats (*var, &format);
618     }
619   else
620     *var = NULL;
621   return true;
622 }
623
624 /* Closes all the files in PROC and frees their associated data. */
625 static void
626 close_all_comb_files (struct comb_proc *proc)
627 {
628   size_t i;
629
630   for (i = 0; i < proc->n_files; i++)
631     {
632       struct comb_file *file = &proc->files[i];
633       subcase_destroy (&file->by_vars);
634       subcase_destroy (&file->src);
635       subcase_destroy (&file->dst);
636       fh_unref (file->handle);
637       dict_destroy (file->dict);
638       casereader_destroy (file->reader);
639       case_unref (file->data);
640       free (file->in_name);
641     }
642   free (proc->files);
643   proc->files = NULL;
644   proc->n_files = 0;
645 }
646
647 /* Frees all the data for the procedure. */
648 static void
649 free_comb_proc (struct comb_proc *proc)
650 {
651   close_all_comb_files (proc);
652   dict_destroy (proc->dict);
653   casewriter_destroy (proc->output);
654   case_matcher_destroy (proc->matcher);
655   if (proc->prev_BY)
656     {
657       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
658                                 proc->prev_BY);
659       free (proc->prev_BY);
660     }
661   subcase_destroy (&proc->by_vars);
662   case_unref (proc->buffered_case);
663 }
664 \f
665 static bool scan_table (struct comb_file *, union value by[]);
666 static struct ccase *create_output_case (const struct comb_proc *);
667 static void apply_case (const struct comb_file *, struct ccase *);
668 static void apply_file_case_and_advance (struct comb_file *, struct ccase *,
669                                          union value by[]);
670 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
671 static void output_buffered_case (struct comb_proc *);
672
673 /* Executes the ADD FILES command. */
674 static void
675 execute_add_files (struct comb_proc *proc)
676 {
677   union value *by;
678
679   while (case_matcher_match (proc->matcher, &by))
680     {
681       size_t i;
682
683       for (i = 0; i < proc->n_files; i++)
684         {
685           struct comb_file *file = &proc->files[i];
686           while (file->is_minimal)
687             {
688               struct ccase *output = create_output_case (proc);
689               apply_file_case_and_advance (file, output, by);
690               output_case (proc, output, by);
691             }
692         }
693     }
694   output_buffered_case (proc);
695 }
696
697 /* Executes the MATCH FILES command. */
698 static void
699 execute_match_files (struct comb_proc *proc)
700 {
701   union value *by;
702
703   while (case_matcher_match (proc->matcher, &by))
704     {
705       struct ccase *output;
706       size_t i;
707
708       output = create_output_case (proc);
709       for (i = proc->n_files; i-- > 0; )
710         {
711           struct comb_file *file = &proc->files[i];
712           if (file->type == COMB_FILE)
713             {
714               if (file->is_minimal)
715                 apply_file_case_and_advance (file, output, NULL);
716             }
717           else
718             {
719               if (scan_table (file, by))
720                 apply_case (file, output);
721             }
722         }
723       output_case (proc, output, by);
724     }
725   output_buffered_case (proc);
726 }
727
728 /* Executes the UPDATE command. */
729 static void
730 execute_update (struct comb_proc *proc)
731 {
732   union value *by;
733   size_t n_duplicates = 0;
734
735   while (case_matcher_match (proc->matcher, &by))
736     {
737       struct comb_file *first, *file;
738       struct ccase *output;
739
740       /* Find first nonnull case in array and make an output case
741          from it. */
742       output = create_output_case (proc);
743       for (first = &proc->files[0]; ; first++)
744         if (first->is_minimal)
745           break;
746       apply_file_case_and_advance (first, output, by);
747
748       /* Read additional cases and update the output case from
749          them.  (Don't update the output case from any duplicate
750          cases in the master file.) */
751       for (file = first + (first == proc->files);
752            file < &proc->files[proc->n_files]; file++)
753         {
754           while (file->is_minimal)
755             apply_file_case_and_advance (file, output, by);
756         }
757       casewriter_write (proc->output, output);
758
759       /* Write duplicate cases in the master file directly to the
760          output.  */
761       if (first == proc->files && first->is_minimal)
762         {
763           n_duplicates++;
764           while (first->is_minimal)
765             {
766               output = create_output_case (proc);
767               apply_file_case_and_advance (first, output, by);
768               casewriter_write (proc->output, output);
769             }
770         }
771     }
772
773   if (n_duplicates)
774     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
775          n_duplicates);
776 }
777
778 /* Reads FILE, which must be of type COMB_TABLE, until it
779    encounters a case with BY or greater for its BY variables.
780    Returns true if a case with exactly BY for its BY variables
781    was found, otherwise false. */
782 static bool
783 scan_table (struct comb_file *file, union value by[])
784 {
785   while (file->data != NULL)
786     {
787       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
788       if (cmp > 0)
789         {
790           case_unref (file->data);
791           file->data = casereader_read (file->reader);
792         }
793       else
794         return cmp == 0;
795     }
796   return false;
797 }
798
799 /* Creates and returns an output case for PROC, initializing each
800    of its values to system-missing or blanks, except that the
801    values of IN variables are set to 0. */
802 static struct ccase *
803 create_output_case (const struct comb_proc *proc)
804 {
805   size_t n_vars = dict_get_var_cnt (proc->dict);
806   struct ccase *output;
807   size_t i;
808
809   output = case_create (dict_get_proto (proc->dict));
810   for (i = 0; i < n_vars; i++)
811     {
812       struct variable *v = dict_get_var (proc->dict, i);
813       value_set_missing (case_data_rw (output, v), var_get_width (v));
814     }
815   for (i = 0; i < proc->n_files; i++)
816     {
817       struct comb_file *file = &proc->files[i];
818       if (file->in_var != NULL)
819         case_data_rw (output, file->in_var)->f = false;
820     }
821   return output;
822 }
823
824 /* Copies the data from FILE's case into output case OUTPUT.
825    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
826 static void
827 apply_case (const struct comb_file *file, struct ccase *output)
828 {
829   subcase_copy (&file->src, file->data, &file->dst, output);
830   if (file->in_var != NULL)
831     case_data_rw (output, file->in_var)->f = true;
832 }
833
834 /* Like apply_case() above, but also advances FILE to its next
835    case.  Also, if BY is nonnull, then FILE's is_minimal member
836    is updated based on whether the new case's BY values still
837    match those in BY. */
838 static void
839 apply_file_case_and_advance (struct comb_file *file, struct ccase *output,
840                              union value by[])
841 {
842   apply_case (file, output);
843   case_unref (file->data);
844   file->data = casereader_read (file->reader);
845   if (by)
846     file->is_minimal = (file->data != NULL
847                         && subcase_equal_cx (&file->by_vars, file->data, by));
848 }
849
850 /* Writes OUTPUT, whose BY values has been extracted into BY, to
851    PROC's output file, first initializing any FIRST or LAST
852    variables in OUTPUT to the correct values. */
853 static void
854 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
855 {
856   if (proc->first == NULL && proc->last == NULL)
857     casewriter_write (proc->output, output);
858   else
859     {
860       /* It's harder with LAST, because we can't know whether
861          this case is the last in a group until we've prepared
862          the *next* case also.  Thus, we buffer the previous
863          output case until the next one is ready. */
864       bool new_BY;
865       if (proc->prev_BY != NULL)
866         {
867           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
868           if (proc->last != NULL)
869             case_data_rw (proc->buffered_case, proc->last)->f = new_BY;
870           casewriter_write (proc->output, proc->buffered_case);
871         }
872       else
873         new_BY = true;
874
875       proc->buffered_case = output;
876       if (proc->first != NULL)
877         case_data_rw (proc->buffered_case, proc->first)->f = new_BY;
878
879       if (new_BY)
880         {
881           size_t n_values = subcase_get_n_fields (&proc->by_vars);
882           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
883           if (proc->prev_BY == NULL)
884             {
885               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
886               caseproto_init_values (proto, proc->prev_BY);
887             }
888           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
889                           proc->prev_BY, by);
890         }
891     }
892 }
893
894 /* Writes a trailing buffered case to the output, if FIRST or
895    LAST is in use. */
896 static void
897 output_buffered_case (struct comb_proc *proc)
898 {
899   if (proc->prev_BY != NULL)
900     {
901       if (proc->last != NULL)
902         case_data_rw (proc->buffered_case, proc->last)->f = 1.0;
903       casewriter_write (proc->output, proc->buffered_case);
904       proc->buffered_case = NULL;
905     }
906 }