425e8c512031b8645ad6f7d4d80043b7081a5e11
[pspp-builds.git] / src / language / data-io / get.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <stdlib.h>
22
23 #include <data/any-reader.h>
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case-source.h>
27 #include <data/case.h>
28 #include <data/casefile.h>
29 #include <data/fastfile.h>
30 #include <data/format.h>
31 #include <data/dictionary.h>
32 #include <data/por-file-writer.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/storage-stream.h>
36 #include <data/sys-file-writer.h>
37 #include <data/transformations.h>
38 #include <data/value-labels.h>
39 #include <data/variable.h>
40 #include <language/command.h>
41 #include <language/data-io/file-handle.h>
42 #include <language/lexer/lexer.h>
43 #include <language/lexer/variable-parser.h>
44 #include <libpspp/alloc.h>
45 #include <libpspp/assertion.h>
46 #include <libpspp/compiler.h>
47 #include <libpspp/hash.h>
48 #include <libpspp/message.h>
49 #include <libpspp/message.h>
50 #include <libpspp/misc.h>
51 #include <libpspp/str.h>
52
53 #include "gettext.h"
54 #define _(msgid) gettext (msgid)
55
56 /* Rearranging and reducing a dictionary. */
57 static void start_case_map (struct dictionary *);
58 static struct case_map *finish_case_map (struct dictionary *);
59 static void map_case (const struct case_map *,
60                       const struct ccase *, struct ccase *);
61 static void destroy_case_map (struct case_map *);
62
63 static bool parse_dict_trim (struct lexer *, struct dictionary *);
64 \f
65 /* Reading system and portable files. */
66
67 /* Type of command. */
68 enum reader_command 
69   {
70     GET_CMD,
71     IMPORT_CMD
72   };
73
74 /* Case reader input program. */
75 struct case_reader_pgm 
76   {
77     struct any_reader *reader;  /* File reader. */
78     struct case_map *map;       /* Map from file dict to active file dict. */
79     struct ccase bounce;        /* Bounce buffer. */
80   };
81
82 static const struct case_source_class case_reader_source_class;
83
84 static void case_reader_pgm_free (struct case_reader_pgm *);
85
86 /* Parses a GET or IMPORT command. */
87 static int
88 parse_read_command (struct lexer *lexer, struct dataset *ds, enum reader_command type)
89 {
90   struct case_reader_pgm *pgm = NULL;
91   struct file_handle *fh = NULL;
92   struct dictionary *dict = NULL;
93
94   for (;;)
95     {
96       lex_match (lexer, '/');
97
98       if (lex_match_id (lexer, "FILE") || lex_token (lexer) == T_STRING)
99         {
100           lex_match (lexer, '=');
101
102           fh = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
103           if (fh == NULL)
104             goto error;
105         }
106       else if (type == IMPORT_CMD && lex_match_id (lexer, "TYPE"))
107         {
108           lex_match (lexer, '=');
109
110           if (lex_match_id (lexer, "COMM"))
111             type = PFM_COMM;
112           else if (lex_match_id (lexer, "TAPE"))
113             type = PFM_TAPE;
114           else
115             {
116               lex_error (lexer, _("expecting COMM or TAPE"));
117               goto error;
118             }
119         }
120       else
121         break; 
122     }
123   
124   if (fh == NULL) 
125     {
126       lex_sbc_missing (lexer, "FILE");
127       goto error;
128     }
129               
130   discard_variables (ds);
131
132   pgm = xmalloc (sizeof *pgm);
133   pgm->reader = any_reader_open (fh, &dict);
134   pgm->map = NULL;
135   case_nullify (&pgm->bounce);
136   if (pgm->reader == NULL)
137     goto error;
138
139   case_create (&pgm->bounce, dict_get_next_value_idx (dict));
140
141   start_case_map (dict);
142
143   while (lex_token (lexer) != '.')
144     {
145       lex_match (lexer, '/');
146       if (!parse_dict_trim (lexer, dict))
147         goto error;
148     }
149
150   pgm->map = finish_case_map (dict);
151
152   dataset_set_dict (ds, dict);
153
154   proc_set_source (ds,
155                    create_case_source (&case_reader_source_class, pgm));
156
157   return CMD_SUCCESS;
158
159  error:
160   case_reader_pgm_free (pgm);
161   if (dict != NULL)
162     dict_destroy (dict);
163   return CMD_CASCADING_FAILURE;
164 }
165
166 /* Frees a struct case_reader_pgm. */
167 static void
168 case_reader_pgm_free (struct case_reader_pgm *pgm) 
169 {
170   if (pgm != NULL) 
171     {
172       any_reader_close (pgm->reader);
173       destroy_case_map (pgm->map);
174       case_destroy (&pgm->bounce);
175       free (pgm);
176     }
177 }
178
179 /* Reads one case into C.
180    Returns true if successful, false at end of file or if an
181    I/O error occurred. */
182 static bool
183 case_reader_source_read (struct case_source *source, struct ccase *c)
184 {
185   struct case_reader_pgm *pgm = source->aux;
186   if (any_reader_read (pgm->reader, pgm->map == NULL ? c : &pgm->bounce)) 
187     {
188       if (pgm->map != NULL)
189         map_case (pgm->map, &pgm->bounce, c);
190       return true;
191     }
192   else  
193     return false;
194 }
195
196 /* Destroys the source.
197    Returns true if successful read, false if an I/O occurred
198    during destruction or previously. */
199 static bool
200 case_reader_source_destroy (struct case_source *source)
201 {
202   struct case_reader_pgm *pgm = source->aux;
203   bool ok = !any_reader_error (pgm->reader); 
204   case_reader_pgm_free (pgm);
205   return ok;
206 }
207
208 static const struct case_source_class case_reader_source_class =
209   {
210     "case reader",
211     NULL,
212     case_reader_source_read,
213     case_reader_source_destroy,
214   };
215 \f
216 /* GET. */
217 int
218 cmd_get (struct lexer *lexer, struct dataset *ds) 
219 {
220   return parse_read_command (lexer, ds, GET_CMD);
221 }
222
223 /* IMPORT. */
224 int
225 cmd_import (struct lexer *lexer, struct dataset *ds) 
226 {
227   return parse_read_command (lexer, ds, IMPORT_CMD);
228 }
229 \f
230 /* Writing system and portable files. */ 
231
232 /* Type of output file. */
233 enum writer_type
234   {
235     SYSFILE_WRITER,     /* System file. */
236     PORFILE_WRITER      /* Portable file. */
237   };
238
239 /* Type of a command. */
240 enum command_type 
241   {
242     XFORM_CMD,          /* Transformation. */
243     PROC_CMD            /* Procedure. */
244   };
245
246 /* File writer plus a case map. */
247 struct case_writer
248   {
249     struct any_writer *writer;  /* File writer. */
250     struct case_map *map;       /* Map to output file dictionary
251                                    (null pointer for identity mapping). */
252     struct ccase bounce;        /* Bounce buffer for mapping (if needed). */
253   };
254
255 /* Destroys AW. */
256 static bool
257 case_writer_destroy (struct case_writer *aw)
258 {
259   bool ok = true;
260   if (aw != NULL) 
261     {
262       ok = any_writer_close (aw->writer);
263       destroy_case_map (aw->map);
264       case_destroy (&aw->bounce);
265       free (aw);
266     }
267   return ok;
268 }
269
270 /* Parses SAVE or XSAVE or EXPORT or XEXPORT command.
271    WRITER_TYPE identifies the type of file to write,
272    and COMMAND_TYPE identifies the type of command.
273
274    On success, returns a writer.
275    For procedures only, sets *RETAIN_UNSELECTED to true if cases
276    that would otherwise be excluded by FILTER or USE should be
277    included.
278
279    On failure, returns a null pointer. */
280 static struct case_writer *
281 parse_write_command (struct lexer *lexer, struct dataset *ds, 
282                      enum writer_type writer_type,
283                      enum command_type command_type,
284                      bool *retain_unselected)
285 {
286   /* Common data. */
287   struct file_handle *handle; /* Output file. */
288   struct dictionary *dict;    /* Dictionary for output file. */
289   struct case_writer *aw;      /* Writer. */  
290
291   /* Common options. */
292   bool print_map;             /* Print map?  TODO. */
293   bool print_short_names;     /* Print long-to-short name map.  TODO. */
294   struct sfm_write_options sysfile_opts;
295   struct pfm_write_options porfile_opts;
296
297   assert (writer_type == SYSFILE_WRITER || writer_type == PORFILE_WRITER);
298   assert (command_type == XFORM_CMD || command_type == PROC_CMD);
299   assert ((retain_unselected != NULL) == (command_type == PROC_CMD));
300
301   if (command_type == PROC_CMD)
302     *retain_unselected = true;
303
304   handle = NULL;
305   dict = dict_clone (dataset_dict (ds));
306   aw = xmalloc (sizeof *aw);
307   aw->writer = NULL;
308   aw->map = NULL;
309   case_nullify (&aw->bounce);
310   print_map = false;
311   print_short_names = false;
312   sysfile_opts = sfm_writer_default_options ();
313   porfile_opts = pfm_writer_default_options ();
314
315   start_case_map (dict);
316   dict_delete_scratch_vars (dict);
317
318   lex_match (lexer, '/');
319   for (;;)
320     {
321       if (lex_match_id (lexer, "OUTFILE"))
322         {
323           if (handle != NULL) 
324             {
325               lex_sbc_only_once ("OUTFILE");
326               goto error; 
327             }
328           
329           lex_match (lexer, '=');
330       
331           handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
332           if (handle == NULL)
333             goto error;
334         }
335       else if (lex_match_id (lexer, "NAMES"))
336         print_short_names = true;
337       else if (lex_match_id (lexer, "PERMISSIONS")) 
338         {
339           bool cw;
340           
341           lex_match (lexer, '=');
342           if (lex_match_id (lexer, "READONLY"))
343             cw = false;
344           else if (lex_match_id (lexer, "WRITEABLE"))
345             cw = true;
346           else
347             {
348               lex_error (lexer, _("expecting %s or %s"), "READONLY", "WRITEABLE");
349               goto error;
350             }
351           sysfile_opts.create_writeable = porfile_opts.create_writeable = cw;
352         }
353       else if (command_type == PROC_CMD && lex_match_id (lexer, "UNSELECTED")) 
354         {
355           lex_match (lexer, '=');
356           if (lex_match_id (lexer, "RETAIN"))
357             *retain_unselected = true;
358           else if (lex_match_id (lexer, "DELETE"))
359             *retain_unselected = false;
360           else
361             {
362               lex_error (lexer, _("expecting %s or %s"), "RETAIN", "DELETE");
363               goto error;
364             }
365         }
366       else if (writer_type == SYSFILE_WRITER && lex_match_id (lexer, "COMPRESSED"))
367         sysfile_opts.compress = true;
368       else if (writer_type == SYSFILE_WRITER && lex_match_id (lexer, "UNCOMPRESSED"))
369         sysfile_opts.compress = false;
370       else if (writer_type == SYSFILE_WRITER && lex_match_id (lexer, "VERSION"))
371         {
372           lex_match (lexer, '=');
373           if (!lex_force_int (lexer))
374             goto error;
375           sysfile_opts.version = lex_integer (lexer);
376           lex_get (lexer);
377         }
378       else if (writer_type == PORFILE_WRITER && lex_match_id (lexer, "TYPE")) 
379         {
380           lex_match (lexer, '=');
381           if (lex_match_id (lexer, "COMMUNICATIONS"))
382             porfile_opts.type = PFM_COMM;
383           else if (lex_match_id (lexer, "TAPE"))
384             porfile_opts.type = PFM_TAPE;
385           else
386             {
387               lex_error (lexer, _("expecting %s or %s"), "COMM", "TAPE");
388               goto error;
389             }
390         }
391       else if (writer_type == PORFILE_WRITER && lex_match_id (lexer, "DIGITS")) 
392         {
393           lex_match (lexer, '=');
394           if (!lex_force_int (lexer))
395             goto error;
396           porfile_opts.digits = lex_integer (lexer);
397           lex_get (lexer);
398         }
399       else if (!parse_dict_trim (lexer, dict))
400         goto error;
401       
402       if (!lex_match (lexer, '/'))
403         break;
404     }
405   if (lex_end_of_command (lexer) != CMD_SUCCESS)
406     goto error;
407
408   if (handle == NULL) 
409     {
410       lex_sbc_missing (lexer, "OUTFILE");
411       goto error;
412     }
413
414   dict_compact_values (dict);
415   aw->map = finish_case_map (dict);
416   if (aw->map != NULL)
417     case_create (&aw->bounce, dict_get_next_value_idx (dict));
418
419   if (fh_get_referent (handle) == FH_REF_FILE) 
420     {
421       switch (writer_type) 
422         {
423         case SYSFILE_WRITER:
424           aw->writer = any_writer_from_sfm_writer (
425             sfm_open_writer (handle, dict, sysfile_opts));
426           break;
427         case PORFILE_WRITER:
428           aw->writer = any_writer_from_pfm_writer (
429             pfm_open_writer (handle, dict, porfile_opts));
430           break;
431         }
432     }
433   else
434     aw->writer = any_writer_open (handle, dict);
435   if (aw->writer == NULL)
436     goto error;
437   dict_destroy (dict);
438   
439   return aw;
440
441  error:
442   case_writer_destroy (aw);
443   dict_destroy (dict);
444   return NULL;
445 }
446
447 /* Writes case C to writer AW. */
448 static bool
449 case_writer_write_case (struct case_writer *aw, const struct ccase *c) 
450 {
451   if (aw->map != NULL) 
452     {
453       map_case (aw->map, c, &aw->bounce);
454       c = &aw->bounce; 
455     }
456   return any_writer_write (aw->writer, c);
457 }
458 \f
459 /* SAVE and EXPORT. */
460
461 /* Parses and performs the SAVE or EXPORT procedure. */
462 static int
463 parse_output_proc (struct lexer *lexer, struct dataset *ds, enum writer_type writer_type)
464 {
465   bool retain_unselected;
466   struct variable *saved_filter_variable;
467   struct case_writer *aw;
468   struct ccase *c;
469   bool ok = true;
470
471   aw = parse_write_command (lexer, ds, writer_type, PROC_CMD, &retain_unselected);
472   if (aw == NULL) 
473     return CMD_CASCADING_FAILURE;
474
475   saved_filter_variable = dict_get_filter (dataset_dict (ds));
476   if (retain_unselected) 
477     dict_set_filter (dataset_dict (ds), NULL);
478
479   proc_open (ds);
480   while (ok && proc_read (ds, &c))
481     ok = case_writer_write_case (aw, c);
482   ok = proc_close (ds) && ok;
483
484   dict_set_filter (dataset_dict (ds), saved_filter_variable);
485
486   case_writer_destroy (aw);
487   return ok ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
488 }
489
490 int
491 cmd_save (struct lexer *lexer, struct dataset *ds) 
492 {
493   return parse_output_proc (lexer, ds, SYSFILE_WRITER);
494 }
495
496 int
497 cmd_export (struct lexer *lexer, struct dataset *ds) 
498 {
499   return parse_output_proc (lexer, ds, PORFILE_WRITER);
500 }
501 \f
502 /* XSAVE and XEXPORT. */
503
504 /* Transformation. */
505 struct output_trns 
506   {
507     struct case_writer *aw;      /* Writer. */
508   };
509
510 static trns_proc_func output_trns_proc;
511 static trns_free_func output_trns_free;
512
513 /* Parses the XSAVE or XEXPORT transformation command. */
514 static int
515 parse_output_trns (struct lexer *lexer, struct dataset *ds, enum writer_type writer_type) 
516 {
517   struct output_trns *t = xmalloc (sizeof *t);
518   t->aw = parse_write_command (lexer, ds, writer_type, XFORM_CMD, NULL);
519   if (t->aw == NULL) 
520     {
521       free (t);
522       return CMD_CASCADING_FAILURE;
523     }
524
525   add_transformation (ds, output_trns_proc, output_trns_free, t);
526   return CMD_SUCCESS;
527 }
528
529 /* Writes case C to the system file specified on XSAVE or XEXPORT. */
530 static int
531 output_trns_proc (void *trns_, struct ccase *c, casenumber case_num UNUSED)
532 {
533   struct output_trns *t = trns_;
534   case_writer_write_case (t->aw, c);
535   return TRNS_CONTINUE;
536 }
537
538 /* Frees an XSAVE or XEXPORT transformation.
539    Returns true if successful, false if an I/O error occurred. */
540 static bool
541 output_trns_free (void *trns_)
542 {
543   struct output_trns *t = trns_;
544   bool ok = true;
545
546   if (t != NULL)
547     {
548       ok = case_writer_destroy (t->aw);
549       free (t);
550     }
551   return ok;
552 }
553
554 /* XSAVE command. */
555 int
556 cmd_xsave (struct lexer *lexer, struct dataset *ds) 
557 {
558   return parse_output_trns (lexer, ds, SYSFILE_WRITER);
559 }
560
561 /* XEXPORT command. */
562 int
563 cmd_xexport (struct lexer *lexer, struct dataset *ds) 
564 {
565   return parse_output_trns (lexer, ds, PORFILE_WRITER);
566 }
567 \f
568 static bool rename_variables (struct lexer *lexer, struct dictionary *dict);
569 static bool drop_variables (struct lexer *, struct dictionary *dict);
570 static bool keep_variables (struct lexer *, struct dictionary *dict);
571
572 /* Commands that read and write system files share a great deal
573    of common syntactic structure for rearranging and dropping
574    variables.  This function parses this syntax and modifies DICT
575    appropriately.  Returns true on success, false on failure. */
576 static bool
577 parse_dict_trim (struct lexer *lexer, struct dictionary *dict)
578 {
579   if (lex_match_id (lexer, "MAP")) 
580     {
581       /* FIXME. */
582       return true;
583     }
584   else if (lex_match_id (lexer, "DROP"))
585     return drop_variables (lexer, dict);
586   else if (lex_match_id (lexer, "KEEP"))
587     return keep_variables (lexer, dict);
588   else if (lex_match_id (lexer, "RENAME"))
589     return rename_variables (lexer, dict);
590   else
591     {
592       lex_error (lexer, _("expecting a valid subcommand"));
593       return false;
594     }
595 }
596
597 /* Parses and performs the RENAME subcommand of GET and SAVE. */
598 static bool
599 rename_variables (struct lexer *lexer, struct dictionary *dict)
600 {
601   size_t i;
602
603   int success = 0;
604
605   struct variable **v;
606   char **new_names;
607   size_t nv, nn;
608   char *err_name;
609
610   int group;
611
612   lex_match (lexer, '=');
613   if (lex_token (lexer) != '(')
614     {
615       struct variable *v;
616
617       v = parse_variable (lexer, dict);
618       if (v == NULL)
619         return 0;
620       if (!lex_force_match (lexer, '=')
621           || !lex_force_id (lexer))
622         return 0;
623       if (dict_lookup_var (dict, lex_tokid (lexer)) != NULL)
624         {
625           msg (SE, _("Cannot rename %s as %s because there already exists "
626                      "a variable named %s.  To rename variables with "
627                      "overlapping names, use a single RENAME subcommand "
628                      "such as \"/RENAME (A=B)(B=C)(C=A)\", or equivalently, "
629                      "\"/RENAME (A B C=B C A)\"."),
630                var_get_name (v), lex_tokid (lexer), lex_tokid (lexer));
631           return 0;
632         }
633       
634       dict_rename_var (dict, v, lex_tokid (lexer));
635       lex_get (lexer);
636       return 1;
637     }
638
639   nv = nn = 0;
640   v = NULL;
641   new_names = 0;
642   group = 1;
643   while (lex_match (lexer, '('))
644     {
645       size_t old_nv = nv;
646
647       if (!parse_variables (lexer, dict, &v, &nv, PV_NO_DUPLICATE | PV_APPEND))
648         goto done;
649       if (!lex_match (lexer, '='))
650         {
651           msg (SE, _("`=' expected after variable list."));
652           goto done;
653         }
654       if (!parse_DATA_LIST_vars (lexer, &new_names, &nn, PV_APPEND | PV_NO_SCRATCH))
655         goto done;
656       if (nn != nv)
657         {
658           msg (SE, _("Number of variables on left side of `=' (%d) does not "
659                      "match number of variables on right side (%d), in "
660                      "parenthesized group %d of RENAME subcommand."),
661                (unsigned) (nv - old_nv), (unsigned) (nn - old_nv), group);
662           goto done;
663         }
664       if (!lex_force_match (lexer, ')'))
665         goto done;
666       group++;
667     }
668
669   if (!dict_rename_vars (dict, v, new_names, nv, &err_name)) 
670     {
671       msg (SE, _("Requested renaming duplicates variable name %s."), err_name);
672       goto done;
673     }
674   success = 1;
675
676  done:
677   for (i = 0; i < nn; i++)
678     free (new_names[i]);
679   free (new_names);
680   free (v);
681
682   return success;
683 }
684
685 /* Parses and performs the DROP subcommand of GET and SAVE.
686    Returns true if successful, false on failure.*/
687 static bool
688 drop_variables (struct lexer *lexer, struct dictionary *dict)
689 {
690   struct variable **v;
691   size_t nv;
692
693   lex_match (lexer, '=');
694   if (!parse_variables (lexer, dict, &v, &nv, PV_NONE))
695     return false;
696   dict_delete_vars (dict, v, nv);
697   free (v);
698
699   if (dict_get_var_cnt (dict) == 0)
700     {
701       msg (SE, _("Cannot DROP all variables from dictionary."));
702       return false;
703     }
704   return true;
705 }
706
707 /* Parses and performs the KEEP subcommand of GET and SAVE.
708    Returns true if successful, false on failure.*/
709 static bool
710 keep_variables (struct lexer *lexer, struct dictionary *dict)
711 {
712   struct variable **v;
713   size_t nv;
714   size_t i;
715
716   lex_match (lexer, '=');
717   if (!parse_variables (lexer, dict, &v, &nv, PV_NONE))
718     return false;
719
720   /* Move the specified variables to the beginning. */
721   dict_reorder_vars (dict, v, nv);
722           
723   /* Delete the remaining variables. */
724   v = xnrealloc (v, dict_get_var_cnt (dict) - nv, sizeof *v);
725   for (i = nv; i < dict_get_var_cnt (dict); i++)
726     v[i - nv] = dict_get_var (dict, i);
727   dict_delete_vars (dict, v, dict_get_var_cnt (dict) - nv);
728   free (v);
729
730   return true;
731 }
732 \f
733 /* MATCH FILES. */
734
735 /* File types. */
736 enum
737   {
738     MTF_FILE,                   /* Specified on FILE= subcommand. */
739     MTF_TABLE                   /* Specified on TABLE= subcommand. */
740   };
741
742 /* One of the files on MATCH FILES. */
743 struct mtf_file
744   {
745     struct mtf_file *next, *prev; /* Next, previous in the list of files. */
746     struct mtf_file *next_min;  /* Next in the chain of minimums. */
747     
748     int type;                   /* One of MTF_*. */
749     struct variable **by;       /* List of BY variables for this file. */
750     struct file_handle *handle; /* File handle. */
751     struct any_reader *reader;  /* File reader. */
752     struct dictionary *dict;    /* Dictionary from system file. */
753
754     /* IN subcommand. */
755     char *in_name;              /* Variable name. */
756     struct variable *in_var;    /* Variable (in master dictionary). */
757
758     struct ccase input_storage; /* Input record storage. */
759     struct ccase *input;        /* Input record. */
760   };
761
762 /* MATCH FILES procedure. */
763 struct mtf_proc 
764   {
765     struct mtf_file *head;      /* First file mentioned on FILE or TABLE. */
766     struct mtf_file *tail;      /* Last file mentioned on FILE or TABLE. */
767
768     bool ok;                    /* False if I/O error occurs. */
769
770     size_t by_cnt;              /* Number of variables on BY subcommand. */
771
772     /* Names of FIRST, LAST variables. */
773     char first[LONG_NAME_LEN + 1], last[LONG_NAME_LEN + 1];
774     
775     struct dictionary *dict;    /* Dictionary of output file. */
776     struct casefile *output;    /* MATCH FILES output. */
777     struct ccase mtf_case;      /* Case used for output. */
778
779     unsigned seq_num;           /* Have we initialized this variable? */
780     unsigned *seq_nums;         /* Sequence numbers for each var in dict. */
781   };
782
783 static bool mtf_free (struct mtf_proc *);
784 static bool mtf_close_file (struct mtf_file *);
785 static int mtf_merge_dictionary (struct dictionary *const, struct mtf_file *);
786 static bool mtf_read_records (struct mtf_proc *, struct dataset *);
787 static bool mtf_delete_file_in_place (struct mtf_proc *, struct mtf_file **);
788
789 static bool mtf_processing (struct mtf_proc *, struct dataset *);
790
791 static char *var_type_description (struct variable *);
792
793 static void set_master (struct variable *, struct variable *master);
794 static struct variable *get_master (struct variable *);
795
796 /* Parse and execute the MATCH FILES command. */
797 int
798 cmd_match_files (struct lexer *lexer, struct dataset *ds)
799 {
800   struct mtf_proc mtf;
801   struct mtf_file *first_table = NULL;
802   struct mtf_file *iter;
803   
804   bool used_active_file = false;
805   bool saw_table = false;
806   bool saw_in = false;
807
808   mtf.head = mtf.tail = NULL;
809   mtf.by_cnt = 0;
810   mtf.first[0] = '\0';
811   mtf.last[0] = '\0';
812   mtf.dict = dict_create ();
813   mtf.output = NULL;
814   case_nullify (&mtf.mtf_case);
815   mtf.seq_num = 0;
816   mtf.seq_nums = NULL;
817   dict_set_case_limit (mtf.dict, dict_get_case_limit (dataset_dict (ds)));
818
819   lex_match (lexer, '/');
820   while (lex_token (lexer) == T_ID
821          && (lex_id_match (ss_cstr ("FILE"), ss_cstr (lex_tokid (lexer)))
822              || lex_id_match (ss_cstr ("TABLE"), ss_cstr (lex_tokid (lexer)))))
823     {
824       struct mtf_file *file = xmalloc (sizeof *file);
825
826       if (lex_match_id (lexer, "FILE"))
827         file->type = MTF_FILE;
828       else if (lex_match_id (lexer, "TABLE"))
829         {
830           file->type = MTF_TABLE;
831           saw_table = true;
832         }
833       else
834         NOT_REACHED ();
835       lex_match (lexer, '=');
836
837       file->by = NULL;
838       file->handle = NULL;
839       file->reader = NULL;
840       file->dict = NULL;
841       file->in_name = NULL;
842       file->in_var = NULL;
843       case_nullify (&file->input_storage);
844       file->input = &file->input_storage;
845
846       /* FILEs go first, then TABLEs. */
847       if (file->type == MTF_TABLE || first_table == NULL)
848         {
849           file->next = NULL;
850           file->prev = mtf.tail;
851           if (mtf.tail)
852             mtf.tail->next = file;
853           mtf.tail = file;
854           if (mtf.head == NULL)
855             mtf.head = file;
856           if (file->type == MTF_TABLE && first_table == NULL)
857             first_table = file;
858         }
859       else 
860         {
861           assert (file->type == MTF_FILE);
862           file->next = first_table;
863           file->prev = first_table->prev;
864           if (first_table->prev)
865             first_table->prev->next = file;
866           else
867             mtf.head = file;
868           first_table->prev = file;
869         }
870
871       if (lex_match (lexer, '*'))
872         {
873           file->handle = NULL;
874           file->reader = NULL;
875               
876           if (used_active_file)
877             {
878               msg (SE, _("The active file may not be specified more "
879                          "than once."));
880               goto error;
881             }
882           used_active_file = true;
883
884           if (!proc_has_source (ds))
885             {
886               msg (SE, _("Cannot specify the active file since no active "
887                          "file has been defined."));
888               goto error;
889             }
890
891           if (proc_make_temporary_transformations_permanent (ds))
892             msg (SE,
893                  _("MATCH FILES may not be used after TEMPORARY when "
894                    "the active file is an input source.  "
895                    "Temporary transformations will be made permanent."));
896
897           file->dict = dataset_dict (ds);
898         }
899       else
900         {
901           file->handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
902           if (file->handle == NULL)
903             goto error;
904
905           file->reader = any_reader_open (file->handle, &file->dict);
906           if (file->reader == NULL)
907             goto error;
908
909           case_create (&file->input_storage,
910                        dict_get_next_value_idx (file->dict));
911         }
912
913       while (lex_match (lexer, '/'))
914         if (lex_match_id (lexer, "RENAME")) 
915           {
916             if (!rename_variables (lexer, file->dict))
917               goto error; 
918           }
919         else if (lex_match_id (lexer, "IN"))
920           {
921             lex_match (lexer, '=');
922             if (lex_token (lexer) != T_ID)
923               {
924                 lex_error (lexer, NULL);
925                 goto error;
926               }
927
928             if (file->in_name != NULL)
929               {
930                 msg (SE, _("Multiple IN subcommands for a single FILE or "
931                            "TABLE."));
932                 goto error;
933               }
934             file->in_name = xstrdup (lex_tokid (lexer));
935             lex_get (lexer);
936             saw_in = true;
937           }
938
939       mtf_merge_dictionary (mtf.dict, file);
940     }
941   
942   while (lex_token (lexer) != '.')
943     {
944       if (lex_match (lexer, T_BY))
945         {
946           struct variable **by;
947           
948           if (mtf.by_cnt)
949             {
950               msg (SE, _("BY may appear at most once."));
951               goto error;
952             }
953               
954           lex_match (lexer, '=');
955           if (!parse_variables (lexer, mtf.dict, &by, &mtf.by_cnt,
956                                 PV_NO_DUPLICATE | PV_NO_SCRATCH))
957             goto error;
958
959           for (iter = mtf.head; iter != NULL; iter = iter->next)
960             {
961               size_t i;
962           
963               iter->by = xnmalloc (mtf.by_cnt, sizeof *iter->by);
964
965               for (i = 0; i < mtf.by_cnt; i++)
966                 {
967                   iter->by[i] = dict_lookup_var (iter->dict,
968                                                  var_get_name (by[i]));
969                   if (iter->by[i] == NULL)
970                     {
971                       msg (SE, _("File %s lacks BY variable %s."),
972                            iter->handle ? fh_get_name (iter->handle) : "*",
973                            var_get_name (by[i]));
974                       free (by);
975                       goto error;
976                     }
977                 }
978             }
979           free (by);
980         }
981       else if (lex_match_id (lexer, "FIRST")) 
982         {
983           if (mtf.first[0] != '\0')
984             {
985               msg (SE, _("FIRST may appear at most once."));
986               goto error;
987             }
988               
989           lex_match (lexer, '=');
990           if (!lex_force_id (lexer))
991             goto error;
992           strcpy (mtf.first, lex_tokid (lexer));
993           lex_get (lexer);
994         }
995       else if (lex_match_id (lexer, "LAST")) 
996         {
997           if (mtf.last[0] != '\0')
998             {
999               msg (SE, _("LAST may appear at most once."));
1000               goto error;
1001             }
1002               
1003           lex_match (lexer, '=');
1004           if (!lex_force_id (lexer))
1005             goto error;
1006           strcpy (mtf.last, lex_tokid (lexer));
1007           lex_get (lexer);
1008         }
1009       else if (lex_match_id (lexer, "MAP"))
1010         {
1011           /* FIXME. */
1012         }
1013       else if (lex_match_id (lexer, "DROP")) 
1014         {
1015           if (!drop_variables (lexer, mtf.dict))
1016             goto error;
1017         }
1018       else if (lex_match_id (lexer, "KEEP")) 
1019         {
1020           if (!keep_variables (lexer, mtf.dict))
1021             goto error;
1022         }
1023       else
1024         {
1025           lex_error (lexer, NULL);
1026           goto error;
1027         }
1028
1029       if (!lex_match (lexer, '/') && lex_token (lexer) != '.') 
1030         {
1031           lex_end_of_command (lexer);
1032           goto error;
1033         }
1034     }
1035
1036   if (mtf.by_cnt == 0)
1037     {
1038       if (saw_table)
1039         {
1040           msg (SE, _("BY is required when TABLE is specified."));
1041           goto error;
1042         }
1043       if (saw_in)
1044         {
1045           msg (SE, _("BY is required when IN is specified."));
1046           goto error;
1047         }
1048     }
1049
1050   /* Set up mapping from each file's variables to master
1051      variables. */
1052   for (iter = mtf.head; iter != NULL; iter = iter->next)
1053     {
1054       struct dictionary *d = iter->dict;
1055       int i;
1056
1057       for (i = 0; i < dict_get_var_cnt (d); i++)
1058         {
1059           struct variable *v = dict_get_var (d, i);
1060           struct variable *mv = dict_lookup_var (mtf.dict, var_get_name (v));
1061           if (mv != NULL)
1062             set_master (v, mv);
1063         }
1064     }
1065
1066   /* Add IN variables to master dictionary. */
1067   for (iter = mtf.head; iter != NULL; iter = iter->next) 
1068     if (iter->in_name != NULL)
1069       {
1070         struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
1071         iter->in_var = dict_create_var (mtf.dict, iter->in_name, 0);
1072         if (iter->in_var == NULL)
1073           {
1074             msg (SE, _("IN variable name %s duplicates an "
1075                        "existing variable name."),
1076                  var_get_name (iter->in_var));
1077             goto error;
1078           }
1079         var_set_both_formats (iter->in_var, &format);
1080       }
1081     
1082   /* MATCH FILES performs an n-way merge on all its input files.
1083      Abstract algorithm:
1084
1085      1. Read one input record from every input FILE.
1086
1087      2. If no FILEs are left, stop.  Otherwise, proceed to step 3.
1088
1089      3. Find the FILE input record(s) that have minimum BY
1090      values.  Store all the values from these input records into
1091      the output record.
1092
1093      4. For every TABLE, read another record as long as the BY values
1094      on the TABLE's input record are less than the FILEs' BY values.
1095      If an exact match is found, store all the values from the TABLE
1096      input record into the output record.
1097
1098      5. Write the output record.
1099
1100      6. Read another record from each input file FILE and TABLE that
1101      we stored values from above.  If we come to the end of one of the
1102      input files, remove it from the list of input files.
1103
1104      7. Repeat from step 2.
1105
1106      FIXME: For merging large numbers of files (more than 10?) a
1107      better algorithm would use a heap for finding minimum
1108      values. */
1109
1110   if (used_active_file) 
1111     {
1112       proc_set_sink (ds, create_case_sink (&null_sink_class, 
1113                                            dataset_dict (ds),
1114                                            dataset_get_casefile_factory (ds),
1115                                            NULL));
1116       proc_open (ds); 
1117     }
1118   else
1119     discard_variables (ds);
1120
1121   dict_compact_values (mtf.dict);
1122   mtf.output = dataset_get_casefile_factory (ds)->create_casefile
1123     (dataset_get_casefile_factory (ds),
1124      dict_get_next_value_idx (mtf.dict));
1125
1126   mtf.seq_nums = xcalloc (dict_get_var_cnt (mtf.dict), sizeof *mtf.seq_nums);
1127   case_create (&mtf.mtf_case, dict_get_next_value_idx (mtf.dict));
1128
1129   if (!mtf_read_records (&mtf, ds))
1130     goto error;
1131   while (mtf.head && mtf.head->type == MTF_FILE)
1132     if (!mtf_processing (&mtf, ds))
1133       goto error;
1134   if (!proc_close (ds))
1135     goto error;
1136
1137   discard_variables (ds);
1138
1139   dataset_set_dict (ds, mtf.dict);
1140   mtf.dict = NULL;
1141   proc_set_source (ds, storage_source_create (mtf.output));
1142   mtf.output = NULL;
1143
1144   return mtf_free (&mtf) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
1145
1146  error:
1147   proc_close (ds);
1148   mtf_free (&mtf);
1149   return CMD_CASCADING_FAILURE;
1150 }
1151
1152 /* Return a string in a static buffer describing V's variable type and
1153    width. */
1154 static char *
1155 var_type_description (struct variable *v)
1156 {
1157   static char buf[2][32];
1158   static int x = 0;
1159   char *s;
1160
1161   x ^= 1;
1162   s = buf[x];
1163
1164   if (var_is_numeric (v))
1165     strcpy (s, "numeric");
1166   else
1167     sprintf (s, "string with width %d", var_get_width (v));
1168   return s;
1169 }
1170
1171 /* Closes FILE and frees its associated data.
1172    Returns true if successful, false if an I/O error
1173    occurred on FILE. */
1174 static bool
1175 mtf_close_file (struct mtf_file *file)
1176 {
1177   bool ok = file->reader == NULL || !any_reader_error (file->reader);
1178   free (file->by);
1179   any_reader_close (file->reader);
1180   if (file->handle != NULL)
1181     dict_destroy (file->dict);
1182   case_destroy (&file->input_storage);
1183   free (file->in_name);
1184   free (file);
1185   return ok;
1186 }
1187
1188 /* Free all the data for the MATCH FILES procedure.
1189    Returns true if successful, false if an I/O error
1190    occurred. */
1191 static bool
1192 mtf_free (struct mtf_proc *mtf)
1193 {
1194   struct mtf_file *iter, *next;
1195   bool ok = true;
1196
1197   for (iter = mtf->head; iter; iter = next)
1198     {
1199       next = iter->next;
1200       assert (iter->dict != mtf->dict);
1201       if (!mtf_close_file (iter))
1202         ok = false;
1203     }
1204   
1205   if (mtf->dict)
1206     dict_destroy (mtf->dict);
1207   case_destroy (&mtf->mtf_case);
1208   free (mtf->seq_nums);
1209
1210   return ok;
1211 }
1212
1213 /* Remove *FILE from the mtf_file chain.  Make *FILE point to the next
1214    file in the chain, or to NULL if was the last in the chain.
1215    Returns true if successful, false if an I/O error occurred. */
1216 static bool
1217 mtf_delete_file_in_place (struct mtf_proc *mtf, struct mtf_file **file)
1218 {
1219   struct mtf_file *f = *file;
1220   int i;
1221
1222   if (f->prev)
1223     f->prev->next = f->next;
1224   if (f->next)
1225     f->next->prev = f->prev;
1226   if (f == mtf->head)
1227     mtf->head = f->next;
1228   if (f == mtf->tail)
1229     mtf->tail = f->prev;
1230   *file = f->next;
1231
1232   if (f->in_var != NULL)
1233     case_data_rw (&mtf->mtf_case, f->in_var)->f = 0.;
1234   for (i = 0; i < dict_get_var_cnt (f->dict); i++)
1235     {
1236       struct variable *v = dict_get_var (f->dict, i);
1237       struct variable *mv = get_master (v);
1238       if (mv != NULL) 
1239         {
1240           union value *out = case_data_rw (&mtf->mtf_case, mv);
1241           
1242           if (var_is_numeric (v))
1243             out->f = SYSMIS;
1244           else
1245             memset (out->s, ' ', var_get_width (v));
1246         } 
1247     }
1248
1249   return mtf_close_file (f);
1250 }
1251
1252 /* Read a record from every input file.
1253    Returns true if successful, false if an I/O error occurred. */
1254 static bool
1255 mtf_read_records (struct mtf_proc *mtf, struct dataset *ds)
1256 {
1257   struct mtf_file *iter, *next;
1258   bool ok = true;
1259
1260   for (iter = mtf->head; ok && iter != NULL; iter = next)
1261     {
1262       next = iter->next;
1263       if (iter->handle
1264           ? !any_reader_read (iter->reader, iter->input)
1265           : !proc_read (ds, &iter->input)) 
1266         {
1267           if (!mtf_delete_file_in_place (mtf, &iter))
1268             ok = false; 
1269         }
1270     }
1271   return ok;
1272 }
1273
1274 /* Compare the BY variables for files A and B; return -1 if A < B, 0
1275    if A == B, 1 if A > B. */
1276 static inline int
1277 mtf_compare_BY_values (struct mtf_proc *mtf,
1278                        struct mtf_file *a, struct mtf_file *b)
1279 {
1280   return case_compare_2dict (a->input, b->input, a->by, b->by, mtf->by_cnt);
1281 }
1282
1283 /* Perform one iteration of steps 3...7 above.
1284    Returns true if successful, false if an I/O error occurred. */
1285 static bool
1286 mtf_processing (struct mtf_proc *mtf, struct dataset *ds)
1287 {
1288   struct mtf_file *min_head, *min_tail; /* Files with minimum BY values. */
1289   struct mtf_file *max_head, *max_tail; /* Files with non-minimum BYs. */
1290   struct mtf_file *iter, *next;
1291
1292   /* 3. Find the FILE input record(s) that have minimum BY
1293      values.  Store all the values from these input records into
1294      the output record. */
1295   min_head = min_tail = mtf->head;
1296   max_head = max_tail = NULL;
1297   for (iter = mtf->head->next; iter && iter->type == MTF_FILE;
1298        iter = iter->next) 
1299     {
1300       int cmp = mtf_compare_BY_values (mtf, min_head, iter);
1301       if (cmp < 0) 
1302         {
1303           if (max_head)
1304             max_tail = max_tail->next_min = iter;
1305           else
1306             max_head = max_tail = iter;
1307         }
1308       else if (cmp == 0) 
1309         min_tail = min_tail->next_min = iter;
1310       else /* cmp > 0 */
1311         {
1312           if (max_head)
1313             {
1314               max_tail->next_min = min_head;
1315               max_tail = min_tail;
1316             }
1317           else
1318             {
1319               max_head = min_head;
1320               max_tail = min_tail;
1321             }
1322           min_head = min_tail = iter;
1323         }
1324     }
1325       
1326   /* 4. For every TABLE, read another record as long as the BY
1327      values on the TABLE's input record are less than the FILEs'
1328      BY values.  If an exact match is found, store all the values
1329      from the TABLE input record into the output record. */
1330   for (; iter != NULL; iter = next)
1331     {
1332       assert (iter->type == MTF_TABLE);
1333       
1334       next = iter->next;
1335       for (;;) 
1336         {
1337           int cmp = mtf_compare_BY_values (mtf, min_head, iter);
1338           if (cmp < 0) 
1339             {
1340               if (max_head)
1341                 max_tail = max_tail->next_min = iter;
1342               else
1343                 max_head = max_tail = iter;
1344             }
1345           else if (cmp == 0)
1346             min_tail = min_tail->next_min = iter;
1347           else /* cmp > 0 */
1348             {
1349               if (iter->handle
1350                   ? any_reader_read (iter->reader, iter->input)
1351                   : proc_read (ds, &iter->input))
1352                 continue;
1353               if (!mtf_delete_file_in_place (mtf, &iter))
1354                 return false;
1355             }
1356           break;
1357         }
1358     }
1359
1360   /* Next sequence number. */
1361   mtf->seq_num++;
1362
1363   /* Store data to all the records we are using. */
1364   if (min_tail)
1365     min_tail->next_min = NULL;
1366   for (iter = min_head; iter; iter = iter->next_min)
1367     {
1368       int i;
1369
1370       for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
1371         {
1372           struct variable *v = dict_get_var (iter->dict, i);
1373           struct variable *mv = get_master (v);
1374           size_t mv_index = mv ? var_get_dict_index (mv) : 0;
1375           
1376           if (mv != NULL && mtf->seq_nums[mv_index] != mtf->seq_num) 
1377             {
1378               const struct ccase *record = iter->input;
1379               union value *out = case_data_rw (&mtf->mtf_case, mv);
1380
1381               mtf->seq_nums[mv_index] = mtf->seq_num;
1382               if (var_is_numeric (v))
1383                 out->f = case_num (record, v);
1384               else
1385                 memcpy (out->s, case_str (record, v), var_get_width (v));
1386             } 
1387         }
1388       if (iter->in_var != NULL)
1389         case_data_rw (&mtf->mtf_case, iter->in_var)->f = 1.;
1390     }
1391
1392   /* Store missing values to all the records we're not using. */
1393   if (max_tail)
1394     max_tail->next_min = NULL;
1395   for (iter = max_head; iter; iter = iter->next_min)
1396     {
1397       int i;
1398
1399       for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
1400         {
1401           struct variable *v = dict_get_var (iter->dict, i);
1402           struct variable *mv = get_master (v);
1403           size_t mv_index = mv ? var_get_dict_index (mv) : 0;
1404
1405           if (mv != NULL && mtf->seq_nums[mv_index] != mtf->seq_num) 
1406             {
1407               union value *out = case_data_rw (&mtf->mtf_case, mv);
1408               mtf->seq_nums[mv_index] = mtf->seq_num;
1409
1410               if (var_is_numeric (v))
1411                 out->f = SYSMIS;
1412               else
1413                 memset (out->s, ' ', var_get_width (v));
1414             }
1415         }
1416       if (iter->in_var != NULL)
1417         case_data_rw (&mtf->mtf_case, iter->in_var)->f = 0.;
1418     }
1419
1420   /* 5. Write the output record. */
1421   casefile_append (mtf->output, &mtf->mtf_case);
1422
1423   /* 6. Read another record from each input file FILE and TABLE
1424      that we stored values from above.  If we come to the end of
1425      one of the input files, remove it from the list of input
1426      files. */
1427   for (iter = min_head; iter && iter->type == MTF_FILE; iter = next)
1428     {
1429       next = iter->next_min;
1430       if (iter->reader != NULL
1431           ? !any_reader_read (iter->reader, iter->input)
1432           : !proc_read (ds, &iter->input))
1433         if (!mtf_delete_file_in_place (mtf, &iter))
1434           return false;
1435     }
1436   return true;
1437 }
1438
1439 /* Merge the dictionary for file F into master dictionary M. */
1440 static int
1441 mtf_merge_dictionary (struct dictionary *const m, struct mtf_file *f)
1442 {
1443   struct dictionary *d = f->dict;
1444   const char *d_docs, *m_docs;
1445   int i;
1446
1447   if (dict_get_label (m) == NULL)
1448     dict_set_label (m, dict_get_label (d));
1449
1450   d_docs = dict_get_documents (d);
1451   m_docs = dict_get_documents (m);
1452   if (d_docs != NULL) 
1453     {
1454       if (m_docs == NULL)
1455         dict_set_documents (m, d_docs);
1456       else
1457         {
1458           char *new_docs;
1459           size_t new_len;
1460
1461           new_len = strlen (m_docs) + strlen (d_docs);
1462           new_docs = xmalloc (new_len + 1);
1463           strcpy (new_docs, m_docs);
1464           strcat (new_docs, d_docs);
1465           dict_set_documents (m, new_docs);
1466           free (new_docs);
1467         }
1468     }
1469   
1470   for (i = 0; i < dict_get_var_cnt (d); i++)
1471     {
1472       struct variable *dv = dict_get_var (d, i);
1473       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
1474
1475       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
1476         continue;
1477
1478       if (mv != NULL)
1479         {
1480           if (var_get_width (mv) != var_get_width (dv)) 
1481             {
1482               msg (SE, _("Variable %s in file %s (%s) has different "
1483                          "type or width from the same variable in "
1484                          "earlier file (%s)."),
1485                    var_get_name (dv), fh_get_name (f->handle),
1486                    var_type_description (dv), var_type_description (mv));
1487               return 0;
1488             }
1489         
1490           if (var_get_width (dv) == var_get_width (mv))
1491             {
1492               if (var_has_value_labels (dv) && !var_has_value_labels (mv))
1493                 var_set_value_labels (mv, var_get_value_labels (dv));
1494               if (var_has_missing_values (dv) && !var_has_missing_values (mv))
1495                 var_set_missing_values (mv, var_get_missing_values (dv));
1496             }
1497
1498           if (var_get_label (dv) && !var_get_label (mv))
1499             var_set_label (mv, var_get_label (dv));
1500         }
1501       else
1502         mv = dict_clone_var_assert (m, dv, var_get_name (dv));
1503     }
1504
1505   return 1;
1506 }
1507
1508 /* Marks V's master variable as MASTER. */
1509 static void
1510 set_master (struct variable *v, struct variable *master) 
1511 {
1512   var_attach_aux (v, master, NULL);
1513 }
1514
1515 /* Returns the master variable corresponding to V,
1516    as set with set_master(). */
1517 static struct variable *
1518 get_master (struct variable *v) 
1519 {
1520   return var_get_aux (v);
1521 }
1522 \f
1523 /* Case map.
1524
1525    A case map copies data from a case that corresponds for one
1526    dictionary to a case that corresponds to a second dictionary
1527    derived from the first by, optionally, deleting, reordering,
1528    or renaming variables.  (No new variables may be created.)
1529    */
1530
1531 /* A case map. */
1532 struct case_map
1533   {
1534     size_t value_cnt;   /* Number of values in map. */
1535     int *map;           /* For each destination index, the
1536                            corresponding source index. */
1537   };
1538
1539 /* Prepares dictionary D for producing a case map.  Afterward,
1540    the caller may delete, reorder, or rename variables within D
1541    at will before using finish_case_map() to produce the case
1542    map.
1543
1544    Uses D's aux members, which must otherwise not be in use. */
1545 static void
1546 start_case_map (struct dictionary *d) 
1547 {
1548   size_t var_cnt = dict_get_var_cnt (d);
1549   size_t i;
1550   
1551   for (i = 0; i < var_cnt; i++)
1552     {
1553       struct variable *v = dict_get_var (d, i);
1554       int *src_fv = xmalloc (sizeof *src_fv);
1555       *src_fv = var_get_case_index (v);
1556       var_attach_aux (v, src_fv, var_dtor_free);
1557     }
1558 }
1559
1560 /* Produces a case map from dictionary D, which must have been
1561    previously prepared with start_case_map().
1562
1563    Does not retain any reference to D, and clears the aux members
1564    set up by start_case_map().
1565
1566    Returns the new case map, or a null pointer if no mapping is
1567    required (that is, no data has changed position). */
1568 static struct case_map *
1569 finish_case_map (struct dictionary *d) 
1570 {
1571   struct case_map *map;
1572   size_t var_cnt = dict_get_var_cnt (d);
1573   size_t i;
1574   int identity_map;
1575
1576   map = xmalloc (sizeof *map);
1577   map->value_cnt = dict_get_next_value_idx (d);
1578   map->map = xnmalloc (map->value_cnt, sizeof *map->map);
1579   for (i = 0; i < map->value_cnt; i++)
1580     map->map[i] = -1;
1581
1582   identity_map = 1;
1583   for (i = 0; i < var_cnt; i++) 
1584     {
1585       struct variable *v = dict_get_var (d, i);
1586       size_t value_cnt = var_get_value_cnt (v);
1587       int *src_fv = (int *) var_detach_aux (v);
1588       size_t idx;
1589
1590       if (var_get_case_index (v) != *src_fv)
1591         identity_map = 0;
1592       
1593       for (idx = 0; idx < value_cnt; idx++)
1594         {
1595           int src_idx = *src_fv + idx;
1596           int dst_idx = var_get_case_index (v) + idx;
1597           
1598           assert (map->map[dst_idx] == -1);
1599           map->map[dst_idx] = src_idx;
1600         }
1601       free (src_fv);
1602     }
1603
1604   if (identity_map) 
1605     {
1606       destroy_case_map (map);
1607       return NULL;
1608     }
1609
1610   while (map->value_cnt > 0 && map->map[map->value_cnt - 1] == -1)
1611     map->value_cnt--;
1612
1613   return map;
1614 }
1615
1616 /* Maps from SRC to DST, applying case map MAP. */
1617 static void
1618 map_case (const struct case_map *map,
1619           const struct ccase *src, struct ccase *dst) 
1620 {
1621   size_t dst_idx;
1622
1623   assert (map != NULL);
1624   assert (src != NULL);
1625   assert (dst != NULL);
1626   assert (src != dst);
1627
1628   for (dst_idx = 0; dst_idx < map->value_cnt; dst_idx++)
1629     {
1630       int src_idx = map->map[dst_idx];
1631       if (src_idx != -1)
1632         *case_data_rw_idx (dst, dst_idx) = *case_data_idx (src, src_idx);
1633     }
1634 }
1635
1636 /* Destroys case map MAP. */
1637 static void
1638 destroy_case_map (struct case_map *map) 
1639 {
1640   if (map != NULL) 
1641     {
1642       free (map->map);
1643       free (map);
1644     }
1645 }