Merge commit 'origin/stable'
[pspp-builds.git] / src / language / data-io / combine-files.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include <data/any-reader.h>
22 #include <data/case-matcher.h>
23 #include <data/case.h>
24 #include <data/casereader.h>
25 #include <data/casewriter.h>
26 #include <data/dictionary.h>
27 #include <data/format.h>
28 #include <data/procedure.h>
29 #include <data/subcase.h>
30 #include <data/variable.h>
31 #include <language/command.h>
32 #include <language/data-io/file-handle.h>
33 #include <language/data-io/trim.h>
34 #include <language/lexer/lexer.h>
35 #include <language/lexer/variable-parser.h>
36 #include <language/stats/sort-criteria.h>
37 #include <libpspp/assertion.h>
38 #include <libpspp/message.h>
39 #include <libpspp/taint.h>
40 #include <math/sort.h>
41
42 #include "xalloc.h"
43
44 #include "gettext.h"
45 #define _(msgid) gettext (msgid)
46
47 enum comb_command_type
48   {
49     COMB_ADD,
50     COMB_MATCH,
51     COMB_UPDATE
52   };
53
54 /* File types. */
55 enum comb_file_type
56   {
57     COMB_FILE,                  /* Specified on FILE= subcommand. */
58     COMB_TABLE                  /* Specified on TABLE= subcommand. */
59   };
60
61 /* One FILE or TABLE subcommand. */
62 struct comb_file
63   {
64     /* Basics. */
65     enum comb_file_type type;   /* COMB_FILE or COMB_TABLE. */
66
67     /* Variables. */
68     struct subcase by_vars;     /* BY variables in this input file. */
69     struct subcase src, dst;    /* Data to copy to output; where to put it. */
70
71     /* Input files. */
72     struct file_handle *handle; /* Input file handle. */
73     struct dictionary *dict;    /* Input file dictionary. */
74     struct casereader *reader;  /* Input data source. */
75     struct ccase *data;         /* The current input case. */
76     bool is_minimal;            /* Does 'data' have minimum BY values across
77                                    all input files? */
78     bool is_sorted;             /* Is file presorted on the BY variables? */
79
80     /* IN subcommand. */
81     char in_name[VAR_NAME_LEN + 1];
82     struct variable *in_var;
83   };
84
85 struct comb_proc
86   {
87     struct comb_file *files;    /* All the files being merged. */
88     size_t n_files;             /* Number of files. */
89
90     struct dictionary *dict;    /* Dictionary of output file. */
91     struct subcase by_vars;     /* BY variables in the output. */
92     struct casewriter *output;  /* Destination for output. */
93
94     struct case_matcher *matcher;
95
96     /* FIRST, LAST.
97        Only if "first" or "last" is nonnull are the remaining
98        members used. */
99     struct variable *first;     /* Variable specified on FIRST (if any). */
100     struct variable *last;      /* Variable specified on LAST (if any). */
101     struct ccase *buffered_case; /* Case ready for output except that we don't
102                                     know the value for the LAST var yet. */
103     union value *prev_BY;       /* Values of BY vars in buffered_case. */
104   };
105
106 static int combine_files (enum comb_command_type, struct lexer *,
107                           struct dataset *);
108 static void free_comb_proc (struct comb_proc *);
109
110 static void close_all_comb_files (struct comb_proc *);
111 static bool merge_dictionary (struct dictionary *const, struct comb_file *);
112
113 static void execute_update (struct comb_proc *);
114 static void execute_match_files (struct comb_proc *);
115 static void execute_add_files (struct comb_proc *);
116
117 static bool create_flag_var (const char *subcommand_name, const char *var_name,
118                              struct dictionary *, struct variable **);
119 static void output_case (struct comb_proc *, struct ccase *, union value *by);
120 static void output_buffered_case (struct comb_proc *);
121
122 int
123 cmd_add_files (struct lexer *lexer, struct dataset *ds)
124 {
125   return combine_files (COMB_ADD, lexer, ds);
126 }
127
128 int
129 cmd_match_files (struct lexer *lexer, struct dataset *ds)
130 {
131   return combine_files (COMB_MATCH, lexer, ds);
132 }
133
134 int
135 cmd_update (struct lexer *lexer, struct dataset *ds)
136 {
137   return combine_files (COMB_UPDATE, lexer, ds);
138 }
139
140 static int
141 combine_files (enum comb_command_type command,
142                struct lexer *lexer, struct dataset *ds)
143 {
144   struct comb_proc proc;
145
146   bool saw_by = false;
147   bool saw_sort = false;
148   struct casereader *active_file = NULL;
149
150   char first_name[VAR_NAME_LEN + 1] = "";
151   char last_name[VAR_NAME_LEN + 1] = "";
152
153   struct taint *taint = NULL;
154
155   size_t n_tables = 0;
156   size_t allocated_files = 0;
157
158   size_t i;
159
160   proc.files = NULL;
161   proc.n_files = 0;
162   proc.dict = dict_create ();
163   proc.output = NULL;
164   proc.matcher = NULL;
165   subcase_init_empty (&proc.by_vars);
166   proc.first = NULL;
167   proc.last = NULL;
168   proc.buffered_case = NULL;
169   proc.prev_BY = NULL;
170
171   dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
172
173   lex_match (lexer, '/');
174   for (;;)
175     {
176       struct comb_file *file;
177       enum comb_file_type type;
178
179       if (lex_match_id (lexer, "FILE"))
180         type = COMB_FILE;
181       else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
182         {
183           type = COMB_TABLE;
184           n_tables++;
185         }
186       else
187         break;
188       lex_match (lexer, '=');
189
190       if (proc.n_files >= allocated_files)
191         proc.files = x2nrealloc (proc.files, &allocated_files,
192                                 sizeof *proc.files);
193       file = &proc.files[proc.n_files++];
194       file->type = type;
195       subcase_init_empty (&file->by_vars);
196       subcase_init_empty (&file->src);
197       subcase_init_empty (&file->dst);
198       file->handle = NULL;
199       file->dict = NULL;
200       file->reader = NULL;
201       file->data = NULL;
202       file->is_sorted = true;
203       file->in_name[0] = '\0';
204       file->in_var = NULL;
205
206       if (lex_match (lexer, '*'))
207         {
208           if (!proc_has_active_file (ds))
209             {
210               msg (SE, _("Cannot specify the active file since no active "
211                          "file has been defined."));
212               goto error;
213             }
214
215           if (proc_make_temporary_transformations_permanent (ds))
216             msg (SE, _("This command may not be used after TEMPORARY when "
217                        "the active file is an input source.  "
218                        "Temporary transformations will be made permanent."));
219
220           file->dict = dict_clone (dataset_dict (ds));
221         }
222       else
223         {
224           file->handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
225           if (file->handle == NULL)
226             goto error;
227
228           file->reader = any_reader_open (file->handle, &file->dict);
229           if (file->reader == NULL)
230             goto error;
231         }
232
233       while (lex_match (lexer, '/'))
234         if (lex_match_id (lexer, "RENAME"))
235           {
236             if (!parse_dict_rename (lexer, file->dict))
237               goto error;
238           }
239         else if (lex_match_id (lexer, "IN"))
240           {
241             lex_match (lexer, '=');
242             if (lex_token (lexer) != T_ID)
243               {
244                 lex_error (lexer, NULL);
245                 goto error;
246               }
247
248             if (file->in_name[0])
249               {
250                 msg (SE, _("Multiple IN subcommands for a single FILE or "
251                            "TABLE."));
252                 goto error;
253               }
254             strcpy (file->in_name, lex_tokid (lexer));
255             lex_get (lexer);
256           }
257         else if (lex_match_id (lexer, "SORT"))
258           {
259             file->is_sorted = false;
260             saw_sort = true;
261           }
262
263       merge_dictionary (proc.dict, file);
264     }
265
266   while (lex_token (lexer) != '.')
267     {
268       if (lex_match (lexer, T_BY))
269         {
270           const struct variable **by_vars;
271           size_t i;
272           bool ok;
273
274           if (saw_by)
275             {
276               lex_sbc_only_once ("BY");
277               goto error;
278             }
279           saw_by = true;
280
281           lex_match (lexer, '=');
282           if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
283                                     &by_vars, NULL))
284             goto error;
285
286           ok = true;
287           for (i = 0; i < proc.n_files; i++)
288             {
289               struct comb_file *file = &proc.files[i];
290               size_t j;
291
292               for (j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
293                 {
294                   const char *name = var_get_name (by_vars[j]);
295                   struct variable *var = dict_lookup_var (file->dict, name);
296                   if (var != NULL)
297                     subcase_add_var (&file->by_vars, var,
298                                      subcase_get_direction (&proc.by_vars, j));
299                   else
300                     {
301                       if (file->handle != NULL)
302                         msg (SE, _("File %s lacks BY variable %s."),
303                              fh_get_name (file->handle), name);
304                       else
305                         msg (SE, _("Active file lacks BY variable %s."), name);
306                       ok = false;
307                     }
308                 }
309               assert (!ok || subcase_conformable (&file->by_vars,
310                                                   &proc.files[0].by_vars));
311             }
312           free (by_vars);
313
314           if (!ok)
315             goto error;
316         }
317       else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
318         {
319           if (first_name[0] != '\0')
320             {
321               lex_sbc_only_once ("FIRST");
322               goto error;
323             }
324
325           lex_match (lexer, '=');
326           if (!lex_force_id (lexer))
327             goto error;
328           strcpy (first_name, lex_tokid (lexer));
329           lex_get (lexer);
330         }
331       else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
332         {
333           if (last_name[0] != '\0')
334             {
335               lex_sbc_only_once ("LAST");
336               goto error;
337             }
338
339           lex_match (lexer, '=');
340           if (!lex_force_id (lexer))
341             goto error;
342           strcpy (last_name, lex_tokid (lexer));
343           lex_get (lexer);
344         }
345       else if (lex_match_id (lexer, "MAP"))
346         {
347           /* FIXME. */
348         }
349       else if (lex_match_id (lexer, "DROP"))
350         {
351           if (!parse_dict_drop (lexer, proc.dict))
352             goto error;
353         }
354       else if (lex_match_id (lexer, "KEEP"))
355         {
356           if (!parse_dict_keep (lexer, proc.dict))
357             goto error;
358         }
359       else
360         {
361           lex_error (lexer, NULL);
362           goto error;
363         }
364
365       if (!lex_match (lexer, '/') && lex_token (lexer) != '.')
366         {
367           lex_end_of_command (lexer);
368           goto error;
369         }
370     }
371
372   if (!saw_by)
373     {
374       if (command == COMB_UPDATE)
375         {
376           msg (SE, _("The BY subcommand is required."));
377           goto error;
378         }
379       if (n_tables)
380         {
381           msg (SE, _("BY is required when %s is specified."), "TABLE");
382           goto error;
383         }
384       if (saw_sort)
385         {
386           msg (SE, _("BY is required when %s is specified."), "SORT");
387           goto error;
388         }
389     }
390
391   /* Add IN, FIRST, and LAST variables to master dictionary. */
392   for (i = 0; i < proc.n_files; i++)
393     {
394       struct comb_file *file = &proc.files[i];
395       if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
396         goto error;
397     }
398   if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
399       || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
400     goto error;
401
402   dict_delete_scratch_vars (proc.dict);
403   dict_compact_values (proc.dict);
404
405   /* Set up mapping from each file's variables to master
406      variables. */
407   for (i = 0; i < proc.n_files; i++)
408     {
409       struct comb_file *file = &proc.files[i];
410       size_t src_var_cnt = dict_get_var_cnt (file->dict);
411       size_t j;
412
413       for (j = 0; j < src_var_cnt; j++)
414         {
415           struct variable *src_var = dict_get_var (file->dict, j);
416           struct variable *dst_var = dict_lookup_var (proc.dict,
417                                                       var_get_name (src_var));
418           if (dst_var != NULL)
419             {
420               subcase_add_var (&file->src, src_var, SC_ASCEND);
421               subcase_add_var (&file->dst, dst_var, SC_ASCEND);
422             }
423         }
424     }
425
426   proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
427   taint = taint_clone (casewriter_get_taint (proc.output));
428
429   /* Set up case matcher. */
430   proc.matcher = case_matcher_create ();
431   for (i = 0; i < proc.n_files; i++)
432     {
433       struct comb_file *file = &proc.files[i];
434       if (file->reader == NULL)
435         {
436           if (active_file == NULL)
437             {
438               proc_discard_output (ds);
439               file->reader = active_file = proc_open (ds);
440             }
441           else
442             file->reader = casereader_clone (active_file);
443         }
444       if (!file->is_sorted)
445         file->reader = sort_execute (file->reader, &file->by_vars);
446       taint_propagate (casereader_get_taint (file->reader), taint);
447       file->data = casereader_read (file->reader);
448       if (file->type == COMB_FILE)
449         case_matcher_add_input (proc.matcher, &file->by_vars,
450                                 &file->data, &file->is_minimal);
451     }
452
453   if (command == COMB_ADD)
454     execute_add_files (&proc);
455   else if (command == COMB_MATCH)
456     execute_match_files (&proc);
457   else if (command == COMB_UPDATE)
458     execute_update (&proc);
459   else
460     NOT_REACHED ();
461
462   case_matcher_destroy (proc.matcher);
463   proc.matcher = NULL;
464   close_all_comb_files (&proc);
465   if (active_file != NULL)
466     proc_commit (ds);
467
468   proc_set_active_file (ds, casewriter_make_reader (proc.output), proc.dict);
469   proc.dict = NULL;
470   proc.output = NULL;
471
472   free_comb_proc (&proc);
473
474   return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
475
476  error:
477   if (active_file != NULL)
478     proc_commit (ds);
479   free_comb_proc (&proc);
480   taint_destroy (taint);
481   return CMD_CASCADING_FAILURE;
482 }
483
484 /* Merge the dictionary for file F into master dictionary M. */
485 static bool
486 merge_dictionary (struct dictionary *const m, struct comb_file *f)
487 {
488   struct dictionary *d = f->dict;
489   const char *d_docs, *m_docs;
490   int i;
491   const char *file_encoding;
492
493   if (dict_get_label (m) == NULL)
494     dict_set_label (m, dict_get_label (d));
495
496   d_docs = dict_get_documents (d);
497   m_docs = dict_get_documents (m);
498
499
500   /* FIXME: If the input files have different encodings, then
501      the result is undefined.
502      The correct thing to do would be to convert to an encoding
503      which can cope with all the input files (eg UTF-8).
504    */
505   file_encoding = dict_get_encoding (f->dict);
506   if ( file_encoding != NULL)
507     {
508       if ( dict_get_encoding (m) == NULL)
509         dict_set_encoding (m, file_encoding);
510       else if ( 0 != strcmp (file_encoding, dict_get_encoding (m)))
511         {
512           msg (MW,
513                _("Combining files with incompatible encodings. String data may not be represented correctly."));
514         }
515     }
516
517   if (d_docs != NULL)
518     {
519       if (m_docs == NULL)
520         dict_set_documents (m, d_docs);
521       else
522         {
523           char *new_docs = xasprintf ("%s%s", m_docs, d_docs);
524           dict_set_documents (m, new_docs);
525           free (new_docs);
526         }
527     }
528
529   for (i = 0; i < dict_get_var_cnt (d); i++)
530     {
531       struct variable *dv = dict_get_var (d, i);
532       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
533
534       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
535         continue;
536
537       if (mv != NULL)
538         {
539           if (var_get_width (mv) != var_get_width (dv))
540             {
541               const char *var_name = var_get_name (dv);
542               const char *file_name = fh_get_name (f->handle);
543               struct string s = DS_EMPTY_INITIALIZER;
544               ds_put_format (&s,
545                              _("Variable %s in file %s has different "
546                                "type or width from the same variable in "
547                                "earlier file."),
548                              var_name, file_name);
549               ds_put_cstr (&s, "  ");
550               if (var_is_numeric (dv))
551                 ds_put_format (&s, _("In file %s, %s is numeric."),
552                                file_name, var_name);
553               else
554                 ds_put_format (&s, _("In file %s, %s is a string variable "
555                                      "with width %d."),
556                                file_name, var_name, var_get_width (dv));
557               ds_put_cstr (&s, "  ");
558               if (var_is_numeric (mv))
559                 ds_put_format (&s, _("In an earlier file, %s was numeric."),
560                                var_name);
561               else
562                 ds_put_format (&s, _("In an earlier file, %s was a string "
563                                      "variable with width %d."),
564                                var_name, var_get_width (mv));
565               msg (SE, ds_cstr (&s));
566               ds_destroy (&s);
567               return false;
568             }
569
570           if (var_has_value_labels (dv) && !var_has_value_labels (mv))
571             var_set_value_labels (mv, var_get_value_labels (dv));
572           if (var_has_missing_values (dv) && !var_has_missing_values (mv))
573             var_set_missing_values (mv, var_get_missing_values (dv));
574           if (var_get_label (dv) && !var_get_label (mv))
575             var_set_label (mv, var_get_label (dv));
576         }
577       else
578         mv = dict_clone_var_assert (m, dv, var_get_name (dv));
579     }
580
581   return true;
582 }
583
584 /* If VAR_NAME is a non-empty string, attempts to create a
585    variable named VAR_NAME, with format F1.0, in DICT, and stores
586    a pointer to the variable in *VAR.  Returns true if
587    successful, false if the variable name is a duplicate (in
588    which case a message saying that the variable specified on the
589    given SUBCOMMAND is a duplicate is emitted).  Also returns
590    true, without doing anything, if VAR_NAME is null or empty. */
591 static bool
592 create_flag_var (const char *subcommand, const char *var_name,
593                  struct dictionary *dict, struct variable **var)
594 {
595   if (var_name[0] != '\0')
596     {
597       struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
598       *var = dict_create_var (dict, var_name, 0);
599       if (*var == NULL)
600         {
601           msg (SE, _("Variable name %s specified on %s subcommand "
602                      "duplicates an existing variable name."),
603                subcommand, var_name);
604           return false;
605         }
606       var_set_both_formats (*var, &format);
607     }
608   else
609     *var = NULL;
610   return true;
611 }
612
613 /* Closes all the files in PROC and frees their associated data. */
614 static void
615 close_all_comb_files (struct comb_proc *proc)
616 {
617   size_t i;
618
619   for (i = 0; i < proc->n_files; i++)
620     {
621       struct comb_file *file = &proc->files[i];
622       subcase_destroy (&file->by_vars);
623       subcase_destroy (&file->src);
624       subcase_destroy (&file->dst);
625       fh_unref (file->handle);
626       dict_destroy (file->dict);
627       casereader_destroy (file->reader);
628       case_unref (file->data);
629     }
630   free (proc->files);
631   proc->files = NULL;
632   proc->n_files = 0;
633 }
634
635 /* Frees all the data for the procedure. */
636 static void
637 free_comb_proc (struct comb_proc *proc)
638 {
639   close_all_comb_files (proc);
640   dict_destroy (proc->dict);
641   casewriter_destroy (proc->output);
642   case_matcher_destroy (proc->matcher);
643   if (proc->prev_BY)
644     {
645       caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
646                                 proc->prev_BY);
647       free (proc->prev_BY);
648     }
649   subcase_destroy (&proc->by_vars);
650   case_unref (proc->buffered_case);
651 }
652 \f
653 static bool scan_table (struct comb_file *, union value by[]);
654 static struct ccase *create_output_case (const struct comb_proc *);
655 static void apply_case (const struct comb_file *, struct ccase *);
656 static void apply_file_case_and_advance (struct comb_file *, struct ccase *,
657                                          union value by[]);
658 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
659 static void output_buffered_case (struct comb_proc *);
660
661 /* Executes the ADD FILES command. */
662 static void
663 execute_add_files (struct comb_proc *proc)
664 {
665   union value *by;
666
667   while (case_matcher_match (proc->matcher, &by))
668     {
669       size_t i;
670
671       for (i = 0; i < proc->n_files; i++)
672         {
673           struct comb_file *file = &proc->files[i];
674           while (file->is_minimal)
675             {
676               struct ccase *output = create_output_case (proc);
677               apply_file_case_and_advance (file, output, by);
678               output_case (proc, output, by);
679             }
680         }
681     }
682   output_buffered_case (proc);
683 }
684
685 /* Executes the MATCH FILES command. */
686 static void
687 execute_match_files (struct comb_proc *proc)
688 {
689   union value *by;
690
691   while (case_matcher_match (proc->matcher, &by))
692     {
693       struct ccase *output;
694       size_t i;
695
696       output = create_output_case (proc);
697       for (i = proc->n_files; i-- > 0; )
698         {
699           struct comb_file *file = &proc->files[i];
700           if (file->type == COMB_FILE)
701             {
702               if (file->is_minimal)
703                 apply_file_case_and_advance (file, output, NULL);
704             }
705           else
706             {
707               if (scan_table (file, by))
708                 apply_case (file, output);
709             }
710         }
711       output_case (proc, output, by);
712     }
713   output_buffered_case (proc);
714 }
715
716 /* Executes the UPDATE command. */
717 static void
718 execute_update (struct comb_proc *proc)
719 {
720   union value *by;
721   size_t n_duplicates = 0;
722
723   while (case_matcher_match (proc->matcher, &by))
724     {
725       struct comb_file *first, *file;
726       struct ccase *output;
727
728       /* Find first nonnull case in array and make an output case
729          from it. */
730       output = create_output_case (proc);
731       for (first = &proc->files[0]; ; first++)
732         if (first->is_minimal)
733           break;
734       apply_file_case_and_advance (first, output, by);
735
736       /* Read additional cases and update the output case from
737          them.  (Don't update the output case from any duplicate
738          cases in the master file.) */
739       for (file = first + (first == proc->files);
740            file < &proc->files[proc->n_files]; file++)
741         {
742           while (file->is_minimal)
743             apply_file_case_and_advance (file, output, by);
744         }
745       casewriter_write (proc->output, output);
746
747       /* Write duplicate cases in the master file directly to the
748          output.  */
749       if (first == proc->files && first->is_minimal)
750         {
751           n_duplicates++;
752           while (first->is_minimal)
753             {
754               output = create_output_case (proc);
755               apply_file_case_and_advance (first, output, by);
756               casewriter_write (proc->output, output);
757             }
758         }
759     }
760
761   if (n_duplicates)
762     msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
763          n_duplicates);
764 }
765
766 /* Reads FILE, which must be of type COMB_TABLE, until it
767    encounters a case with BY or greater for its BY variables.
768    Returns true if a case with exactly BY for its BY variables
769    was found, otherwise false. */
770 static bool
771 scan_table (struct comb_file *file, union value by[])
772 {
773   while (file->data != NULL)
774     {
775       int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
776       if (cmp > 0)
777         {
778           case_unref (file->data);
779           file->data = casereader_read (file->reader);
780         }
781       else
782         return cmp == 0;
783     }
784   return false;
785 }
786
787 /* Creates and returns an output case for PROC, initializing each
788    of its values to system-missing or blanks, except that the
789    values of IN variables are set to 0. */
790 static struct ccase *
791 create_output_case (const struct comb_proc *proc)
792 {
793   size_t n_vars = dict_get_var_cnt (proc->dict);
794   struct ccase *output;
795   size_t i;
796
797   output = case_create (dict_get_proto (proc->dict));
798   for (i = 0; i < n_vars; i++)
799     {
800       struct variable *v = dict_get_var (proc->dict, i);
801       value_set_missing (case_data_rw (output, v), var_get_width (v));
802     }
803   for (i = 0; i < proc->n_files; i++)
804     {
805       struct comb_file *file = &proc->files[i];
806       if (file->in_var != NULL)
807         case_data_rw (output, file->in_var)->f = false;
808     }
809   return output;
810 }
811
812 /* Copies the data from FILE's case into output case OUTPUT.
813    If FILE has an IN variable, then it is set to 1 in OUTPUT. */
814 static void
815 apply_case (const struct comb_file *file, struct ccase *output)
816 {
817   subcase_copy (&file->src, file->data, &file->dst, output);
818   if (file->in_var != NULL)
819     case_data_rw (output, file->in_var)->f = true;
820 }
821
822 /* Like apply_case() above, but also advances FILE to its next
823    case.  Also, if BY is nonnull, then FILE's is_minimal member
824    is updated based on whether the new case's BY values still
825    match those in BY. */
826 static void
827 apply_file_case_and_advance (struct comb_file *file, struct ccase *output,
828                              union value by[])
829 {
830   apply_case (file, output);
831   case_unref (file->data);
832   file->data = casereader_read (file->reader);
833   if (by)
834     file->is_minimal = (file->data != NULL
835                         && subcase_equal_cx (&file->by_vars, file->data, by));
836 }
837
838 /* Writes OUTPUT, whose BY values has been extracted into BY, to
839    PROC's output file, first initializing any FIRST or LAST
840    variables in OUTPUT to the correct values. */
841 static void
842 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
843 {
844   if (proc->first == NULL && proc->last == NULL)
845     casewriter_write (proc->output, output);
846   else
847     {
848       /* It's harder with LAST, because we can't know whether
849          this case is the last in a group until we've prepared
850          the *next* case also.  Thus, we buffer the previous
851          output case until the next one is ready. */
852       bool new_BY;
853       if (proc->prev_BY != NULL)
854         {
855           new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
856           if (proc->last != NULL)
857             case_data_rw (proc->buffered_case, proc->last)->f = new_BY;
858           casewriter_write (proc->output, proc->buffered_case);
859         }
860       else
861         new_BY = true;
862
863       proc->buffered_case = output;
864       if (proc->first != NULL)
865         case_data_rw (proc->buffered_case, proc->first)->f = new_BY;
866
867       if (new_BY)
868         {
869           size_t n_values = subcase_get_n_fields (&proc->by_vars);
870           const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
871           if (proc->prev_BY == NULL)
872             {
873               proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
874               caseproto_init_values (proto, proc->prev_BY);
875             }
876           caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
877                           proc->prev_BY, by);
878         }
879     }
880 }
881
882 /* Writes a trailing buffered case to the output, if FIRST or
883    LAST is in use. */
884 static void
885 output_buffered_case (struct comb_proc *proc)
886 {
887   if (proc->prev_BY != NULL)
888     {
889       if (proc->last != NULL)
890         case_data_rw (proc->buffered_case, proc->last)->f = 1.0;
891       casewriter_write (proc->output, proc->buffered_case);
892       proc->buffered_case = NULL;
893     }
894 }