1a82ef3f1bf61bfcc6ce1e9a2559237f37fcc37d
[pspp-builds.git] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include <data/any-reader.h>
22 #include <data/case-matcher.h>
23 #include <data/case.h>
24 #include <data/casereader.h>
25 #include <data/casewriter.h>
26 #include <data/dictionary.h>
27 #include <data/format.h>
28 #include <data/procedure.h>
29 #include <data/subcase.h>
30 #include <data/variable.h>
31 #include <language/command.h>
32 #include <language/data-io/file-handle.h>
33 #include <language/data-io/trim.h>
34 #include <language/lexer/lexer.h>
35 #include <language/lexer/variable-parser.h>
36 #include <language/stats/sort-criteria.h>
37 #include <libpspp/assertion.h>
38 #include <libpspp/message.h>
39 #include <libpspp/taint.h>
40 #include <math/sort.h>
41
42 #include "xalloc.h"
43
44 #include "gettext.h"
45 #define _(msgid) gettext (msgid)
46
47 enum comb_command_type
48   {
49     COMB_ADD,
50     COMB_MATCH,
51     COMB_UPDATE
52   };
53
54 /* File types. */
55 enum comb_file_type
56   {
57     COMB_FILE,                  /* Specified on FILE= subcommand. */
58     COMB_TABLE                  /* Specified on TABLE= subcommand. */
59   };
60
61 /* One FILE or TABLE subcommand. */
62 struct comb_file
63   {
64     /* Basics. */
65     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
66
67     /* Variables. */
68     struct subcase by_vars;     /* BY variables in this input file. */
69     struct subcase src, dst;    /* Data to copy to output; where to put it. */
70
71     /* Input files. */
72     struct file_handle *handle; /* Input file handle. */
73     struct dictionary *dict;    /* Input file dictionary. */
74     struct casereader *reader;  /* Input data source. */
75     struct ccase *data;         /* The current input case. */
76     bool is_minimal;            /* Does 'data' have minimum BY values across
77                                    all input files? */
78     bool is_sorted;             /* Is file presorted on the BY variables? */
79
80     /* IN subcommand. */
81     char in_name[VAR_NAME_LEN + 1];
82     struct variable *in_var;
83   };
84
85 struct comb_proc
86   {
87     struct comb_file *files;    /* All the files being merged. */
88     size_t n_files;             /* Number of files. */
89
90     struct dictionary *dict;    /* Dictionary of output file. */
91     struct subcase by_vars;     /* BY variables in the output. */
92     struct casewriter *output;  /* Destination for output. */
93
94     struct case_matcher *matcher;
95
96     /* FIRST, LAST.
97        Only if "first" or "last" is nonnull are the remaining
98        members used. */
99     struct variable *first;     /* Variable specified on FIRST (if any). */
100     struct variable *last;      /* Variable specified on LAST (if any). */
101     struct ccase *buffered_case; /* Case ready for output except that we don't
102                                     know the value for the LAST var yet. */
103     union value *prev_BY;       /* Values of BY vars in buffered_case. */
104   };
105
106 static int combine_files (enum comb_command_type, struct lexer *,
107                           struct dataset *);
108 static void free_comb_proc (struct comb_proc *);
109
110 static void close_all_comb_files (struct comb_proc *);
111 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
112
113 static void execute_update (struct comb_proc *);
114 static void execute_match_files (struct comb_proc *);
115 static void execute_add_files (struct comb_proc *);
116
117 static bool create_flag_var (const char *subcommand_name, const char *var_name,
118                              struct dictionary *, struct variable **);
119 static void output_case (struct comb_proc *, struct ccase *, union value *by);
120 static void output_buffered_case (struct comb_proc *);
121
122 int
123 cmd_add_files (struct lexer *lexer, struct dataset *ds)
124 {
125   return combine_files (COMB_ADD, lexer, ds);
126 }
127
128 int
129 cmd_match_files (struct lexer *lexer, struct dataset *ds)
130 {
131   return combine_files (COMB_MATCH, lexer, ds);
132 }
133
134 int
135 cmd_update (struct lexer *lexer, struct dataset *ds)
136 {
137   return combine_files (COMB_UPDATE, lexer, ds);
138 }
139
140 static int
141 combine_files (enum comb_command_type command,
142                struct lexer *lexer, struct dataset *ds)
143 {
144   struct comb_proc proc;
145
146   bool saw_by = false;
147   bool saw_sort = false;
148   struct casereader *active_file = NULL;
149
150   char first_name[VAR_NAME_LEN + 1] = "";
151   char last_name[VAR_NAME_LEN + 1] = "";
152
153   struct taint *taint = NULL;
154
155   size_t n_tables = 0;
156   size_t allocated_files = 0;
157
158   size_t i;
159
160   proc.files = NULL;
161   proc.n_files = 0;
162   proc.dict = dict_create ();
163   proc.output = NULL;
164   proc.matcher = NULL;
165   subcase_init_empty (&proc.by_vars);
166   proc.first = NULL;
167   proc.last = NULL;
168   proc.buffered_case = NULL;
169   proc.prev_BY = NULL;
170
171   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
172
173   lex_match (lexer, '/');
174   for (;;)
175     {
176       struct comb_file *file;
177       enum comb_file_type type;
178
179       if (lex_match_id (lexer, "FILE"))
180         type = COMB_FILE;
181       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
182         {
183           type = COMB_TABLE;
184           n_tables++;
185         }
186       else
187         break;
188       lex_match (lexer, '=');
189
190       if (proc.n_files >= allocated_files)
191         proc.files = x2nrealloc (proc.files, &allocated_files,
192                                 sizeof *proc.files);
193       file = &proc.files[proc.n_files++];
194       file->type = type;
195       subcase_init_empty (&file->by_vars);
196       subcase_init_empty (&file->src);
197       subcase_init_empty (&file->dst);
198       file->handle = NULL;
199       file->dict = NULL;
200       file->reader = NULL;
201       file->data = NULL;
202       file->is_sorted = true;
203       file->in_name[0] = '\0';
204       file->in_var = NULL;
205
206       if (lex_match (lexer, '*'))
207         {
208           if (!proc_has_active_file (ds))
209             {
210               msg (SE, _("Cannot specify the active file since no active "
211                          "file has been defined."));
212               goto error;
213             }
214
215           if (proc_make_temporary_transformations_permanent (ds))
216             msg (SE, _("This command may not be used after TEMPORARY when "
217                        "the active file is an input source.  "
218                        "Temporary transformations will be made permanent."));
219
220           file->dict = dict_clone (dataset_dict (ds));
221         }
222       else
223         {
224           file->handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
225           if (file->handle == NULL)
226             goto error;
227
228           file->reader = any_reader_open (file->handle, &file->dict);
229           if (file->reader == NULL)
230             goto error;
231         }
232
233       while (lex_match (lexer, '/'))
234         if (lex_match_id (lexer, "RENAME"))
235           {
236             if (!parse_dict_rename (lexer, file->dict))
237               goto error;
238           }
239         else if (lex_match_id (lexer, "IN"))
240           {
241             lex_match (lexer, '=');
242             if (lex_token (lexer) != T_ID)
243               {
244                 lex_error (lexer, NULL);
245                 goto error;
246               }
247
248             if (file->in_name[0])
249               {
250                 msg (SE, _("Multiple IN subcommands for a single FILE or "
251                            "TABLE."));
252                 goto error;
253               }
254             strcpy (file->in_name, lex_tokid (lexer));
255             lex_get (lexer);
256           }
257         else if (lex_match_id (lexer, "SORT"))
258           {
259             file->is_sorted = false;
260             saw_sort = true;
261           }
262
263       merge_dictionary (proc.dict, file);
264     }
265
266   while (lex_token (lexer) != '.')
267     {
268       if (lex_match (lexer, T_BY))
269         {
270           const struct variable **by_vars;
271           size_t i;
272           bool ok;
273
274           if (saw_by)
275             {
276               lex_sbc_only_once ("BY");
277               goto error;
278             }
279           saw_by = true;
280
281           lex_match (lexer, '=');
282           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
283                                     &by_vars, NULL))
284             goto error;
285
286           ok = true;
287           for (i = 0; i < proc.n_files; i++)
288             {
289               struct comb_file *file = &proc.files[i];
290               size_t j;
291
292               for (j = 0; j < subcase_get_n_values (&proc.by_vars); j++)
293                 {
294                   const char *name = var_get_name (by_vars[j]);
295                   struct variable *var = dict_lookup_var (file->dict, name);
296                   if (var != NULL)
297                     subcase_add_var (&file->by_vars, var,
298                                      subcase_get_direction (&proc.by_vars, j));
299                   else
300                     {
301                       if (file->handle != NULL)
302                         msg (SE, _("File %s lacks BY variable %s."),
303                              fh_get_name (file->handle), name);
304                       else
305                         msg (SE, _("Active file lacks BY variable %s."), name);
306                       ok = false;
307                     }
308                 }
309               assert (!ok || subcase_conformable (&file->by_vars,
310                                                   &proc.files[0].by_vars));
311             }
312           free (by_vars);
313
314           if (!ok)
315             goto error;
316         }
317       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
318         {
319           if (first_name[0] != '\0')
320             {
321               lex_sbc_only_once ("FIRST");
322               goto error;
323             }
324
325           lex_match (lexer, '=');
326           if (!lex_force_id (lexer))
327             goto error;
328           strcpy (first_name, lex_tokid (lexer));
329           lex_get (lexer);
330         }
331       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
332         {
333           if (last_name[0] != '\0')
334             {
335               lex_sbc_only_once ("LAST");
336               goto error;
337             }
338
339           lex_match (lexer, '=');
340           if (!lex_force_id (lexer))
341             goto error;
342           strcpy (last_name, lex_tokid (lexer));
343           lex_get (lexer);
344         }
345       else if (lex_match_id (lexer, "MAP"))
346         {
347           /* FIXME. */
348         }
349       else if (lex_match_id (lexer, "DROP"))
350         {
351           if (!parse_dict_drop (lexer, proc.dict))
352             goto error;
353         }
354       else if (lex_match_id (lexer, "KEEP"))
355         {
356           if (!parse_dict_keep (lexer, proc.dict))
357             goto error;
358         }
359       else
360         {
361           lex_error (lexer, NULL);
362           goto error;
363         }
364
365       if (!lex_match (lexer, '/') && lex_token (lexer) != '.')
366         {
367           lex_end_of_command (lexer);
368           goto error;
369         }
370     }
371
372   if (!saw_by)
373     {
374       if (command == COMB_UPDATE)
375         {
376           msg (SE, _("The BY subcommand is required."));
377           goto error;
378         }
379       if (n_tables)
380         {
381           msg (SE, _("BY is required when TABLE is specified."));
382           goto error;
383         }
384       if (saw_sort)
385         {
386           msg (SE, _("BY is required when SORT is specified."));
387           goto error;
388         }
389     }
390
391   /* Add IN, FIRST, and LAST variables to master dictionary. */
392   for (i = 0; i < proc.n_files; i++)
393     {
394       struct comb_file *file = &proc.files[i];
395       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
396         goto error;
397     }
398   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
399       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
400     goto error;
401
402   dict_delete_scratch_vars (proc.dict);
403   dict_compact_values (proc.dict);
404
405   /* Set up mapping from each file's variables to master
406      variables. */
407   for (i = 0; i < proc.n_files; i++)
408     {
409       struct comb_file *file = &proc.files[i];
410       size_t src_var_cnt = dict_get_var_cnt (file->dict);
411       size_t j;
412
413       for (j = 0; j < src_var_cnt; j++)
414         {
415           struct variable *src_var = dict_get_var (file->dict, j);
416           struct variable *dst_var = dict_lookup_var (proc.dict,
417                                                       var_get_name (src_var));
418           if (dst_var != NULL)
419             {
420               subcase_add_var (&file->src, src_var, SC_ASCEND);
421               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
422             }
423         }
424     }
425
426   proc.output = autopaging_writer_create (dict_get_next_value_idx (proc.dict));
427   taint = taint_clone (casewriter_get_taint (proc.output));
428
429   /* Set up case matcher. */
430   proc.matcher = case_matcher_create ();
431   for (i = 0; i < proc.n_files; i++)
432     {
433       struct comb_file *file = &proc.files[i];
434       if (file->reader == NULL)
435         {
436           if (active_file == NULL)
437             {
438               proc_discard_output (ds);
439               file->reader = active_file = proc_open (ds);
440             }
441           else
442             file->reader = casereader_clone (active_file);
443         }
444       if (!file->is_sorted)
445         file->reader = sort_execute (file->reader, &file->by_vars);
446       taint_propagate (casereader_get_taint (file->reader), taint);
447       file->data = casereader_read (file->reader);
448       if (file->type == COMB_FILE)
449         case_matcher_add_input (proc.matcher, &file->by_vars,
450                                 &file->data, &file->is_minimal);
451     }
452
453   if (command == COMB_ADD)
454     execute_add_files (&proc);
455   else if (command == COMB_MATCH)
456     execute_match_files (&proc);
457   else if (command == COMB_UPDATE)
458     execute_update (&proc);
459   else
460     NOT_REACHED ();
461
462   case_matcher_destroy (proc.matcher);
463   proc.matcher = NULL;
464   close_all_comb_files (&proc);
465   if (active_file != NULL)
466     proc_commit (ds);
467
468   proc_set_active_file (ds, casewriter_make_reader (proc.output), proc.dict);
469   proc.dict = NULL;
470   proc.output = NULL;
471
472   free_comb_proc (&proc);
473
474   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
475
476  error:
477   if (active_file != NULL)
478     proc_commit (ds);
479   free_comb_proc (&proc);
480   taint_destroy (taint);
481   return CMD_CASCADING_FAILURE;
482 }
483
484 /* Merge the dictionary for file F into master dictionary M. */
485 static bool
486 merge_dictionary (struct dictionary *const m, struct comb_file *f)
487 {
488   struct dictionary *d = f->dict;
489   const char *d_docs, *m_docs;
490   int i;
491   const char *file_encoding;
492
493   if (dict_get_label (m) == NULL)
494     dict_set_label (m, dict_get_label (d));
495
496   d_docs = dict_get_documents (d);
497   m_docs = dict_get_documents (m);
498
499
500   /* If the input files have different encodings, then
501    */
502   file_encoding = dict_get_encoding (f->dict);
503   if ( file_encoding != NULL)
504     {
505       if ( dict_get_encoding (m) == NULL)
506         dict_set_encoding (m, file_encoding);
507       else if ( 0 != strcmp (file_encoding, dict_get_encoding (m)))
508         {
509           msg (MW,
510                _("Combining files with incompatible encodings. String data may not be represented correctly."));
511         }
512     }
513
514   if (d_docs != NULL)
515     {
516       if (m_docs == NULL)
517         dict_set_documents (m, d_docs);
518       else
519         {
520           char *new_docs = xasprintf ("%s%s", m_docs, d_docs);
521           dict_set_documents (m, new_docs);
522           free (new_docs);
523         }
524     }
525
526   for (i = 0; i < dict_get_var_cnt (d); i++)
527     {
528       struct variable *dv = dict_get_var (d, i);
529       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
530
531       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
532         continue;
533
534       if (mv != NULL)
535         {
536           if (var_get_width (mv) != var_get_width (dv))
537             {
538               const char *var_name = var_get_name (dv);
539               const char *file_name = fh_get_name (f->handle);
540               struct string s = DS_EMPTY_INITIALIZER;
541               ds_put_format (&s,
542                              _("Variable %s in file %s has different "
543                                "type or width from the same variable in "
544                                "earlier file."),
545                              var_name, file_name);
546               ds_put_cstr (&s, "  ");
547               if (var_is_numeric (dv))
548                 ds_put_format (&s, _("In file %s, %s is numeric."),
549                                file_name, var_name);
550               else
551                 ds_put_format (&s, _("In file %s, %s is a string variable "
552                                      "with width %d."),
553                                file_name, var_name, var_get_width (dv));
554               ds_put_cstr (&s, "  ");
555               if (var_is_numeric (mv))
556                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
557                                var_name);
558               else
559                 ds_put_format (&s, _("In an earlier file, %s was a string "
560                                      "variable with width %d."),
561                                var_name, var_get_width (mv));
562               msg (SE, ds_cstr (&s));
563               ds_destroy (&s);
564               return false;
565             }
566
567           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
568             var_set_value_labels (mv, var_get_value_labels (dv));
569           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
570             var_set_missing_values (mv, var_get_missing_values (dv));
571           if (var_get_label (dv) && !var_get_label (mv))
572             var_set_label (mv, var_get_label (dv));
573         }
574       else
575         mv = dict_clone_var_assert (m, dv, var_get_name (dv));
576     }
577
578   return true;
579 }
580
581 /* If VAR_NAME is a non-empty string, attempts to create a
582    variable named VAR_NAME, with format F1.0, in DICT, and stores
583    a pointer to the variable in *VAR.  Returns true if
584    successful, false if the variable name is a duplicate (in
585    which case a message saying that the variable specified on the
586    given SUBCOMMAND is a duplicate is emitted).  Also returns
587    true, without doing anything, if VAR_NAME is null or empty. */
588 static bool
589 create_flag_var (const char *subcommand, const char *var_name,
590                  struct dictionary *dict, struct variable **var)
591 {
592   if (var_name[0] != '\0')
593     {
594       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
595       *var = dict_create_var (dict, var_name, 0);
596       if (*var == NULL)
597         {
598           msg (SE, _("Variable name %s specified on %s subcommand "
599                      "duplicates an existing variable name."),
600                subcommand, var_name);
601           return false;
602         }
603       var_set_both_formats (*var, &format);
604     }
605   else
606     *var = NULL;
607   return true;
608 }
609
610 /* Closes all the files in PROC and frees their associated data. */
611 static void
612 close_all_comb_files (struct comb_proc *proc)
613 {
614   size_t i;
615
616   for (i = 0; i < proc->n_files; i++)
617     {
618       struct comb_file *file = &proc->files[i];
619       subcase_destroy (&file->by_vars);
620       subcase_destroy (&file->src);
621       subcase_destroy (&file->dst);
622       fh_unref (file->handle);
623       dict_destroy (file->dict);
624       casereader_destroy (file->reader);
625       case_unref (file->data);
626     }
627   free (proc->files);
628   proc->files = NULL;
629   proc->n_files = 0;
630 }
631
632 /* Frees all the data for the procedure. */
633 static void
634 free_comb_proc (struct comb_proc *proc)
635 {
636   close_all_comb_files (proc);
637   dict_destroy (proc->dict);
638   casewriter_destroy (proc->output);
639   case_matcher_destroy (proc->matcher);
640   subcase_destroy (&proc->by_vars);
641   case_unref (proc->buffered_case);
642   free (proc->prev_BY);
643 }
644 \f
645 static bool scan_table (struct comb_file *, union value by[]);
646 static struct ccase *create_output_case (const struct comb_proc *);
647 static void apply_case (const struct comb_file *, struct ccase *);
648 static void apply_file_case_and_advance (struct comb_file *, struct ccase *,
649                                          union value by[]);
650 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
651 static void output_buffered_case (struct comb_proc *);
652
653 /* Executes the ADD FILES command. */
654 static void
655 execute_add_files (struct comb_proc *proc)
656 {
657   union value *by;
658
659   while (case_matcher_match (proc->matcher, &by))
660     {
661       size_t i;
662
663       for (i = 0; i < proc->n_files; i++)
664         {
665           struct comb_file *file = &proc->files[i];
666           while (file->is_minimal)
667             {
668               struct ccase *output = create_output_case (proc);
669               apply_file_case_and_advance (file, output, by);
670               output_case (proc, output, by);
671             }
672         }
673     }
674   output_buffered_case (proc);
675 }
676
677 /* Executes the MATCH FILES command. */
678 static void
679 execute_match_files (struct comb_proc *proc)
680 {
681   union value *by;
682
683   while (case_matcher_match (proc->matcher, &by))
684     {
685       struct ccase *output;
686       size_t i;
687
688       output = create_output_case (proc);
689       for (i = proc->n_files; i-- > 0; )
690         {
691           struct comb_file *file = &proc->files[i];
692           if (file->type == COMB_FILE)
693             {
694               if (file->is_minimal)
695                 apply_file_case_and_advance (file, output, NULL);
696             }
697           else
698             {
699               if (scan_table (file, by))
700                 apply_case (file, output);
701             }
702         }
703       output_case (proc, output, by);
704     }
705   output_buffered_case (proc);
706 }
707
708 /* Executes the UPDATE command. */
709 static void
710 execute_update (struct comb_proc *proc)
711 {
712   union value *by;
713   size_t n_duplicates = 0;
714
715   while (case_matcher_match (proc->matcher, &by))
716     {
717       struct comb_file *first, *file;
718       struct ccase *output;
719
720       /* Find first nonnull case in array and make an output case
721          from it. */
722       output = create_output_case (proc);
723       for (first = &proc->files[0]; ; first++)
724         if (first->is_minimal)
725           break;
726       apply_file_case_and_advance (first, output, by);
727
728       /* Read additional cases and update the output case from
729          them.  (Don't update the output case from any duplicate
730          cases in the master file.) */
731       for (file = first + (first == proc->files);
732            file < &proc->files[proc->n_files]; file++)
733         {
734           while (file->is_minimal)
735             apply_file_case_and_advance (file, output, by);
736         }
737       casewriter_write (proc->output, output);
738
739       /* Write duplicate cases in the master file directly to the
740          output.  */
741       if (first == proc->files && first->is_minimal)
742         {
743           n_duplicates++;
744           while (first->is_minimal)
745             {
746               output = create_output_case (proc);
747               apply_file_case_and_advance (first, output, by);
748               casewriter_write (proc->output, output);
749             }
750         }
751     }
752
753   if (n_duplicates)
754     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
755          n_duplicates);
756 }
757
758 /* Reads FILE, which must be of type COMB_TABLE, until it
759    encounters a case with BY or greater for its BY variables.
760    Returns true if a case with exactly BY for its BY variables
761    was found, otherwise false. */
762 static bool
763 scan_table (struct comb_file *file, union value by[])
764 {
765   while (file->data != NULL)
766     {
767       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
768       if (cmp > 0)
769         {
770           case_unref (file->data);
771           file->data = casereader_read (file->reader);
772         }
773       else
774         return cmp == 0;
775     }
776   return false;
777 }
778
779 /* Creates and returns an output case for PROC, initializing each
780    of its values to system-missing or blanks, except that the
781    values of IN variables are set to 0. */
782 static struct ccase *
783 create_output_case (const struct comb_proc *proc)
784 {
785   size_t n_vars = dict_get_var_cnt (proc->dict);
786   struct ccase *output;
787   size_t i;
788
789   output = case_create (dict_get_next_value_idx (proc->dict));
790   for (i = 0; i < n_vars; i++)
791     {
792       struct variable *v = dict_get_var (proc->dict, i);
793       value_set_missing (case_data_rw (output, v), var_get_width (v));
794     }
795   for (i = 0; i < proc->n_files; i++)
796     {
797       struct comb_file *file = &proc->files[i];
798       if (file->in_var != NULL)
799         case_data_rw (output, file->in_var)->f = false;
800     }
801   return output;
802 }
803
804 /* Copies the data from FILE's case into output case OUTPUT.
805    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
806 static void
807 apply_case (const struct comb_file *file, struct ccase *output)
808 {
809   subcase_copy (&file->src, file->data, &file->dst, output);
810   if (file->in_var != NULL)
811     case_data_rw (output, file->in_var)->f = true;
812 }
813
814 /* Like apply_case() above, but also advances FILE to its next
815    case.  Also, if BY is nonnull, then FILE's is_minimal member
816    is updated based on whether the new case's BY values still
817    match those in BY. */
818 static void
819 apply_file_case_and_advance (struct comb_file *file, struct ccase *output,
820                              union value by[])
821 {
822   apply_case (file, output);
823   case_unref (file->data);
824   file->data = casereader_read (file->reader);
825   if (by)
826     file->is_minimal = (file->data != NULL
827                         && subcase_equal_cx (&file->by_vars, file->data, by));
828 }
829
830 /* Writes OUTPUT, whose BY values has been extracted into BY, to
831    PROC's output file, first initializing any FIRST or LAST
832    variables in OUTPUT to the correct values. */
833 static void
834 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
835 {
836   if (proc->first == NULL && proc->last == NULL)
837     casewriter_write (proc->output, output);
838   else
839     {
840       /* It's harder with LAST, because we can't know whether
841          this case is the last in a group until we've prepared
842          the *next* case also.  Thus, we buffer the previous
843          output case until the next one is ready. */
844       bool new_BY;
845       if (proc->prev_BY != NULL)
846         {
847           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
848           if (proc->last != NULL)
849             case_data_rw (proc->buffered_case, proc->last)->f = new_BY;
850           casewriter_write (proc->output, proc->buffered_case);
851         }
852       else
853         new_BY = true;
854
855       proc->buffered_case = output;
856       if (proc->first != NULL)
857         case_data_rw (proc->buffered_case, proc->first)->f = new_BY;
858
859       if (new_BY)
860         {
861           size_t n = (subcase_get_n_values (&proc->by_vars)
862                       * sizeof (union value));
863           if (proc->prev_BY == NULL)
864             proc->prev_BY = xmalloc (n);
865           memcpy (proc->prev_BY, by, n);
866         }
867     }
868 }
869
870 /* Writes a trailing buffered case to the output, if FIRST or
871    LAST is in use. */
872 static void
873 output_buffered_case (struct comb_proc *proc)
874 {
875   if (proc->prev_BY != NULL)
876     {
877       if (proc->last != NULL)
878         case_data_rw (proc->buffered_case, proc->last)->f = 1.0;
879       casewriter_write (proc->output, proc->buffered_case);
880       proc->buffered_case = NULL;
881     }
882 }