Updated the docs and the parser for EXAMINE to make it clearer which options
[pspp] / src / language / data-io / get.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-reader.h>
25 #include <data/any-writer.h>
26 #include <data/case-sink.h>
27 #include <data/case-source.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <data/fastfile.h>
31 #include <data/dictionary.h>
32 #include <data/por-file-writer.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/storage-stream.h>
36 #include <data/sys-file-writer.h>
37 #include <data/transformations.h>
38 #include <data/value-labels.h>
39 #include <data/variable.h>
40 #include <language/command.h>
41 #include <language/data-io/file-handle.h>
42 #include <language/lexer/lexer.h>
43 #include <language/lexer/variable-parser.h>
44 #include <libpspp/alloc.h>
45 #include <libpspp/assertion.h>
46 #include <libpspp/compiler.h>
47 #include <libpspp/hash.h>
48 #include <libpspp/message.h>
49 #include <libpspp/message.h>
50 #include <libpspp/misc.h>
51 #include <libpspp/str.h>
52
53 #include "gettext.h"
54 #define _(msgid) gettext (msgid)
55
56 /* Rearranging and reducing a dictionary. */
57 static void start_case_map (struct dictionary *);
58 static struct case_map *finish_case_map (struct dictionary *);
59 static void map_case (const struct case_map *,
60                       const struct ccase *, struct ccase *);
61 static void destroy_case_map (struct case_map *);
62
63 static bool parse_dict_trim (struct dictionary *);
64 \f
65 /* Reading system and portable files. */
66
67 /* Type of command. */
68 enum reader_command 
69   {
70     GET_CMD,
71     IMPORT_CMD
72   };
73
74 /* Case reader input program. */
75 struct case_reader_pgm 
76   {
77     struct any_reader *reader;  /* File reader. */
78     struct case_map *map;       /* Map from file dict to active file dict. */
79     struct ccase bounce;        /* Bounce buffer. */
80   };
81
82 static const struct case_source_class case_reader_source_class;
83
84 static void case_reader_pgm_free (struct case_reader_pgm *);
85
86 /* Parses a GET or IMPORT command. */
87 static int
88 parse_read_command (struct dataset *ds, enum reader_command type)
89 {
90   struct case_reader_pgm *pgm = NULL;
91   struct file_handle *fh = NULL;
92   struct dictionary *dict = NULL;
93
94   for (;;)
95     {
96       lex_match ('/');
97
98       if (lex_match_id ("FILE") || token == T_STRING)
99         {
100           lex_match ('=');
101
102           fh = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
103           if (fh == NULL)
104             goto error;
105         }
106       else if (type == IMPORT_CMD && lex_match_id ("TYPE"))
107         {
108           lex_match ('=');
109
110           if (lex_match_id ("COMM"))
111             type = PFM_COMM;
112           else if (lex_match_id ("TAPE"))
113             type = PFM_TAPE;
114           else
115             {
116               lex_error (_("expecting COMM or TAPE"));
117               goto error;
118             }
119         }
120       else
121         break; 
122     }
123   
124   if (fh == NULL) 
125     {
126       lex_sbc_missing ("FILE");
127       goto error;
128     }
129               
130   discard_variables (ds);
131
132   pgm = xmalloc (sizeof *pgm);
133   pgm->reader = any_reader_open (fh, &dict);
134   pgm->map = NULL;
135   case_nullify (&pgm->bounce);
136   if (pgm->reader == NULL)
137     goto error;
138
139   case_create (&pgm->bounce, dict_get_next_value_idx (dict));
140   
141   start_case_map (dict);
142
143   while (token != '.')
144     {
145       lex_match ('/');
146       if (!parse_dict_trim (dict))
147         goto error;
148     }
149
150   pgm->map = finish_case_map (dict);
151   
152   dict_destroy (dataset_dict (ds));
153   dataset_set_dict (ds, dict);
154
155   proc_set_source (ds, 
156                    create_case_source (&case_reader_source_class, pgm));
157
158   return CMD_SUCCESS;
159
160  error:
161   case_reader_pgm_free (pgm);
162   if (dict != NULL)
163     dict_destroy (dict);
164   return CMD_CASCADING_FAILURE;
165 }
166
167 /* Frees a struct case_reader_pgm. */
168 static void
169 case_reader_pgm_free (struct case_reader_pgm *pgm) 
170 {
171   if (pgm != NULL) 
172     {
173       any_reader_close (pgm->reader);
174       destroy_case_map (pgm->map);
175       case_destroy (&pgm->bounce);
176       free (pgm);
177     }
178 }
179
180 /* Clears internal state related to case reader input procedure. */
181 static void
182 case_reader_source_destroy (struct case_source *source)
183 {
184   struct case_reader_pgm *pgm = source->aux;
185   case_reader_pgm_free (pgm);
186 }
187
188 /* Reads all the cases from the data file into C and passes them
189    to WRITE_CASE one by one, passing WC_DATA.
190    Returns true if successful, false if an I/O error occurred. */
191 static bool
192 case_reader_source_read (struct case_source *source,
193                          struct ccase *c,
194                          write_case_func *write_case, write_case_data wc_data)
195 {
196   struct case_reader_pgm *pgm = source->aux;
197   bool ok = true;
198
199   do
200     {
201       bool got_case;
202       if (pgm->map == NULL)
203         got_case = any_reader_read (pgm->reader, c);
204       else
205         {
206           got_case = any_reader_read (pgm->reader, &pgm->bounce);
207           if (got_case)
208             map_case (pgm->map, &pgm->bounce, c);
209         }
210       if (!got_case)
211         break;
212
213       ok = write_case (wc_data);
214     }
215   while (ok);
216
217   return ok && !any_reader_error (pgm->reader);
218 }
219
220 static const struct case_source_class case_reader_source_class =
221   {
222     "case reader",
223     NULL,
224     case_reader_source_read,
225     case_reader_source_destroy,
226   };
227 \f
228 /* GET. */
229 int
230 cmd_get (struct dataset *ds) 
231 {
232   return parse_read_command (ds, GET_CMD);
233 }
234
235 /* IMPORT. */
236 int
237 cmd_import (struct dataset *ds) 
238 {
239   return parse_read_command (ds, IMPORT_CMD);
240 }
241 \f
242 /* Writing system and portable files. */ 
243
244 /* Type of output file. */
245 enum writer_type
246   {
247     SYSFILE_WRITER,     /* System file. */
248     PORFILE_WRITER      /* Portable file. */
249   };
250
251 /* Type of a command. */
252 enum command_type 
253   {
254     XFORM_CMD,          /* Transformation. */
255     PROC_CMD            /* Procedure. */
256   };
257
258 /* File writer plus a case map. */
259 struct case_writer
260   {
261     struct any_writer *writer;  /* File writer. */
262     struct case_map *map;       /* Map to output file dictionary
263                                    (null pointer for identity mapping). */
264     struct ccase bounce;        /* Bounce buffer for mapping (if needed). */
265   };
266
267 /* Destroys AW. */
268 static bool
269 case_writer_destroy (struct case_writer *aw)
270 {
271   bool ok = true;
272   if (aw != NULL) 
273     {
274       ok = any_writer_close (aw->writer);
275       destroy_case_map (aw->map);
276       case_destroy (&aw->bounce);
277       free (aw);
278     }
279   return ok;
280 }
281
282 /* Parses SAVE or XSAVE or EXPORT or XEXPORT command.
283    WRITER_TYPE identifies the type of file to write,
284    and COMMAND_TYPE identifies the type of command.
285
286    On success, returns a writer.
287    For procedures only, sets *RETAIN_UNSELECTED to true if cases
288    that would otherwise be excluded by FILTER or USE should be
289    included.
290
291    On failure, returns a null pointer. */
292 static struct case_writer *
293 parse_write_command (struct dataset *ds, 
294                      enum writer_type writer_type,
295                      enum command_type command_type,
296                      bool *retain_unselected)
297 {
298   /* Common data. */
299   struct file_handle *handle; /* Output file. */
300   struct dictionary *dict;    /* Dictionary for output file. */
301   struct case_writer *aw;      /* Writer. */  
302
303   /* Common options. */
304   bool print_map;             /* Print map?  TODO. */
305   bool print_short_names;     /* Print long-to-short name map.  TODO. */
306   struct sfm_write_options sysfile_opts;
307   struct pfm_write_options porfile_opts;
308
309   assert (writer_type == SYSFILE_WRITER || writer_type == PORFILE_WRITER);
310   assert (command_type == XFORM_CMD || command_type == PROC_CMD);
311   assert ((retain_unselected != NULL) == (command_type == PROC_CMD));
312
313   if (command_type == PROC_CMD)
314     *retain_unselected = true;
315
316   handle = NULL;
317   dict = dict_clone (dataset_dict (ds));
318   aw = xmalloc (sizeof *aw);
319   aw->writer = NULL;
320   aw->map = NULL;
321   case_nullify (&aw->bounce);
322   print_map = false;
323   print_short_names = false;
324   sysfile_opts = sfm_writer_default_options ();
325   porfile_opts = pfm_writer_default_options ();
326
327   start_case_map (dict);
328   dict_delete_scratch_vars (dict);
329
330   lex_match ('/');
331   for (;;)
332     {
333       if (lex_match_id ("OUTFILE"))
334         {
335           if (handle != NULL) 
336             {
337               lex_sbc_only_once ("OUTFILE");
338               goto error; 
339             }
340           
341           lex_match ('=');
342       
343           handle = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
344           if (handle == NULL)
345             goto error;
346         }
347       else if (lex_match_id ("NAMES"))
348         print_short_names = true;
349       else if (lex_match_id ("PERMISSIONS")) 
350         {
351           bool cw;
352           
353           lex_match ('=');
354           if (lex_match_id ("READONLY"))
355             cw = false;
356           else if (lex_match_id ("WRITEABLE"))
357             cw = true;
358           else
359             {
360               lex_error (_("expecting %s or %s"), "READONLY", "WRITEABLE");
361               goto error;
362             }
363           sysfile_opts.create_writeable = porfile_opts.create_writeable = cw;
364         }
365       else if (command_type == PROC_CMD && lex_match_id ("UNSELECTED")) 
366         {
367           lex_match ('=');
368           if (lex_match_id ("RETAIN"))
369             *retain_unselected = true;
370           else if (lex_match_id ("DELETE"))
371             *retain_unselected = false;
372           else
373             {
374               lex_error (_("expecting %s or %s"), "RETAIN", "DELETE");
375               goto error;
376             }
377         }
378       else if (writer_type == SYSFILE_WRITER && lex_match_id ("COMPRESSED"))
379         sysfile_opts.compress = true;
380       else if (writer_type == SYSFILE_WRITER && lex_match_id ("UNCOMPRESSED"))
381         sysfile_opts.compress = false;
382       else if (writer_type == SYSFILE_WRITER && lex_match_id ("VERSION"))
383         {
384           lex_match ('=');
385           if (!lex_force_int ())
386             goto error;
387           sysfile_opts.version = lex_integer ();
388           lex_get ();
389         }
390       else if (writer_type == PORFILE_WRITER && lex_match_id ("TYPE")) 
391         {
392           lex_match ('=');
393           if (lex_match_id ("COMMUNICATIONS"))
394             porfile_opts.type = PFM_COMM;
395           else if (lex_match_id ("TAPE"))
396             porfile_opts.type = PFM_TAPE;
397           else
398             {
399               lex_error (_("expecting %s or %s"), "COMM", "TAPE");
400               goto error;
401             }
402         }
403       else if (writer_type == PORFILE_WRITER && lex_match_id ("DIGITS")) 
404         {
405           lex_match ('=');
406           if (!lex_force_int ())
407             goto error;
408           porfile_opts.digits = lex_integer ();
409           lex_get ();
410         }
411       else if (!parse_dict_trim (dict))
412         goto error;
413       
414       if (!lex_match ('/'))
415         break;
416     }
417   if (lex_end_of_command () != CMD_SUCCESS)
418     goto error;
419
420   if (handle == NULL) 
421     {
422       lex_sbc_missing ("OUTFILE");
423       goto error;
424     }
425
426   dict_compact_values (dict);
427   aw->map = finish_case_map (dict);
428   if (aw->map != NULL)
429     case_create (&aw->bounce, dict_get_next_value_idx (dict));
430
431   if (fh_get_referent (handle) == FH_REF_FILE) 
432     {
433       switch (writer_type) 
434         {
435         case SYSFILE_WRITER:
436           aw->writer = any_writer_from_sfm_writer (
437             sfm_open_writer (handle, dict, sysfile_opts));
438           break;
439         case PORFILE_WRITER:
440           aw->writer = any_writer_from_pfm_writer (
441             pfm_open_writer (handle, dict, porfile_opts));
442           break;
443         }
444     }
445   else
446     aw->writer = any_writer_open (handle, dict);
447   if (aw->writer == NULL)
448     goto error;
449   dict_destroy (dict);
450   
451   return aw;
452
453  error:
454   case_writer_destroy (aw);
455   dict_destroy (dict);
456   return NULL;
457 }
458
459 /* Writes case C to writer AW. */
460 static bool
461 case_writer_write_case (struct case_writer *aw, const struct ccase *c) 
462 {
463   if (aw->map != NULL) 
464     {
465       map_case (aw->map, c, &aw->bounce);
466       c = &aw->bounce; 
467     }
468   return any_writer_write (aw->writer, c);
469 }
470 \f
471 /* SAVE and EXPORT. */
472
473 static bool output_proc (const struct ccase *, void *, const struct dataset *);
474
475 /* Parses and performs the SAVE or EXPORT procedure. */
476 static int
477 parse_output_proc (struct dataset *ds, enum writer_type writer_type)
478 {
479   bool retain_unselected;
480   struct variable *saved_filter_variable;
481   struct case_writer *aw;
482   bool ok;
483
484   aw = parse_write_command (ds, writer_type, PROC_CMD, &retain_unselected);
485   if (aw == NULL) 
486     return CMD_CASCADING_FAILURE;
487
488   saved_filter_variable = dict_get_filter (dataset_dict (ds));
489   if (retain_unselected) 
490     dict_set_filter (dataset_dict (ds), NULL);
491   ok = procedure (ds, output_proc, aw);
492   dict_set_filter (dataset_dict (ds), saved_filter_variable);
493
494   case_writer_destroy (aw);
495   return ok ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
496 }
497
498 /* Writes case C to file. */
499 static bool
500 output_proc (const struct ccase *c, void *aw_, const struct dataset *ds UNUSED) 
501 {
502   struct case_writer *aw = aw_;
503   return case_writer_write_case (aw, c);
504 }
505
506 int
507 cmd_save (struct dataset *ds) 
508 {
509   return parse_output_proc (ds, SYSFILE_WRITER);
510 }
511
512 int
513 cmd_export (struct dataset *ds) 
514 {
515   return parse_output_proc (ds, PORFILE_WRITER);
516 }
517 \f
518 /* XSAVE and XEXPORT. */
519
520 /* Transformation. */
521 struct output_trns 
522   {
523     struct case_writer *aw;      /* Writer. */
524   };
525
526 static trns_proc_func output_trns_proc;
527 static trns_free_func output_trns_free;
528
529 /* Parses the XSAVE or XEXPORT transformation command. */
530 static int
531 parse_output_trns (struct dataset *ds, enum writer_type writer_type) 
532 {
533   struct output_trns *t = xmalloc (sizeof *t);
534   t->aw = parse_write_command (ds, writer_type, XFORM_CMD, NULL);
535   if (t->aw == NULL) 
536     {
537       free (t);
538       return CMD_CASCADING_FAILURE;
539     }
540
541   add_transformation (ds, output_trns_proc, output_trns_free, t);
542   return CMD_SUCCESS;
543 }
544
545 /* Writes case C to the system file specified on XSAVE or XEXPORT. */
546 static int
547 output_trns_proc (void *trns_, struct ccase *c, casenumber case_num UNUSED)
548 {
549   struct output_trns *t = trns_;
550   case_writer_write_case (t->aw, c);
551   return TRNS_CONTINUE;
552 }
553
554 /* Frees an XSAVE or XEXPORT transformation.
555    Returns true if successful, false if an I/O error occurred. */
556 static bool
557 output_trns_free (void *trns_)
558 {
559   struct output_trns *t = trns_;
560   bool ok = true;
561
562   if (t != NULL)
563     {
564       ok = case_writer_destroy (t->aw);
565       free (t);
566     }
567   return ok;
568 }
569
570 /* XSAVE command. */
571 int
572 cmd_xsave (struct dataset *ds) 
573 {
574   return parse_output_trns (ds, SYSFILE_WRITER);
575 }
576
577 /* XEXPORT command. */
578 int
579 cmd_xexport (struct dataset *ds) 
580 {
581   return parse_output_trns (ds, PORFILE_WRITER);
582 }
583 \f
584 static bool rename_variables (struct dictionary *dict);
585 static bool drop_variables (struct dictionary *dict);
586 static bool keep_variables (struct dictionary *dict);
587
588 /* Commands that read and write system files share a great deal
589    of common syntactic structure for rearranging and dropping
590    variables.  This function parses this syntax and modifies DICT
591    appropriately.  Returns true on success, false on failure. */
592 static bool
593 parse_dict_trim (struct dictionary *dict)
594 {
595   if (lex_match_id ("MAP")) 
596     {
597       /* FIXME. */
598       return true;
599     }
600   else if (lex_match_id ("DROP"))
601     return drop_variables (dict);
602   else if (lex_match_id ("KEEP"))
603     return keep_variables (dict);
604   else if (lex_match_id ("RENAME"))
605     return rename_variables (dict);
606   else
607     {
608       lex_error (_("expecting a valid subcommand"));
609       return false;
610     }
611 }
612
613 /* Parses and performs the RENAME subcommand of GET and SAVE. */
614 static bool
615 rename_variables (struct dictionary *dict)
616 {
617   size_t i;
618
619   int success = 0;
620
621   struct variable **v;
622   char **new_names;
623   size_t nv, nn;
624   char *err_name;
625
626   int group;
627
628   lex_match ('=');
629   if (token != '(')
630     {
631       struct variable *v;
632
633       v = parse_variable (dict);
634       if (v == NULL)
635         return 0;
636       if (!lex_force_match ('=')
637           || !lex_force_id ())
638         return 0;
639       if (dict_lookup_var (dict, tokid) != NULL)
640         {
641           msg (SE, _("Cannot rename %s as %s because there already exists "
642                      "a variable named %s.  To rename variables with "
643                      "overlapping names, use a single RENAME subcommand "
644                      "such as \"/RENAME (A=B)(B=C)(C=A)\", or equivalently, "
645                      "\"/RENAME (A B C=B C A)\"."), v->name, tokid, tokid);
646           return 0;
647         }
648       
649       dict_rename_var (dict, v, tokid);
650       lex_get ();
651       return 1;
652     }
653
654   nv = nn = 0;
655   v = NULL;
656   new_names = 0;
657   group = 1;
658   while (lex_match ('('))
659     {
660       size_t old_nv = nv;
661
662       if (!parse_variables (dict, &v, &nv, PV_NO_DUPLICATE | PV_APPEND))
663         goto done;
664       if (!lex_match ('='))
665         {
666           msg (SE, _("`=' expected after variable list."));
667           goto done;
668         }
669       if (!parse_DATA_LIST_vars (&new_names, &nn, PV_APPEND | PV_NO_SCRATCH))
670         goto done;
671       if (nn != nv)
672         {
673           msg (SE, _("Number of variables on left side of `=' (%d) does not "
674                      "match number of variables on right side (%d), in "
675                      "parenthesized group %d of RENAME subcommand."),
676                (unsigned) (nv - old_nv), (unsigned) (nn - old_nv), group);
677           goto done;
678         }
679       if (!lex_force_match (')'))
680         goto done;
681       group++;
682     }
683
684   if (!dict_rename_vars (dict, v, new_names, nv, &err_name)) 
685     {
686       msg (SE, _("Requested renaming duplicates variable name %s."), err_name);
687       goto done;
688     }
689   success = 1;
690
691  done:
692   for (i = 0; i < nn; i++)
693     free (new_names[i]);
694   free (new_names);
695   free (v);
696
697   return success;
698 }
699
700 /* Parses and performs the DROP subcommand of GET and SAVE.
701    Returns true if successful, false on failure.*/
702 static bool
703 drop_variables (struct dictionary *dict)
704 {
705   struct variable **v;
706   size_t nv;
707
708   lex_match ('=');
709   if (!parse_variables (dict, &v, &nv, PV_NONE))
710     return false;
711   dict_delete_vars (dict, v, nv);
712   free (v);
713
714   if (dict_get_var_cnt (dict) == 0)
715     {
716       msg (SE, _("Cannot DROP all variables from dictionary."));
717       return false;
718     }
719   return true;
720 }
721
722 /* Parses and performs the KEEP subcommand of GET and SAVE.
723    Returns true if successful, false on failure.*/
724 static bool
725 keep_variables (struct dictionary *dict)
726 {
727   struct variable **v;
728   size_t nv;
729   size_t i;
730
731   lex_match ('=');
732   if (!parse_variables (dict, &v, &nv, PV_NONE))
733     return false;
734
735   /* Move the specified variables to the beginning. */
736   dict_reorder_vars (dict, v, nv);
737           
738   /* Delete the remaining variables. */
739   v = xnrealloc (v, dict_get_var_cnt (dict) - nv, sizeof *v);
740   for (i = nv; i < dict_get_var_cnt (dict); i++)
741     v[i - nv] = dict_get_var (dict, i);
742   dict_delete_vars (dict, v, dict_get_var_cnt (dict) - nv);
743   free (v);
744
745   return true;
746 }
747 \f
748 /* MATCH FILES. */
749
750 /* File types. */
751 enum
752   {
753     MTF_FILE,                   /* Specified on FILE= subcommand. */
754     MTF_TABLE                   /* Specified on TABLE= subcommand. */
755   };
756
757 /* One of the files on MATCH FILES. */
758 struct mtf_file
759   {
760     struct mtf_file *next, *prev; /* Next, previous in the list of files. */
761     struct mtf_file *next_min;  /* Next in the chain of minimums. */
762     
763     int type;                   /* One of MTF_*. */
764     struct variable **by;       /* List of BY variables for this file. */
765     struct file_handle *handle; /* File handle. */
766     struct any_reader *reader;  /* File reader. */
767     struct dictionary *dict;    /* Dictionary from system file. */
768
769     /* IN subcommand. */
770     char *in_name;              /* Variable name. */
771     struct variable *in_var;    /* Variable (in master dictionary). */
772
773     struct ccase input;         /* Input record. */
774   };
775
776 /* MATCH FILES procedure. */
777 struct mtf_proc 
778   {
779     struct mtf_file *head;      /* First file mentioned on FILE or TABLE. */
780     struct mtf_file *tail;      /* Last file mentioned on FILE or TABLE. */
781
782     bool ok;                    /* False if I/O error occurs. */
783
784     size_t by_cnt;              /* Number of variables on BY subcommand. */
785
786     /* Names of FIRST, LAST variables. */
787     char first[LONG_NAME_LEN + 1], last[LONG_NAME_LEN + 1];
788     
789     struct dictionary *dict;    /* Dictionary of output file. */
790     struct casefile *output;    /* MATCH FILES output. */
791     struct ccase mtf_case;      /* Case used for output. */
792
793     unsigned seq_num;           /* Have we initialized this variable? */
794     unsigned *seq_nums;         /* Sequence numbers for each var in dict. */
795   };
796
797 static bool mtf_free (struct mtf_proc *);
798 static bool mtf_close_file (struct mtf_file *);
799 static int mtf_merge_dictionary (struct dictionary *const, struct mtf_file *);
800 static bool mtf_delete_file_in_place (struct mtf_proc *, struct mtf_file **);
801
802 static bool mtf_read_nonactive_records (void *);
803 static bool mtf_processing_finish (void *, const struct dataset *);
804 static bool mtf_processing (const struct ccase *, void *, const struct dataset *);
805
806 static char *var_type_description (struct variable *);
807
808 static void set_master (struct variable *, struct variable *master);
809 static struct variable *get_master (struct variable *);
810
811 /* Parse and execute the MATCH FILES command. */
812 int
813 cmd_match_files (struct dataset *ds)
814 {
815   struct mtf_proc mtf;
816   struct mtf_file *first_table = NULL;
817   struct mtf_file *iter;
818   
819   bool used_active_file = false;
820   bool saw_table = false;
821   bool saw_in = false;
822
823   bool ok;
824   
825   mtf.head = mtf.tail = NULL;
826   mtf.by_cnt = 0;
827   mtf.first[0] = '\0';
828   mtf.last[0] = '\0';
829   mtf.dict = dict_create ();
830   mtf.output = NULL;
831   case_nullify (&mtf.mtf_case);
832   mtf.seq_num = 0;
833   mtf.seq_nums = NULL;
834   dict_set_case_limit (mtf.dict, dict_get_case_limit (dataset_dict (ds)));
835
836   lex_match ('/');
837   while (token == T_ID
838          && (lex_id_match ("FILE", tokid) || lex_id_match ("TABLE", tokid)))
839     {
840       struct mtf_file *file = xmalloc (sizeof *file);
841
842       if (lex_match_id ("FILE"))
843         file->type = MTF_FILE;
844       else if (lex_match_id ("TABLE"))
845         {
846           file->type = MTF_TABLE;
847           saw_table = true;
848         }
849       else
850         NOT_REACHED ();
851       lex_match ('=');
852
853       file->by = NULL;
854       file->handle = NULL;
855       file->reader = NULL;
856       file->dict = NULL;
857       file->in_name = NULL;
858       file->in_var = NULL;
859       case_nullify (&file->input);
860
861       /* FILEs go first, then TABLEs. */
862       if (file->type == MTF_TABLE || first_table == NULL)
863         {
864           file->next = NULL;
865           file->prev = mtf.tail;
866           if (mtf.tail)
867             mtf.tail->next = file;
868           mtf.tail = file;
869           if (mtf.head == NULL)
870             mtf.head = file;
871           if (file->type == MTF_TABLE && first_table == NULL)
872             first_table = file;
873         }
874       else 
875         {
876           assert (file->type == MTF_FILE);
877           file->next = first_table;
878           file->prev = first_table->prev;
879           if (first_table->prev)
880             first_table->prev->next = file;
881           else
882             mtf.head = file;
883           first_table->prev = file;
884         }
885
886       if (lex_match ('*'))
887         {
888           file->handle = NULL;
889           file->reader = NULL;
890               
891           if (used_active_file)
892             {
893               msg (SE, _("The active file may not be specified more "
894                          "than once."));
895               goto error;
896             }
897           used_active_file = true;
898
899           if (!proc_has_source (ds))
900             {
901               msg (SE, _("Cannot specify the active file since no active "
902                          "file has been defined."));
903               goto error;
904             }
905
906           if (proc_make_temporary_transformations_permanent (ds))
907             msg (SE,
908                  _("MATCH FILES may not be used after TEMPORARY when "
909                    "the active file is an input source.  "
910                    "Temporary transformations will be made permanent."));
911
912           file->dict = dataset_dict (ds);
913         }
914       else
915         {
916           file->handle = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
917           if (file->handle == NULL)
918             goto error;
919
920           file->reader = any_reader_open (file->handle, &file->dict);
921           if (file->reader == NULL)
922             goto error;
923
924           case_create (&file->input, dict_get_next_value_idx (file->dict));
925         }
926
927       while (lex_match ('/'))
928         if (lex_match_id ("RENAME")) 
929           {
930             if (!rename_variables (file->dict))
931               goto error; 
932           }
933         else if (lex_match_id ("IN"))
934           {
935             lex_match ('=');
936             if (token != T_ID)
937               {
938                 lex_error (NULL);
939                 goto error;
940               }
941
942             if (file->in_name != NULL)
943               {
944                 msg (SE, _("Multiple IN subcommands for a single FILE or "
945                            "TABLE."));
946                 goto error;
947               }
948             file->in_name = xstrdup (tokid);
949             lex_get ();
950             saw_in = true;
951           }
952
953       mtf_merge_dictionary (mtf.dict, file);
954     }
955   
956   while (token != '.')
957     {
958       if (lex_match (T_BY))
959         {
960           struct variable **by;
961           
962           if (mtf.by_cnt)
963             {
964               msg (SE, _("BY may appear at most once."));
965               goto error;
966             }
967               
968           lex_match ('=');
969           if (!parse_variables (mtf.dict, &by, &mtf.by_cnt,
970                                 PV_NO_DUPLICATE | PV_NO_SCRATCH))
971             goto error;
972
973           for (iter = mtf.head; iter != NULL; iter = iter->next)
974             {
975               size_t i;
976           
977               iter->by = xnmalloc (mtf.by_cnt, sizeof *iter->by);
978
979               for (i = 0; i < mtf.by_cnt; i++)
980                 {
981                   iter->by[i] = dict_lookup_var (iter->dict, by[i]->name);
982                   if (iter->by[i] == NULL)
983                     {
984                       msg (SE, _("File %s lacks BY variable %s."),
985                            iter->handle ? fh_get_name (iter->handle) : "*",
986                            by[i]->name);
987                       free (by);
988                       goto error;
989                     }
990                 }
991             }
992           free (by);
993         }
994       else if (lex_match_id ("FIRST")) 
995         {
996           if (mtf.first[0] != '\0')
997             {
998               msg (SE, _("FIRST may appear at most once."));
999               goto error;
1000             }
1001               
1002           lex_match ('=');
1003           if (!lex_force_id ())
1004             goto error;
1005           strcpy (mtf.first, tokid);
1006           lex_get ();
1007         }
1008       else if (lex_match_id ("LAST")) 
1009         {
1010           if (mtf.last[0] != '\0')
1011             {
1012               msg (SE, _("LAST may appear at most once."));
1013               goto error;
1014             }
1015               
1016           lex_match ('=');
1017           if (!lex_force_id ())
1018             goto error;
1019           strcpy (mtf.last, tokid);
1020           lex_get ();
1021         }
1022       else if (lex_match_id ("MAP"))
1023         {
1024           /* FIXME. */
1025         }
1026       else if (lex_match_id ("DROP")) 
1027         {
1028           if (!drop_variables (mtf.dict))
1029             goto error;
1030         }
1031       else if (lex_match_id ("KEEP")) 
1032         {
1033           if (!keep_variables (mtf.dict))
1034             goto error;
1035         }
1036       else
1037         {
1038           lex_error (NULL);
1039           goto error;
1040         }
1041
1042       if (!lex_match ('/') && token != '.') 
1043         {
1044           lex_end_of_command ();
1045           goto error;
1046         }
1047     }
1048
1049   if (mtf.by_cnt == 0)
1050     {
1051       if (saw_table)
1052         {
1053           msg (SE, _("BY is required when TABLE is specified."));
1054           goto error;
1055         }
1056       if (saw_in)
1057         {
1058           msg (SE, _("BY is required when IN is specified."));
1059           goto error;
1060         }
1061     }
1062
1063   /* Set up mapping from each file's variables to master
1064      variables. */
1065   for (iter = mtf.head; iter != NULL; iter = iter->next)
1066     {
1067       struct dictionary *d = iter->dict;
1068       int i;
1069
1070       for (i = 0; i < dict_get_var_cnt (d); i++)
1071         {
1072           struct variable *v = dict_get_var (d, i);
1073           struct variable *mv = dict_lookup_var (mtf.dict, v->name);
1074           if (mv != NULL)
1075             set_master (v, mv);
1076         }
1077     }
1078
1079   /* Add IN variables to master dictionary. */
1080   for (iter = mtf.head; iter != NULL; iter = iter->next) 
1081     if (iter->in_name != NULL)
1082       {
1083         iter->in_var = dict_create_var (mtf.dict, iter->in_name, 0);
1084         if (iter->in_var == NULL)
1085           {
1086             msg (SE, _("IN variable name %s duplicates an "
1087                        "existing variable name."),
1088                  iter->in_var->name);
1089             goto error;
1090           }
1091         iter->in_var->print = iter->in_var->write
1092           = fmt_for_output (FMT_F, 1, 0);
1093       }
1094     
1095   /* MATCH FILES performs an n-way merge on all its input files.
1096      Abstract algorithm:
1097
1098      1. Read one input record from every input FILE.
1099
1100      2. If no FILEs are left, stop.  Otherwise, proceed to step 3.
1101
1102      3. Find the FILE input record(s) that have minimum BY
1103      values.  Store all the values from these input records into
1104      the output record.
1105
1106      4. For every TABLE, read another record as long as the BY values
1107      on the TABLE's input record are less than the FILEs' BY values.
1108      If an exact match is found, store all the values from the TABLE
1109      input record into the output record.
1110
1111      5. Write the output record.
1112
1113      6. Read another record from each input file FILE and TABLE that
1114      we stored values from above.  If we come to the end of one of the
1115      input files, remove it from the list of input files.
1116
1117      7. Repeat from step 2.
1118
1119      Unfortunately, this algorithm can't be implemented in a
1120      straightforward way because there's no function to read a
1121      record from the active file.  Instead, it has to be written
1122      as a state machine.
1123
1124      FIXME: For merging large numbers of files (more than 10?) a
1125      better algorithm would use a heap for finding minimum
1126      values. */
1127
1128   if (!used_active_file)
1129     discard_variables (ds);
1130
1131   dict_compact_values (mtf.dict);
1132   mtf.output = fastfile_create (dict_get_next_value_idx (mtf.dict));
1133   mtf.seq_nums = xcalloc (dict_get_var_cnt (mtf.dict), sizeof *mtf.seq_nums);
1134   case_create (&mtf.mtf_case, dict_get_next_value_idx (mtf.dict));
1135
1136   if (!mtf_read_nonactive_records (&mtf))
1137     goto error;
1138
1139   if (used_active_file) 
1140     {
1141       proc_set_sink (ds, 
1142                      create_case_sink (&null_sink_class, 
1143                                        dataset_dict (ds), NULL));
1144       ok = 
1145         ( procedure (ds, mtf_processing, &mtf) && 
1146           mtf_processing_finish (&mtf, ds) ); 
1147     }
1148   else
1149     ok = mtf_processing_finish (&mtf, ds);
1150
1151   discard_variables (ds);
1152
1153   dict_destroy (dataset_dict (ds));
1154   dataset_set_dict (ds, mtf.dict);
1155   mtf.dict = NULL;
1156   proc_set_source (ds, storage_source_create (mtf.output));
1157   mtf.output = NULL;
1158   
1159   if (!mtf_free (&mtf))
1160     ok = false;
1161   return ok ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
1162   
1163  error:
1164   mtf_free (&mtf);
1165   return CMD_CASCADING_FAILURE;
1166 }
1167
1168 /* Repeats 2...7 an arbitrary number of times. */
1169 static bool
1170 mtf_processing_finish (void *mtf_, const struct dataset *ds)
1171 {
1172   struct mtf_proc *mtf = mtf_;
1173   struct mtf_file *iter;
1174
1175   /* Find the active file and delete it. */
1176   for (iter = mtf->head; iter; iter = iter->next)
1177     if (iter->handle == NULL)
1178       {
1179         if (!mtf_delete_file_in_place (mtf, &iter))
1180           NOT_REACHED ();
1181         break;
1182       }
1183   
1184   while (mtf->head && mtf->head->type == MTF_FILE)
1185     if (!mtf_processing (NULL, mtf, ds))
1186       return false;
1187
1188   return true;
1189 }
1190
1191 /* Return a string in a static buffer describing V's variable type and
1192    width. */
1193 static char *
1194 var_type_description (struct variable *v)
1195 {
1196   static char buf[2][32];
1197   static int x = 0;
1198   char *s;
1199
1200   x ^= 1;
1201   s = buf[x];
1202
1203   if (v->type == NUMERIC)
1204     strcpy (s, "numeric");
1205   else
1206     {
1207       assert (v->type == ALPHA);
1208       sprintf (s, "string with width %d", v->width);
1209     }
1210   return s;
1211 }
1212
1213 /* Closes FILE and frees its associated data.
1214    Returns true if successful, false if an I/O error
1215    occurred on FILE. */
1216 static bool
1217 mtf_close_file (struct mtf_file *file)
1218 {
1219   bool ok = file->reader == NULL || !any_reader_error (file->reader);
1220   free (file->by);
1221   any_reader_close (file->reader);
1222   if (file->handle != NULL)
1223     dict_destroy (file->dict);
1224   case_destroy (&file->input);
1225   free (file->in_name);
1226   free (file);
1227   return ok;
1228 }
1229
1230 /* Free all the data for the MATCH FILES procedure.
1231    Returns true if successful, false if an I/O error
1232    occurred. */
1233 static bool
1234 mtf_free (struct mtf_proc *mtf)
1235 {
1236   struct mtf_file *iter, *next;
1237   bool ok = true;
1238
1239   for (iter = mtf->head; iter; iter = next)
1240     {
1241       next = iter->next;
1242       assert (iter->dict != mtf->dict);
1243       if (!mtf_close_file (iter))
1244         ok = false;
1245     }
1246   
1247   if (mtf->dict)
1248     dict_destroy (mtf->dict);
1249   case_destroy (&mtf->mtf_case);
1250   free (mtf->seq_nums);
1251
1252   return ok;
1253 }
1254
1255 /* Remove *FILE from the mtf_file chain.  Make *FILE point to the next
1256    file in the chain, or to NULL if was the last in the chain.
1257    Returns true if successful, false if an I/O error occurred. */
1258 static bool
1259 mtf_delete_file_in_place (struct mtf_proc *mtf, struct mtf_file **file)
1260 {
1261   struct mtf_file *f = *file;
1262   int i;
1263
1264   if (f->prev)
1265     f->prev->next = f->next;
1266   if (f->next)
1267     f->next->prev = f->prev;
1268   if (f == mtf->head)
1269     mtf->head = f->next;
1270   if (f == mtf->tail)
1271     mtf->tail = f->prev;
1272   *file = f->next;
1273
1274   if (f->in_var != NULL)
1275     case_data_rw (&mtf->mtf_case, f->in_var->fv)->f = 0.;
1276   for (i = 0; i < dict_get_var_cnt (f->dict); i++)
1277     {
1278       struct variable *v = dict_get_var (f->dict, i);
1279       struct variable *mv = get_master (v);
1280       if (mv != NULL) 
1281         {
1282           union value *out = case_data_rw (&mtf->mtf_case, mv->fv);
1283           
1284           if (v->type == NUMERIC)
1285             out->f = SYSMIS;
1286           else
1287             memset (out->s, ' ', v->width);
1288         } 
1289     }
1290
1291   return mtf_close_file (f);
1292 }
1293
1294 /* Read a record from every input file except the active file.
1295    Returns true if successful, false if an I/O error occurred. */
1296 static bool
1297 mtf_read_nonactive_records (void *mtf_)
1298 {
1299   struct mtf_proc *mtf = mtf_;
1300   struct mtf_file *iter, *next;
1301   bool ok = true;
1302
1303   for (iter = mtf->head; ok && iter != NULL; iter = next)
1304     {
1305       next = iter->next;
1306       if (iter->handle && !any_reader_read (iter->reader, &iter->input)) 
1307         if (!mtf_delete_file_in_place (mtf, &iter))
1308           ok = false;
1309     }
1310   return ok;
1311 }
1312
1313 /* Compare the BY variables for files A and B; return -1 if A < B, 0
1314    if A == B, 1 if A > B. */
1315 static inline int
1316 mtf_compare_BY_values (struct mtf_proc *mtf,
1317                        struct mtf_file *a, struct mtf_file *b,
1318                        const struct ccase *c)
1319 {
1320   const struct ccase *ca = case_is_null (&a->input) ? c : &a->input;
1321   const struct ccase *cb = case_is_null (&b->input) ? c : &b->input;
1322   assert ((a == NULL) + (b == NULL) + (c == NULL) <= 1);
1323   return case_compare_2dict (ca, cb, a->by, b->by, mtf->by_cnt);
1324 }
1325
1326 /* Perform one iteration of steps 3...7 above.
1327    Returns true if successful, false if an I/O error occurred. */
1328 static bool
1329 mtf_processing (const struct ccase *c, void *mtf_, const struct dataset *ds UNUSED)
1330 {
1331   struct mtf_proc *mtf = mtf_;
1332
1333   /* Do we need another record from the active file? */
1334   bool read_active_file;
1335
1336   assert (mtf->head != NULL);
1337   if (mtf->head->type == MTF_TABLE)
1338     return true;
1339   
1340   do
1341     {
1342       struct mtf_file *min_head, *min_tail; /* Files with minimum BY values. */
1343       struct mtf_file *max_head, *max_tail; /* Files with non-minimum BYs. */
1344       struct mtf_file *iter, *next;
1345
1346       read_active_file = false;
1347       
1348       /* 3. Find the FILE input record(s) that have minimum BY
1349          values.  Store all the values from these input records into
1350          the output record. */
1351       min_head = min_tail = mtf->head;
1352       max_head = max_tail = NULL;
1353       for (iter = mtf->head->next; iter && iter->type == MTF_FILE;
1354            iter = iter->next) 
1355         {
1356           int cmp = mtf_compare_BY_values (mtf, min_head, iter, c);
1357           if (cmp < 0) 
1358             {
1359               if (max_head)
1360                 max_tail = max_tail->next_min = iter;
1361               else
1362                 max_head = max_tail = iter;
1363             }
1364           else if (cmp == 0) 
1365             min_tail = min_tail->next_min = iter;
1366           else /* cmp > 0 */
1367             {
1368               if (max_head)
1369                 {
1370                   max_tail->next_min = min_head;
1371                   max_tail = min_tail;
1372                 }
1373               else
1374                 {
1375                   max_head = min_head;
1376                   max_tail = min_tail;
1377                 }
1378               min_head = min_tail = iter;
1379             }
1380         }
1381       
1382       /* 4. For every TABLE, read another record as long as the BY
1383          values on the TABLE's input record are less than the FILEs'
1384          BY values.  If an exact match is found, store all the values
1385          from the TABLE input record into the output record. */
1386       for (; iter != NULL; iter = next)
1387         {
1388           assert (iter->type == MTF_TABLE);
1389       
1390           next = iter->next;
1391           for (;;) 
1392             {
1393               int cmp = mtf_compare_BY_values (mtf, min_head, iter, c);
1394               if (cmp < 0) 
1395                 {
1396                   if (max_head)
1397                     max_tail = max_tail->next_min = iter;
1398                   else
1399                     max_head = max_tail = iter;
1400                 }
1401               else if (cmp == 0)
1402                 min_tail = min_tail->next_min = iter;
1403               else /* cmp > 0 */
1404                 {
1405                   if (iter->handle == NULL)
1406                     return true;
1407                   if (any_reader_read (iter->reader, &iter->input))
1408                     continue;
1409                   if (!mtf_delete_file_in_place (mtf, &iter))
1410                     return false;
1411                 }
1412               break;
1413             }
1414         }
1415
1416       /* Next sequence number. */
1417       mtf->seq_num++;
1418
1419       /* Store data to all the records we are using. */
1420       if (min_tail)
1421         min_tail->next_min = NULL;
1422       for (iter = min_head; iter; iter = iter->next_min)
1423         {
1424           int i;
1425
1426           for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
1427             {
1428               struct variable *v = dict_get_var (iter->dict, i);
1429               struct variable *mv = get_master (v);
1430           
1431               if (mv != NULL && mtf->seq_nums[mv->index] != mtf->seq_num) 
1432                 {
1433                   const struct ccase *record
1434                     = case_is_null (&iter->input) ? c : &iter->input;
1435                   union value *out = case_data_rw (&mtf->mtf_case, mv->fv);
1436
1437                   mtf->seq_nums[mv->index] = mtf->seq_num;
1438                   if (v->type == NUMERIC)
1439                     out->f = case_num (record, v->fv);
1440                   else
1441                     memcpy (out->s, case_str (record, v->fv), v->width);
1442                 } 
1443             }
1444           if (iter->in_var != NULL)
1445             case_data_rw (&mtf->mtf_case, iter->in_var->fv)->f = 1.;
1446
1447           if (iter->type == MTF_FILE && iter->handle == NULL)
1448             read_active_file = true;
1449         }
1450
1451       /* Store missing values to all the records we're not
1452          using. */
1453       if (max_tail)
1454         max_tail->next_min = NULL;
1455       for (iter = max_head; iter; iter = iter->next_min)
1456         {
1457           int i;
1458
1459           for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
1460             {
1461               struct variable *v = dict_get_var (iter->dict, i);
1462               struct variable *mv = get_master (v);
1463
1464               if (mv != NULL && mtf->seq_nums[mv->index] != mtf->seq_num) 
1465                 {
1466                   union value *out = case_data_rw (&mtf->mtf_case, mv->fv);
1467                   mtf->seq_nums[mv->index] = mtf->seq_num;
1468
1469                   if (v->type == NUMERIC)
1470                     out->f = SYSMIS;
1471                   else
1472                     memset (out->s, ' ', v->width);
1473                 }
1474             }
1475           if (iter->in_var != NULL)
1476             case_data_rw (&mtf->mtf_case, iter->in_var->fv)->f = 0.;
1477         }
1478
1479       /* 5. Write the output record. */
1480       casefile_append (mtf->output, &mtf->mtf_case);
1481
1482       /* 6. Read another record from each input file FILE and TABLE
1483          that we stored values from above.  If we come to the end of
1484          one of the input files, remove it from the list of input
1485          files. */
1486       for (iter = min_head; iter && iter->type == MTF_FILE; iter = next)
1487         {
1488           next = iter->next_min;
1489           if (iter->reader != NULL
1490               && !any_reader_read (iter->reader, &iter->input))
1491             if (!mtf_delete_file_in_place (mtf, &iter))
1492               return false;
1493         }
1494     }
1495   while (!read_active_file
1496          && mtf->head != NULL && mtf->head->type == MTF_FILE);
1497
1498   return true;
1499 }
1500
1501 /* Merge the dictionary for file F into master dictionary M. */
1502 static int
1503 mtf_merge_dictionary (struct dictionary *const m, struct mtf_file *f)
1504 {
1505   struct dictionary *d = f->dict;
1506   const char *d_docs, *m_docs;
1507   int i;
1508
1509   if (dict_get_label (m) == NULL)
1510     dict_set_label (m, dict_get_label (d));
1511
1512   d_docs = dict_get_documents (d);
1513   m_docs = dict_get_documents (m);
1514   if (d_docs != NULL) 
1515     {
1516       if (m_docs == NULL)
1517         dict_set_documents (m, d_docs);
1518       else
1519         {
1520           char *new_docs;
1521           size_t new_len;
1522
1523           new_len = strlen (m_docs) + strlen (d_docs);
1524           new_docs = xmalloc (new_len + 1);
1525           strcpy (new_docs, m_docs);
1526           strcat (new_docs, d_docs);
1527           dict_set_documents (m, new_docs);
1528           free (new_docs);
1529         }
1530     }
1531   
1532   for (i = 0; i < dict_get_var_cnt (d); i++)
1533     {
1534       struct variable *dv = dict_get_var (d, i);
1535       struct variable *mv = dict_lookup_var (m, dv->name);
1536
1537       if (dict_class_from_id (dv->name) == DC_SCRATCH)
1538         continue;
1539
1540       if (mv != NULL)
1541         {
1542           if (mv->width != dv->width) 
1543             {
1544               msg (SE, _("Variable %s in file %s (%s) has different "
1545                          "type or width from the same variable in "
1546                          "earlier file (%s)."),
1547                    dv->name, fh_get_name (f->handle),
1548                    var_type_description (dv), var_type_description (mv));
1549               return 0;
1550             }
1551         
1552           if (dv->width == mv->width)
1553             {
1554               if (val_labs_count (dv->val_labs)
1555                   && !val_labs_count (mv->val_labs)) 
1556                 {
1557                   val_labs_destroy (mv->val_labs);
1558                   mv->val_labs = val_labs_copy (dv->val_labs); 
1559                 }
1560               if (!mv_is_empty (&dv->miss) && mv_is_empty (&mv->miss))
1561                 mv_copy (&mv->miss, &dv->miss);
1562             }
1563
1564           if (dv->label && !mv->label)
1565             mv->label = xstrdup (dv->label);
1566         }
1567       else
1568         mv = dict_clone_var_assert (m, dv, dv->name);
1569     }
1570
1571   return 1;
1572 }
1573
1574 /* Marks V's master variable as MASTER. */
1575 static void
1576 set_master (struct variable *v, struct variable *master) 
1577 {
1578   var_attach_aux (v, master, NULL);
1579 }
1580
1581 /* Returns the master variable corresponding to V,
1582    as set with set_master(). */
1583 static struct variable *
1584 get_master (struct variable *v) 
1585 {
1586   return v->aux;
1587 }
1588 \f
1589
1590 \f
1591 /* Case map.
1592
1593    A case map copies data from a case that corresponds for one
1594    dictionary to a case that corresponds to a second dictionary
1595    derived from the first by, optionally, deleting, reordering,
1596    or renaming variables.  (No new variables may be created.)
1597    */
1598
1599 /* A case map. */
1600 struct case_map
1601   {
1602     size_t value_cnt;   /* Number of values in map. */
1603     int *map;           /* For each destination index, the
1604                            corresponding source index. */
1605   };
1606
1607 /* Prepares dictionary D for producing a case map.  Afterward,
1608    the caller may delete, reorder, or rename variables within D
1609    at will before using finish_case_map() to produce the case
1610    map.
1611
1612    Uses D's aux members, which must otherwise not be in use. */
1613 static void
1614 start_case_map (struct dictionary *d) 
1615 {
1616   size_t var_cnt = dict_get_var_cnt (d);
1617   size_t i;
1618   
1619   for (i = 0; i < var_cnt; i++)
1620     {
1621       struct variable *v = dict_get_var (d, i);
1622       int *src_fv = xmalloc (sizeof *src_fv);
1623       *src_fv = v->fv;
1624       var_attach_aux (v, src_fv, var_dtor_free);
1625     }
1626 }
1627
1628 /* Produces a case map from dictionary D, which must have been
1629    previously prepared with start_case_map().
1630
1631    Does not retain any reference to D, and clears the aux members
1632    set up by start_case_map().
1633
1634    Returns the new case map, or a null pointer if no mapping is
1635    required (that is, no data has changed position). */
1636 static struct case_map *
1637 finish_case_map (struct dictionary *d) 
1638 {
1639   struct case_map *map;
1640   size_t var_cnt = dict_get_var_cnt (d);
1641   size_t i;
1642   int identity_map;
1643
1644   map = xmalloc (sizeof *map);
1645   map->value_cnt = dict_get_next_value_idx (d);
1646   map->map = xnmalloc (map->value_cnt, sizeof *map->map);
1647   for (i = 0; i < map->value_cnt; i++)
1648     map->map[i] = -1;
1649
1650   identity_map = 1;
1651   for (i = 0; i < var_cnt; i++) 
1652     {
1653       struct variable *v = dict_get_var (d, i);
1654       int *src_fv = (int *) var_detach_aux (v);
1655       size_t idx;
1656
1657       if (v->fv != *src_fv)
1658         identity_map = 0;
1659       
1660       for (idx = 0; idx < v->nv; idx++)
1661         {
1662           int src_idx = *src_fv + idx;
1663           int dst_idx = v->fv + idx;
1664           
1665           assert (map->map[dst_idx] == -1);
1666           map->map[dst_idx] = src_idx;
1667         }
1668       free (src_fv);
1669     }
1670
1671   if (identity_map) 
1672     {
1673       destroy_case_map (map);
1674       return NULL;
1675     }
1676
1677   while (map->value_cnt > 0 && map->map[map->value_cnt - 1] == -1)
1678     map->value_cnt--;
1679
1680   return map;
1681 }
1682
1683 /* Maps from SRC to DST, applying case map MAP. */
1684 static void
1685 map_case (const struct case_map *map,
1686           const struct ccase *src, struct ccase *dst) 
1687 {
1688   size_t dst_idx;
1689
1690   assert (map != NULL);
1691   assert (src != NULL);
1692   assert (dst != NULL);
1693   assert (src != dst);
1694
1695   for (dst_idx = 0; dst_idx < map->value_cnt; dst_idx++)
1696     {
1697       int src_idx = map->map[dst_idx];
1698       if (src_idx != -1)
1699         *case_data_rw (dst, dst_idx) = *case_data (src, src_idx);
1700     }
1701 }
1702
1703 /* Destroys case map MAP. */
1704 static void
1705 destroy_case_map (struct case_map *map) 
1706 {
1707   if (map != NULL) 
1708     {
1709       free (map->map);
1710       free (map);
1711     }
1712 }