39ecedd133eec1c7aa54475df35ee2b5efb3cde1
[pspp-builds.git] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include <data/any-reader.h>
22 #include <data/case-matcher.h>
23 #include <data/case.h>
24 #include <data/casereader.h>
25 #include <data/casewriter.h>
26 #include <data/dictionary.h>
27 #include <data/format.h>
28 #include <data/procedure.h>
29 #include <data/subcase.h>
30 #include <data/variable.h>
31 #include <language/command.h>
32 #include <language/data-io/file-handle.h>
33 #include <language/data-io/trim.h>
34 #include <language/lexer/lexer.h>
35 #include <language/lexer/variable-parser.h>
36 #include <language/stats/sort-criteria.h>
37 #include <libpspp/assertion.h>
38 #include <libpspp/message.h>
39 #include <libpspp/taint.h>
40 #include <math/sort.h>
41
42 #include "xalloc.h"
43
44 #include "gettext.h"
45 #define _(msgid) gettext (msgid)
46
47 enum comb_command_type
48   {
49     COMB_ADD,
50     COMB_MATCH,
51     COMB_UPDATE
52   };
53
54 /* File types. */
55 enum comb_file_type
56   {
57     COMB_FILE,                  /* Specified on FILE= subcommand. */
58     COMB_TABLE                  /* Specified on TABLE= subcommand. */
59   };
60
61 /* One FILE or TABLE subcommand. */
62 struct comb_file
63   {
64     /* Basics. */
65     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
66
67     /* Variables. */
68     struct subcase by_vars;     /* BY variables in this input file. */
69     struct subcase src, dst;    /* Data to copy to output; where to put it. */
70
71     /* Input files. */
72     struct file_handle *handle; /* Input file handle. */
73     struct dictionary *dict;    /* Input file dictionary. */
74     struct casereader *reader;  /* Input data source. */
75     struct ccase *data;         /* The current input case. */
76     bool is_minimal;            /* Does 'data' have minimum BY values across
77                                    all input files? */
78     bool is_sorted;             /* Is file presorted on the BY variables? */
79
80     /* IN subcommand. */
81     char in_name[VAR_NAME_LEN + 1];
82     struct variable *in_var;
83   };
84
85 struct comb_proc
86   {
87     struct comb_file *files;    /* All the files being merged. */
88     size_t n_files;             /* Number of files. */
89
90     struct dictionary *dict;    /* Dictionary of output file. */
91     struct subcase by_vars;     /* BY variables in the output. */
92     struct casewriter *output;  /* Destination for output. */
93
94     struct case_matcher *matcher;
95
96     /* FIRST, LAST.
97        Only if "first" or "last" is nonnull are the remaining
98        members used. */
99     struct variable *first;     /* Variable specified on FIRST (if any). */
100     struct variable *last;      /* Variable specified on LAST (if any). */
101     struct ccase *buffered_case; /* Case ready for output except that we don't
102                                     know the value for the LAST var yet. */
103     union value *prev_BY;       /* Values of BY vars in buffered_case. */
104   };
105
106 static int combine_files (enum comb_command_type, struct lexer *,
107                           struct dataset *);
108 static void free_comb_proc (struct comb_proc *);
109
110 static void close_all_comb_files (struct comb_proc *);
111 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
112
113 static void execute_update (struct comb_proc *);
114 static void execute_match_files (struct comb_proc *);
115 static void execute_add_files (struct comb_proc *);
116
117 static bool create_flag_var (const char *subcommand_name, const char *var_name,
118                              struct dictionary *, struct variable **);
119 static void output_case (struct comb_proc *, struct ccase *, union value *by);
120 static void output_buffered_case (struct comb_proc *);
121
122 int
123 cmd_add_files (struct lexer *lexer, struct dataset *ds)
124 {
125   return combine_files (COMB_ADD, lexer, ds);
126 }
127
128 int
129 cmd_match_files (struct lexer *lexer, struct dataset *ds)
130 {
131   return combine_files (COMB_MATCH, lexer, ds);
132 }
133
134 int
135 cmd_update (struct lexer *lexer, struct dataset *ds)
136 {
137   return combine_files (COMB_UPDATE, lexer, ds);
138 }
139
140 static int
141 combine_files (enum comb_command_type command,
142                struct lexer *lexer, struct dataset *ds)
143 {
144   struct comb_proc proc;
145
146   bool saw_by = false;
147   bool saw_sort = false;
148   struct casereader *active_file = NULL;
149
150   char first_name[VAR_NAME_LEN + 1] = "";
151   char last_name[VAR_NAME_LEN + 1] = "";
152
153   struct taint *taint = NULL;
154
155   size_t n_tables = 0;
156   size_t allocated_files = 0;
157
158   size_t i;
159
160   proc.files = NULL;
161   proc.n_files = 0;
162   proc.dict = dict_create ();
163   proc.output = NULL;
164   proc.matcher = NULL;
165   subcase_init_empty (&proc.by_vars);
166   proc.first = NULL;
167   proc.last = NULL;
168   proc.buffered_case = NULL;
169   proc.prev_BY = NULL;
170
171   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
172
173   lex_match (lexer, '/');
174   for (;;)
175     {
176       struct comb_file *file;
177       enum comb_file_type type;
178
179       if (lex_match_id (lexer, "FILE"))
180         type = COMB_FILE;
181       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
182         {
183           type = COMB_TABLE;
184           n_tables++;
185         }
186       else
187         break;
188       lex_match (lexer, '=');
189
190       if (proc.n_files >= allocated_files)
191         proc.files = x2nrealloc (proc.files, &allocated_files,
192                                 sizeof *proc.files);
193       file = &proc.files[proc.n_files++];
194       file->type = type;
195       subcase_init_empty (&file->by_vars);
196       subcase_init_empty (&file->src);
197       subcase_init_empty (&file->dst);
198       file->handle = NULL;
199       file->dict = NULL;
200       file->reader = NULL;
201       file->data = NULL;
202       file->is_sorted = true;
203       file->in_name[0] = '\0';
204       file->in_var = NULL;
205
206       if (lex_match (lexer, '*'))
207         {
208           if (!proc_has_active_file (ds))
209             {
210               msg (SE, _("Cannot specify the active file since no active "
211                          "file has been defined."));
212               goto error;
213             }
214
215           if (proc_make_temporary_transformations_permanent (ds))
216             msg (SE, _("This command may not be used after TEMPORARY when "
217                        "the active file is an input source.  "
218                        "Temporary transformations will be made permanent."));
219
220           file->dict = dict_clone (dataset_dict (ds));
221         }
222       else
223         {
224           file->handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
225           if (file->handle == NULL)
226             goto error;
227
228           file->reader = any_reader_open (file->handle, &file->dict);
229           if (file->reader == NULL)
230             goto error;
231         }
232
233       while (lex_match (lexer, '/'))
234         if (lex_match_id (lexer, "RENAME"))
235           {
236             if (!parse_dict_rename (lexer, file->dict))
237               goto error;
238           }
239         else if (lex_match_id (lexer, "IN"))
240           {
241             lex_match (lexer, '=');
242             if (lex_token (lexer) != T_ID)
243               {
244                 lex_error (lexer, NULL);
245                 goto error;
246               }
247
248             if (file->in_name[0])
249               {
250                 msg (SE, _("Multiple IN subcommands for a single FILE or "
251                            "TABLE."));
252                 goto error;
253               }
254             strcpy (file->in_name, lex_tokid (lexer));
255             lex_get (lexer);
256           }
257         else if (lex_match_id (lexer, "SORT"))
258           {
259             file->is_sorted = false;
260             saw_sort = true;
261           }
262
263       merge_dictionary (proc.dict, file);
264     }
265
266   while (lex_token (lexer) != '.')
267     {
268       if (lex_match (lexer, T_BY))
269         {
270           const struct variable **by_vars;
271           size_t i;
272           bool ok;
273
274           if (saw_by)
275             {
276               lex_sbc_only_once ("BY");
277               goto error;
278             }
279           saw_by = true;
280
281           lex_match (lexer, '=');
282           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
283                                     &by_vars, NULL))
284             goto error;
285
286           ok = true;
287           for (i = 0; i < proc.n_files; i++)
288             {
289               struct comb_file *file = &proc.files[i];
290               size_t j;
291
292               for (j = 0; j < subcase_get_n_values (&proc.by_vars); j++)
293                 {
294                   const char *name = var_get_name (by_vars[j]);
295                   struct variable *var = dict_lookup_var (file->dict, name);
296                   if (var != NULL)
297                     subcase_add_var (&file->by_vars, var,
298                                      subcase_get_direction (&proc.by_vars, j));
299                   else
300                     {
301                       if (file->handle != NULL)
302                         msg (SE, _("File %s lacks BY variable %s."),
303                              fh_get_name (file->handle), name);
304                       else
305                         msg (SE, _("Active file lacks BY variable %s."), name);
306                       ok = false;
307                     }
308                 }
309               assert (!ok || subcase_conformable (&file->by_vars,
310                                                   &proc.files[0].by_vars));
311             }
312           free (by_vars);
313
314           if (!ok)
315             goto error;
316         }
317       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
318         {
319           if (first_name[0] != '\0')
320             {
321               lex_sbc_only_once ("FIRST");
322               goto error;
323             }
324
325           lex_match (lexer, '=');
326           if (!lex_force_id (lexer))
327             goto error;
328           strcpy (first_name, lex_tokid (lexer));
329           lex_get (lexer);
330         }
331       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
332         {
333           if (last_name[0] != '\0')
334             {
335               lex_sbc_only_once ("LAST");
336               goto error;
337             }
338
339           lex_match (lexer, '=');
340           if (!lex_force_id (lexer))
341             goto error;
342           strcpy (last_name, lex_tokid (lexer));
343           lex_get (lexer);
344         }
345       else if (lex_match_id (lexer, "MAP"))
346         {
347           /* FIXME. */
348         }
349       else if (lex_match_id (lexer, "DROP"))
350         {
351           if (!parse_dict_drop (lexer, proc.dict))
352             goto error;
353         }
354       else if (lex_match_id (lexer, "KEEP"))
355         {
356           if (!parse_dict_keep (lexer, proc.dict))
357             goto error;
358         }
359       else
360         {
361           lex_error (lexer, NULL);
362           goto error;
363         }
364
365       if (!lex_match (lexer, '/') && lex_token (lexer) != '.')
366         {
367           lex_end_of_command (lexer);
368           goto error;
369         }
370     }
371
372   if (!saw_by)
373     {
374       if (command == COMB_UPDATE)
375         {
376           msg (SE, _("The BY subcommand is required."));
377           goto error;
378         }
379       if (n_tables)
380         {
381           msg (SE, _("BY is required when TABLE is specified."));
382           goto error;
383         }
384       if (saw_sort)
385         {
386           msg (SE, _("BY is required when SORT is specified."));
387           goto error;
388         }
389     }
390
391   /* Add IN, FIRST, and LAST variables to master dictionary. */
392   for (i = 0; i < proc.n_files; i++)
393     {
394       struct comb_file *file = &proc.files[i];
395       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
396         goto error;
397     }
398   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
399       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
400     goto error;
401
402   dict_delete_scratch_vars (proc.dict);
403   dict_compact_values (proc.dict);
404
405   /* Set up mapping from each file's variables to master
406      variables. */
407   for (i = 0; i < proc.n_files; i++)
408     {
409       struct comb_file *file = &proc.files[i];
410       size_t src_var_cnt = dict_get_var_cnt (file->dict);
411       size_t j;
412
413       for (j = 0; j < src_var_cnt; j++)
414         {
415           struct variable *src_var = dict_get_var (file->dict, j);
416           struct variable *dst_var = dict_lookup_var (proc.dict,
417                                                       var_get_name (src_var));
418           if (dst_var != NULL)
419             {
420               subcase_add_var (&file->src, src_var, SC_ASCEND);
421               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
422             }
423         }
424     }
425
426   proc.output = autopaging_writer_create (dict_get_next_value_idx (proc.dict));
427   taint = taint_clone (casewriter_get_taint (proc.output));
428
429   /* Set up case matcher. */
430   proc.matcher = case_matcher_create ();
431   for (i = 0; i < proc.n_files; i++)
432     {
433       struct comb_file *file = &proc.files[i];
434       if (file->reader == NULL)
435         {
436           if (active_file == NULL)
437             {
438               proc_discard_output (ds);
439               file->reader = active_file = proc_open (ds);
440             }
441           else
442             file->reader = casereader_clone (active_file);
443         }
444       if (!file->is_sorted)
445         file->reader = sort_execute (file->reader, &file->by_vars);
446       taint_propagate (casereader_get_taint (file->reader), taint);
447       file->data = casereader_read (file->reader);
448       if (file->type == COMB_FILE)
449         case_matcher_add_input (proc.matcher, &file->by_vars,
450                                 &file->data, &file->is_minimal);
451     }
452
453   if (command == COMB_ADD)
454     execute_add_files (&proc);
455   else if (command == COMB_MATCH)
456     execute_match_files (&proc);
457   else if (command == COMB_UPDATE)
458     execute_update (&proc);
459   else
460     NOT_REACHED ();
461
462   case_matcher_destroy (proc.matcher);
463   proc.matcher = NULL;
464   close_all_comb_files (&proc);
465   if (active_file != NULL)
466     proc_commit (ds);
467
468   proc_set_active_file (ds, casewriter_make_reader (proc.output), proc.dict);
469   proc.dict = NULL;
470   proc.output = NULL;
471
472   free_comb_proc (&proc);
473
474   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
475
476  error:
477   if (active_file != NULL)
478     proc_commit (ds);
479   free_comb_proc (&proc);
480   taint_destroy (taint);
481   return CMD_CASCADING_FAILURE;
482 }
483
484 /* Merge the dictionary for file F into master dictionary M. */
485 static bool
486 merge_dictionary (struct dictionary *const m, struct comb_file *f)
487 {
488   struct dictionary *d = f->dict;
489   const char *d_docs, *m_docs;
490   int i;
491   const char *file_encoding;
492
493   if (dict_get_label (m) == NULL)
494     dict_set_label (m, dict_get_label (d));
495
496   d_docs = dict_get_documents (d);
497   m_docs = dict_get_documents (m);
498
499
500   /* FIXME: If the input files have different encodings, then
501      the result is undefined.
502      The correct thing to do would be to convert to an encoding
503      which can cope with all the input files (eg UTF-8).
504    */
505   file_encoding = dict_get_encoding (f->dict);
506   if ( file_encoding != NULL)
507     {
508       if ( dict_get_encoding (m) == NULL)
509         dict_set_encoding (m, file_encoding);
510       else if ( 0 != strcmp (file_encoding, dict_get_encoding (m)))
511         {
512           msg (MW,
513                _("Combining files with incompatible encodings. String data may not be represented correctly."));
514         }
515     }
516
517   if (d_docs != NULL)
518     {
519       if (m_docs == NULL)
520         dict_set_documents (m, d_docs);
521       else
522         {
523           char *new_docs = xasprintf ("%s%s", m_docs, d_docs);
524           dict_set_documents (m, new_docs);
525           free (new_docs);
526         }
527     }
528
529   for (i = 0; i < dict_get_var_cnt (d); i++)
530     {
531       struct variable *dv = dict_get_var (d, i);
532       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
533
534       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
535         continue;
536
537       if (mv != NULL)
538         {
539           if (var_get_width (mv) != var_get_width (dv))
540             {
541               const char *var_name = var_get_name (dv);
542               const char *file_name = fh_get_name (f->handle);
543               struct string s = DS_EMPTY_INITIALIZER;
544               ds_put_format (&s,
545                              _("Variable %s in file %s has different "
546                                "type or width from the same variable in "
547                                "earlier file."),
548                              var_name, file_name);
549               ds_put_cstr (&s, "  ");
550               if (var_is_numeric (dv))
551                 ds_put_format (&s, _("In file %s, %s is numeric."),
552                                file_name, var_name);
553               else
554                 ds_put_format (&s, _("In file %s, %s is a string variable "
555                                      "with width %d."),
556                                file_name, var_name, var_get_width (dv));
557               ds_put_cstr (&s, "  ");
558               if (var_is_numeric (mv))
559                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
560                                var_name);
561               else
562                 ds_put_format (&s, _("In an earlier file, %s was a string "
563                                      "variable with width %d."),
564                                var_name, var_get_width (mv));
565               msg (SE, ds_cstr (&s));
566               ds_destroy (&s);
567               return false;
568             }
569
570           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
571             var_set_value_labels (mv, var_get_value_labels (dv));
572           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
573             var_set_missing_values (mv, var_get_missing_values (dv));
574           if (var_get_label (dv) && !var_get_label (mv))
575             var_set_label (mv, var_get_label (dv));
576         }
577       else
578         mv = dict_clone_var_assert (m, dv, var_get_name (dv));
579     }
580
581   return true;
582 }
583
584 /* If VAR_NAME is a non-empty string, attempts to create a
585    variable named VAR_NAME, with format F1.0, in DICT, and stores
586    a pointer to the variable in *VAR.  Returns true if
587    successful, false if the variable name is a duplicate (in
588    which case a message saying that the variable specified on the
589    given SUBCOMMAND is a duplicate is emitted).  Also returns
590    true, without doing anything, if VAR_NAME is null or empty. */
591 static bool
592 create_flag_var (const char *subcommand, const char *var_name,
593                  struct dictionary *dict, struct variable **var)
594 {
595   if (var_name[0] != '\0')
596     {
597       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
598       *var = dict_create_var (dict, var_name, 0);
599       if (*var == NULL)
600         {
601           msg (SE, _("Variable name %s specified on %s subcommand "
602                      "duplicates an existing variable name."),
603                subcommand, var_name);
604           return false;
605         }
606       var_set_both_formats (*var, &format);
607     }
608   else
609     *var = NULL;
610   return true;
611 }
612
613 /* Closes all the files in PROC and frees their associated data. */
614 static void
615 close_all_comb_files (struct comb_proc *proc)
616 {
617   size_t i;
618
619   for (i = 0; i < proc->n_files; i++)
620     {
621       struct comb_file *file = &proc->files[i];
622       subcase_destroy (&file->by_vars);
623       subcase_destroy (&file->src);
624       subcase_destroy (&file->dst);
625       fh_unref (file->handle);
626       dict_destroy (file->dict);
627       casereader_destroy (file->reader);
628       case_unref (file->data);
629     }
630   free (proc->files);
631   proc->files = NULL;
632   proc->n_files = 0;
633 }
634
635 /* Frees all the data for the procedure. */
636 static void
637 free_comb_proc (struct comb_proc *proc)
638 {
639   close_all_comb_files (proc);
640   dict_destroy (proc->dict);
641   casewriter_destroy (proc->output);
642   case_matcher_destroy (proc->matcher);
643   subcase_destroy (&proc->by_vars);
644   case_unref (proc->buffered_case);
645   free (proc->prev_BY);
646 }
647 \f
648 static bool scan_table (struct comb_file *, union value by[]);
649 static struct ccase *create_output_case (const struct comb_proc *);
650 static void apply_case (const struct comb_file *, struct ccase *);
651 static void apply_file_case_and_advance (struct comb_file *, struct ccase *,
652                                          union value by[]);
653 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
654 static void output_buffered_case (struct comb_proc *);
655
656 /* Executes the ADD FILES command. */
657 static void
658 execute_add_files (struct comb_proc *proc)
659 {
660   union value *by;
661
662   while (case_matcher_match (proc->matcher, &by))
663     {
664       size_t i;
665
666       for (i = 0; i < proc->n_files; i++)
667         {
668           struct comb_file *file = &proc->files[i];
669           while (file->is_minimal)
670             {
671               struct ccase *output = create_output_case (proc);
672               apply_file_case_and_advance (file, output, by);
673               output_case (proc, output, by);
674             }
675         }
676     }
677   output_buffered_case (proc);
678 }
679
680 /* Executes the MATCH FILES command. */
681 static void
682 execute_match_files (struct comb_proc *proc)
683 {
684   union value *by;
685
686   while (case_matcher_match (proc->matcher, &by))
687     {
688       struct ccase *output;
689       size_t i;
690
691       output = create_output_case (proc);
692       for (i = proc->n_files; i-- > 0; )
693         {
694           struct comb_file *file = &proc->files[i];
695           if (file->type == COMB_FILE)
696             {
697               if (file->is_minimal)
698                 apply_file_case_and_advance (file, output, NULL);
699             }
700           else
701             {
702               if (scan_table (file, by))
703                 apply_case (file, output);
704             }
705         }
706       output_case (proc, output, by);
707     }
708   output_buffered_case (proc);
709 }
710
711 /* Executes the UPDATE command. */
712 static void
713 execute_update (struct comb_proc *proc)
714 {
715   union value *by;
716   size_t n_duplicates = 0;
717
718   while (case_matcher_match (proc->matcher, &by))
719     {
720       struct comb_file *first, *file;
721       struct ccase *output;
722
723       /* Find first nonnull case in array and make an output case
724          from it. */
725       output = create_output_case (proc);
726       for (first = &proc->files[0]; ; first++)
727         if (first->is_minimal)
728           break;
729       apply_file_case_and_advance (first, output, by);
730
731       /* Read additional cases and update the output case from
732          them.  (Don't update the output case from any duplicate
733          cases in the master file.) */
734       for (file = first + (first == proc->files);
735            file < &proc->files[proc->n_files]; file++)
736         {
737           while (file->is_minimal)
738             apply_file_case_and_advance (file, output, by);
739         }
740       casewriter_write (proc->output, output);
741
742       /* Write duplicate cases in the master file directly to the
743          output.  */
744       if (first == proc->files && first->is_minimal)
745         {
746           n_duplicates++;
747           while (first->is_minimal)
748             {
749               output = create_output_case (proc);
750               apply_file_case_and_advance (first, output, by);
751               casewriter_write (proc->output, output);
752             }
753         }
754     }
755
756   if (n_duplicates)
757     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
758          n_duplicates);
759 }
760
761 /* Reads FILE, which must be of type COMB_TABLE, until it
762    encounters a case with BY or greater for its BY variables.
763    Returns true if a case with exactly BY for its BY variables
764    was found, otherwise false. */
765 static bool
766 scan_table (struct comb_file *file, union value by[])
767 {
768   while (file->data != NULL)
769     {
770       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
771       if (cmp > 0)
772         {
773           case_unref (file->data);
774           file->data = casereader_read (file->reader);
775         }
776       else
777         return cmp == 0;
778     }
779   return false;
780 }
781
782 /* Creates and returns an output case for PROC, initializing each
783    of its values to system-missing or blanks, except that the
784    values of IN variables are set to 0. */
785 static struct ccase *
786 create_output_case (const struct comb_proc *proc)
787 {
788   size_t n_vars = dict_get_var_cnt (proc->dict);
789   struct ccase *output;
790   size_t i;
791
792   output = case_create (dict_get_next_value_idx (proc->dict));
793   for (i = 0; i < n_vars; i++)
794     {
795       struct variable *v = dict_get_var (proc->dict, i);
796       value_set_missing (case_data_rw (output, v), var_get_width (v));
797     }
798   for (i = 0; i < proc->n_files; i++)
799     {
800       struct comb_file *file = &proc->files[i];
801       if (file->in_var != NULL)
802         case_data_rw (output, file->in_var)->f = false;
803     }
804   return output;
805 }
806
807 /* Copies the data from FILE's case into output case OUTPUT.
808    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
809 static void
810 apply_case (const struct comb_file *file, struct ccase *output)
811 {
812   subcase_copy (&file->src, file->data, &file->dst, output);
813   if (file->in_var != NULL)
814     case_data_rw (output, file->in_var)->f = true;
815 }
816
817 /* Like apply_case() above, but also advances FILE to its next
818    case.  Also, if BY is nonnull, then FILE's is_minimal member
819    is updated based on whether the new case's BY values still
820    match those in BY. */
821 static void
822 apply_file_case_and_advance (struct comb_file *file, struct ccase *output,
823                              union value by[])
824 {
825   apply_case (file, output);
826   case_unref (file->data);
827   file->data = casereader_read (file->reader);
828   if (by)
829     file->is_minimal = (file->data != NULL
830                         && subcase_equal_cx (&file->by_vars, file->data, by));
831 }
832
833 /* Writes OUTPUT, whose BY values has been extracted into BY, to
834    PROC's output file, first initializing any FIRST or LAST
835    variables in OUTPUT to the correct values. */
836 static void
837 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
838 {
839   if (proc->first == NULL && proc->last == NULL)
840     casewriter_write (proc->output, output);
841   else
842     {
843       /* It's harder with LAST, because we can't know whether
844          this case is the last in a group until we've prepared
845          the *next* case also.  Thus, we buffer the previous
846          output case until the next one is ready. */
847       bool new_BY;
848       if (proc->prev_BY != NULL)
849         {
850           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
851           if (proc->last != NULL)
852             case_data_rw (proc->buffered_case, proc->last)->f = new_BY;
853           casewriter_write (proc->output, proc->buffered_case);
854         }
855       else
856         new_BY = true;
857
858       proc->buffered_case = output;
859       if (proc->first != NULL)
860         case_data_rw (proc->buffered_case, proc->first)->f = new_BY;
861
862       if (new_BY)
863         {
864           size_t n = (subcase_get_n_values (&proc->by_vars)
865                       * sizeof (union value));
866           if (proc->prev_BY == NULL)
867             proc->prev_BY = xmalloc (n);
868           memcpy (proc->prev_BY, by, n);
869         }
870     }
871 }
872
873 /* Writes a trailing buffered case to the output, if FIRST or
874    LAST is in use. */
875 static void
876 output_buffered_case (struct comb_proc *proc)
877 {
878   if (proc->prev_BY != NULL)
879     {
880       if (proc->last != NULL)
881         case_data_rw (proc->buffered_case, proc->last)->f = 1.0;
882       casewriter_write (proc->output, proc->buffered_case);
883       proc->buffered_case = NULL;
884     }
885 }