8b03ec95f809322e88edff3a47bd1a0d6ba7465f
[pspp-builds.git] / src / language / data-io / get.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <stdlib.h>
22
23 #include <data/any-reader.h>
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case-source.h>
27 #include <data/case.h>
28 #include <data/casefile.h>
29 #include <data/fastfile.h>
30 #include <data/format.h>
31 #include <data/dictionary.h>
32 #include <data/por-file-writer.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/storage-stream.h>
36 #include <data/sys-file-writer.h>
37 #include <data/transformations.h>
38 #include <data/value-labels.h>
39 #include <data/variable.h>
40 #include <language/command.h>
41 #include <language/data-io/file-handle.h>
42 #include <language/lexer/lexer.h>
43 #include <language/lexer/variable-parser.h>
44 #include <libpspp/alloc.h>
45 #include <libpspp/assertion.h>
46 #include <libpspp/compiler.h>
47 #include <libpspp/hash.h>
48 #include <libpspp/message.h>
49 #include <libpspp/message.h>
50 #include <libpspp/misc.h>
51 #include <libpspp/str.h>
52
53 #include "gettext.h"
54 #define _(msgid) gettext (msgid)
55
56 /* Rearranging and reducing a dictionary. */
57 static void start_case_map (struct dictionary *);
58 static struct case_map *finish_case_map (struct dictionary *);
59 static void map_case (const struct case_map *,
60                       const struct ccase *, struct ccase *);
61 static void destroy_case_map (struct case_map *);
62
63 static bool parse_dict_trim (struct lexer *, struct dictionary *);
64 \f
65 /* Reading system and portable files. */
66
67 /* Type of command. */
68 enum reader_command 
69   {
70     GET_CMD,
71     IMPORT_CMD
72   };
73
74 /* Case reader input program. */
75 struct case_reader_pgm 
76   {
77     struct any_reader *reader;  /* File reader. */
78     struct case_map *map;       /* Map from file dict to active file dict. */
79     struct ccase bounce;        /* Bounce buffer. */
80   };
81
82 static const struct case_source_class case_reader_source_class;
83
84 static void case_reader_pgm_free (struct case_reader_pgm *);
85
86 /* Parses a GET or IMPORT command. */
87 static int
88 parse_read_command (struct lexer *lexer, struct dataset *ds, enum reader_command type)
89 {
90   struct case_reader_pgm *pgm = NULL;
91   struct file_handle *fh = NULL;
92   struct dictionary *dict = NULL;
93
94   for (;;)
95     {
96       lex_match (lexer, '/');
97
98       if (lex_match_id (lexer, "FILE") || lex_token (lexer) == T_STRING)
99         {
100           lex_match (lexer, '=');
101
102           fh = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
103           if (fh == NULL)
104             goto error;
105         }
106       else if (type == IMPORT_CMD && lex_match_id (lexer, "TYPE"))
107         {
108           lex_match (lexer, '=');
109
110           if (lex_match_id (lexer, "COMM"))
111             type = PFM_COMM;
112           else if (lex_match_id (lexer, "TAPE"))
113             type = PFM_TAPE;
114           else
115             {
116               lex_error (lexer, _("expecting COMM or TAPE"));
117               goto error;
118             }
119         }
120       else
121         break; 
122     }
123   
124   if (fh == NULL) 
125     {
126       lex_sbc_missing (lexer, "FILE");
127       goto error;
128     }
129               
130   discard_variables (ds);
131
132   pgm = xmalloc (sizeof *pgm);
133   pgm->reader = any_reader_open (fh, &dict);
134   pgm->map = NULL;
135   case_nullify (&pgm->bounce);
136   if (pgm->reader == NULL)
137     goto error;
138
139   case_create (&pgm->bounce, dict_get_next_value_idx (dict));
140   
141   start_case_map (dict);
142
143   while (lex_token (lexer) != '.')
144     {
145       lex_match (lexer, '/');
146       if (!parse_dict_trim (lexer, dict))
147         goto error;
148     }
149
150   pgm->map = finish_case_map (dict);
151   
152   dict_destroy (dataset_dict (ds));
153   dataset_set_dict (ds, dict);
154
155   proc_set_source (ds, 
156                    create_case_source (&case_reader_source_class, pgm));
157
158   return CMD_SUCCESS;
159
160  error:
161   case_reader_pgm_free (pgm);
162   if (dict != NULL)
163     dict_destroy (dict);
164   return CMD_CASCADING_FAILURE;
165 }
166
167 /* Frees a struct case_reader_pgm. */
168 static void
169 case_reader_pgm_free (struct case_reader_pgm *pgm) 
170 {
171   if (pgm != NULL) 
172     {
173       any_reader_close (pgm->reader);
174       destroy_case_map (pgm->map);
175       case_destroy (&pgm->bounce);
176       free (pgm);
177     }
178 }
179
180 /* Reads one case into C.
181    Returns true if successful, false at end of file or if an
182    I/O error occurred. */
183 static bool
184 case_reader_source_read (struct case_source *source, struct ccase *c)
185 {
186   struct case_reader_pgm *pgm = source->aux;
187   if (any_reader_read (pgm->reader, pgm->map == NULL ? c : &pgm->bounce)) 
188     {
189       if (pgm->map != NULL)
190         map_case (pgm->map, &pgm->bounce, c);
191       return true;
192     }
193   else  
194     return false;
195 }
196
197 /* Destroys the source.
198    Returns true if successful read, false if an I/O occurred
199    during destruction or previously. */
200 static bool
201 case_reader_source_destroy (struct case_source *source)
202 {
203   struct case_reader_pgm *pgm = source->aux;
204   bool ok = !any_reader_error (pgm->reader); 
205   case_reader_pgm_free (pgm);
206   return ok;
207 }
208
209 static const struct case_source_class case_reader_source_class =
210   {
211     "case reader",
212     NULL,
213     case_reader_source_read,
214     case_reader_source_destroy,
215   };
216 \f
217 /* GET. */
218 int
219 cmd_get (struct lexer *lexer, struct dataset *ds) 
220 {
221   return parse_read_command (lexer, ds, GET_CMD);
222 }
223
224 /* IMPORT. */
225 int
226 cmd_import (struct lexer *lexer, struct dataset *ds) 
227 {
228   return parse_read_command (lexer, ds, IMPORT_CMD);
229 }
230 \f
231 /* Writing system and portable files. */ 
232
233 /* Type of output file. */
234 enum writer_type
235   {
236     SYSFILE_WRITER,     /* System file. */
237     PORFILE_WRITER      /* Portable file. */
238   };
239
240 /* Type of a command. */
241 enum command_type 
242   {
243     XFORM_CMD,          /* Transformation. */
244     PROC_CMD            /* Procedure. */
245   };
246
247 /* File writer plus a case map. */
248 struct case_writer
249   {
250     struct any_writer *writer;  /* File writer. */
251     struct case_map *map;       /* Map to output file dictionary
252                                    (null pointer for identity mapping). */
253     struct ccase bounce;        /* Bounce buffer for mapping (if needed). */
254   };
255
256 /* Destroys AW. */
257 static bool
258 case_writer_destroy (struct case_writer *aw)
259 {
260   bool ok = true;
261   if (aw != NULL) 
262     {
263       ok = any_writer_close (aw->writer);
264       destroy_case_map (aw->map);
265       case_destroy (&aw->bounce);
266       free (aw);
267     }
268   return ok;
269 }
270
271 /* Parses SAVE or XSAVE or EXPORT or XEXPORT command.
272    WRITER_TYPE identifies the type of file to write,
273    and COMMAND_TYPE identifies the type of command.
274
275    On success, returns a writer.
276    For procedures only, sets *RETAIN_UNSELECTED to true if cases
277    that would otherwise be excluded by FILTER or USE should be
278    included.
279
280    On failure, returns a null pointer. */
281 static struct case_writer *
282 parse_write_command (struct lexer *lexer, struct dataset *ds, 
283                      enum writer_type writer_type,
284                      enum command_type command_type,
285                      bool *retain_unselected)
286 {
287   /* Common data. */
288   struct file_handle *handle; /* Output file. */
289   struct dictionary *dict;    /* Dictionary for output file. */
290   struct case_writer *aw;      /* Writer. */  
291
292   /* Common options. */
293   bool print_map;             /* Print map?  TODO. */
294   bool print_short_names;     /* Print long-to-short name map.  TODO. */
295   struct sfm_write_options sysfile_opts;
296   struct pfm_write_options porfile_opts;
297
298   assert (writer_type == SYSFILE_WRITER || writer_type == PORFILE_WRITER);
299   assert (command_type == XFORM_CMD || command_type == PROC_CMD);
300   assert ((retain_unselected != NULL) == (command_type == PROC_CMD));
301
302   if (command_type == PROC_CMD)
303     *retain_unselected = true;
304
305   handle = NULL;
306   dict = dict_clone (dataset_dict (ds));
307   aw = xmalloc (sizeof *aw);
308   aw->writer = NULL;
309   aw->map = NULL;
310   case_nullify (&aw->bounce);
311   print_map = false;
312   print_short_names = false;
313   sysfile_opts = sfm_writer_default_options ();
314   porfile_opts = pfm_writer_default_options ();
315
316   start_case_map (dict);
317   dict_delete_scratch_vars (dict);
318
319   lex_match (lexer, '/');
320   for (;;)
321     {
322       if (lex_match_id (lexer, "OUTFILE"))
323         {
324           if (handle != NULL) 
325             {
326               lex_sbc_only_once ("OUTFILE");
327               goto error; 
328             }
329           
330           lex_match (lexer, '=');
331       
332           handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
333           if (handle == NULL)
334             goto error;
335         }
336       else if (lex_match_id (lexer, "NAMES"))
337         print_short_names = true;
338       else if (lex_match_id (lexer, "PERMISSIONS")) 
339         {
340           bool cw;
341           
342           lex_match (lexer, '=');
343           if (lex_match_id (lexer, "READONLY"))
344             cw = false;
345           else if (lex_match_id (lexer, "WRITEABLE"))
346             cw = true;
347           else
348             {
349               lex_error (lexer, _("expecting %s or %s"), "READONLY", "WRITEABLE");
350               goto error;
351             }
352           sysfile_opts.create_writeable = porfile_opts.create_writeable = cw;
353         }
354       else if (command_type == PROC_CMD && lex_match_id (lexer, "UNSELECTED")) 
355         {
356           lex_match (lexer, '=');
357           if (lex_match_id (lexer, "RETAIN"))
358             *retain_unselected = true;
359           else if (lex_match_id (lexer, "DELETE"))
360             *retain_unselected = false;
361           else
362             {
363               lex_error (lexer, _("expecting %s or %s"), "RETAIN", "DELETE");
364               goto error;
365             }
366         }
367       else if (writer_type == SYSFILE_WRITER && lex_match_id (lexer, "COMPRESSED"))
368         sysfile_opts.compress = true;
369       else if (writer_type == SYSFILE_WRITER && lex_match_id (lexer, "UNCOMPRESSED"))
370         sysfile_opts.compress = false;
371       else if (writer_type == SYSFILE_WRITER && lex_match_id (lexer, "VERSION"))
372         {
373           lex_match (lexer, '=');
374           if (!lex_force_int (lexer))
375             goto error;
376           sysfile_opts.version = lex_integer (lexer);
377           lex_get (lexer);
378         }
379       else if (writer_type == PORFILE_WRITER && lex_match_id (lexer, "TYPE")) 
380         {
381           lex_match (lexer, '=');
382           if (lex_match_id (lexer, "COMMUNICATIONS"))
383             porfile_opts.type = PFM_COMM;
384           else if (lex_match_id (lexer, "TAPE"))
385             porfile_opts.type = PFM_TAPE;
386           else
387             {
388               lex_error (lexer, _("expecting %s or %s"), "COMM", "TAPE");
389               goto error;
390             }
391         }
392       else if (writer_type == PORFILE_WRITER && lex_match_id (lexer, "DIGITS")) 
393         {
394           lex_match (lexer, '=');
395           if (!lex_force_int (lexer))
396             goto error;
397           porfile_opts.digits = lex_integer (lexer);
398           lex_get (lexer);
399         }
400       else if (!parse_dict_trim (lexer, dict))
401         goto error;
402       
403       if (!lex_match (lexer, '/'))
404         break;
405     }
406   if (lex_end_of_command (lexer) != CMD_SUCCESS)
407     goto error;
408
409   if (handle == NULL) 
410     {
411       lex_sbc_missing (lexer, "OUTFILE");
412       goto error;
413     }
414
415   dict_compact_values (dict);
416   aw->map = finish_case_map (dict);
417   if (aw->map != NULL)
418     case_create (&aw->bounce, dict_get_next_value_idx (dict));
419
420   if (fh_get_referent (handle) == FH_REF_FILE) 
421     {
422       switch (writer_type) 
423         {
424         case SYSFILE_WRITER:
425           aw->writer = any_writer_from_sfm_writer (
426             sfm_open_writer (handle, dict, sysfile_opts));
427           break;
428         case PORFILE_WRITER:
429           aw->writer = any_writer_from_pfm_writer (
430             pfm_open_writer (handle, dict, porfile_opts));
431           break;
432         }
433     }
434   else
435     aw->writer = any_writer_open (handle, dict);
436   if (aw->writer == NULL)
437     goto error;
438   dict_destroy (dict);
439   
440   return aw;
441
442  error:
443   case_writer_destroy (aw);
444   dict_destroy (dict);
445   return NULL;
446 }
447
448 /* Writes case C to writer AW. */
449 static bool
450 case_writer_write_case (struct case_writer *aw, const struct ccase *c) 
451 {
452   if (aw->map != NULL) 
453     {
454       map_case (aw->map, c, &aw->bounce);
455       c = &aw->bounce; 
456     }
457   return any_writer_write (aw->writer, c);
458 }
459 \f
460 /* SAVE and EXPORT. */
461
462 /* Parses and performs the SAVE or EXPORT procedure. */
463 static int
464 parse_output_proc (struct lexer *lexer, struct dataset *ds, enum writer_type writer_type)
465 {
466   bool retain_unselected;
467   struct variable *saved_filter_variable;
468   struct case_writer *aw;
469   struct ccase *c;
470   bool ok = true;
471
472   aw = parse_write_command (lexer, ds, writer_type, PROC_CMD, &retain_unselected);
473   if (aw == NULL) 
474     return CMD_CASCADING_FAILURE;
475
476   saved_filter_variable = dict_get_filter (dataset_dict (ds));
477   if (retain_unselected) 
478     dict_set_filter (dataset_dict (ds), NULL);
479
480   proc_open (ds);
481   while (ok && proc_read (ds, &c))
482     ok = case_writer_write_case (aw, c);
483   ok = proc_close (ds) && ok;
484
485   dict_set_filter (dataset_dict (ds), saved_filter_variable);
486
487   case_writer_destroy (aw);
488   return ok ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
489 }
490
491 int
492 cmd_save (struct lexer *lexer, struct dataset *ds) 
493 {
494   return parse_output_proc (lexer, ds, SYSFILE_WRITER);
495 }
496
497 int
498 cmd_export (struct lexer *lexer, struct dataset *ds) 
499 {
500   return parse_output_proc (lexer, ds, PORFILE_WRITER);
501 }
502 \f
503 /* XSAVE and XEXPORT. */
504
505 /* Transformation. */
506 struct output_trns 
507   {
508     struct case_writer *aw;      /* Writer. */
509   };
510
511 static trns_proc_func output_trns_proc;
512 static trns_free_func output_trns_free;
513
514 /* Parses the XSAVE or XEXPORT transformation command. */
515 static int
516 parse_output_trns (struct lexer *lexer, struct dataset *ds, enum writer_type writer_type) 
517 {
518   struct output_trns *t = xmalloc (sizeof *t);
519   t->aw = parse_write_command (lexer, ds, writer_type, XFORM_CMD, NULL);
520   if (t->aw == NULL) 
521     {
522       free (t);
523       return CMD_CASCADING_FAILURE;
524     }
525
526   add_transformation (ds, output_trns_proc, output_trns_free, t);
527   return CMD_SUCCESS;
528 }
529
530 /* Writes case C to the system file specified on XSAVE or XEXPORT. */
531 static int
532 output_trns_proc (void *trns_, struct ccase *c, casenumber case_num UNUSED)
533 {
534   struct output_trns *t = trns_;
535   case_writer_write_case (t->aw, c);
536   return TRNS_CONTINUE;
537 }
538
539 /* Frees an XSAVE or XEXPORT transformation.
540    Returns true if successful, false if an I/O error occurred. */
541 static bool
542 output_trns_free (void *trns_)
543 {
544   struct output_trns *t = trns_;
545   bool ok = true;
546
547   if (t != NULL)
548     {
549       ok = case_writer_destroy (t->aw);
550       free (t);
551     }
552   return ok;
553 }
554
555 /* XSAVE command. */
556 int
557 cmd_xsave (struct lexer *lexer, struct dataset *ds) 
558 {
559   return parse_output_trns (lexer, ds, SYSFILE_WRITER);
560 }
561
562 /* XEXPORT command. */
563 int
564 cmd_xexport (struct lexer *lexer, struct dataset *ds) 
565 {
566   return parse_output_trns (lexer, ds, PORFILE_WRITER);
567 }
568 \f
569 static bool rename_variables (struct lexer *lexer, struct dictionary *dict);
570 static bool drop_variables (struct lexer *, struct dictionary *dict);
571 static bool keep_variables (struct lexer *, struct dictionary *dict);
572
573 /* Commands that read and write system files share a great deal
574    of common syntactic structure for rearranging and dropping
575    variables.  This function parses this syntax and modifies DICT
576    appropriately.  Returns true on success, false on failure. */
577 static bool
578 parse_dict_trim (struct lexer *lexer, struct dictionary *dict)
579 {
580   if (lex_match_id (lexer, "MAP")) 
581     {
582       /* FIXME. */
583       return true;
584     }
585   else if (lex_match_id (lexer, "DROP"))
586     return drop_variables (lexer, dict);
587   else if (lex_match_id (lexer, "KEEP"))
588     return keep_variables (lexer, dict);
589   else if (lex_match_id (lexer, "RENAME"))
590     return rename_variables (lexer, dict);
591   else
592     {
593       lex_error (lexer, _("expecting a valid subcommand"));
594       return false;
595     }
596 }
597
598 /* Parses and performs the RENAME subcommand of GET and SAVE. */
599 static bool
600 rename_variables (struct lexer *lexer, struct dictionary *dict)
601 {
602   size_t i;
603
604   int success = 0;
605
606   struct variable **v;
607   char **new_names;
608   size_t nv, nn;
609   char *err_name;
610
611   int group;
612
613   lex_match (lexer, '=');
614   if (lex_token (lexer) != '(')
615     {
616       struct variable *v;
617
618       v = parse_variable (lexer, dict);
619       if (v == NULL)
620         return 0;
621       if (!lex_force_match (lexer, '=')
622           || !lex_force_id (lexer))
623         return 0;
624       if (dict_lookup_var (dict, lex_tokid (lexer)) != NULL)
625         {
626           msg (SE, _("Cannot rename %s as %s because there already exists "
627                      "a variable named %s.  To rename variables with "
628                      "overlapping names, use a single RENAME subcommand "
629                      "such as \"/RENAME (A=B)(B=C)(C=A)\", or equivalently, "
630                      "\"/RENAME (A B C=B C A)\"."),
631                var_get_name (v), lex_tokid (lexer), lex_tokid (lexer));
632           return 0;
633         }
634       
635       dict_rename_var (dict, v, lex_tokid (lexer));
636       lex_get (lexer);
637       return 1;
638     }
639
640   nv = nn = 0;
641   v = NULL;
642   new_names = 0;
643   group = 1;
644   while (lex_match (lexer, '('))
645     {
646       size_t old_nv = nv;
647
648       if (!parse_variables (lexer, dict, &v, &nv, PV_NO_DUPLICATE | PV_APPEND))
649         goto done;
650       if (!lex_match (lexer, '='))
651         {
652           msg (SE, _("`=' expected after variable list."));
653           goto done;
654         }
655       if (!parse_DATA_LIST_vars (lexer, &new_names, &nn, PV_APPEND | PV_NO_SCRATCH))
656         goto done;
657       if (nn != nv)
658         {
659           msg (SE, _("Number of variables on left side of `=' (%d) does not "
660                      "match number of variables on right side (%d), in "
661                      "parenthesized group %d of RENAME subcommand."),
662                (unsigned) (nv - old_nv), (unsigned) (nn - old_nv), group);
663           goto done;
664         }
665       if (!lex_force_match (lexer, ')'))
666         goto done;
667       group++;
668     }
669
670   if (!dict_rename_vars (dict, v, new_names, nv, &err_name)) 
671     {
672       msg (SE, _("Requested renaming duplicates variable name %s."), err_name);
673       goto done;
674     }
675   success = 1;
676
677  done:
678   for (i = 0; i < nn; i++)
679     free (new_names[i]);
680   free (new_names);
681   free (v);
682
683   return success;
684 }
685
686 /* Parses and performs the DROP subcommand of GET and SAVE.
687    Returns true if successful, false on failure.*/
688 static bool
689 drop_variables (struct lexer *lexer, struct dictionary *dict)
690 {
691   struct variable **v;
692   size_t nv;
693
694   lex_match (lexer, '=');
695   if (!parse_variables (lexer, dict, &v, &nv, PV_NONE))
696     return false;
697   dict_delete_vars (dict, v, nv);
698   free (v);
699
700   if (dict_get_var_cnt (dict) == 0)
701     {
702       msg (SE, _("Cannot DROP all variables from dictionary."));
703       return false;
704     }
705   return true;
706 }
707
708 /* Parses and performs the KEEP subcommand of GET and SAVE.
709    Returns true if successful, false on failure.*/
710 static bool
711 keep_variables (struct lexer *lexer, struct dictionary *dict)
712 {
713   struct variable **v;
714   size_t nv;
715   size_t i;
716
717   lex_match (lexer, '=');
718   if (!parse_variables (lexer, dict, &v, &nv, PV_NONE))
719     return false;
720
721   /* Move the specified variables to the beginning. */
722   dict_reorder_vars (dict, v, nv);
723           
724   /* Delete the remaining variables. */
725   v = xnrealloc (v, dict_get_var_cnt (dict) - nv, sizeof *v);
726   for (i = nv; i < dict_get_var_cnt (dict); i++)
727     v[i - nv] = dict_get_var (dict, i);
728   dict_delete_vars (dict, v, dict_get_var_cnt (dict) - nv);
729   free (v);
730
731   return true;
732 }
733 \f
734 /* MATCH FILES. */
735
736 /* File types. */
737 enum
738   {
739     MTF_FILE,                   /* Specified on FILE= subcommand. */
740     MTF_TABLE                   /* Specified on TABLE= subcommand. */
741   };
742
743 /* One of the files on MATCH FILES. */
744 struct mtf_file
745   {
746     struct mtf_file *next, *prev; /* Next, previous in the list of files. */
747     struct mtf_file *next_min;  /* Next in the chain of minimums. */
748     
749     int type;                   /* One of MTF_*. */
750     struct variable **by;       /* List of BY variables for this file. */
751     struct file_handle *handle; /* File handle. */
752     struct any_reader *reader;  /* File reader. */
753     struct dictionary *dict;    /* Dictionary from system file. */
754
755     /* IN subcommand. */
756     char *in_name;              /* Variable name. */
757     struct variable *in_var;    /* Variable (in master dictionary). */
758
759     struct ccase input_storage; /* Input record storage. */
760     struct ccase *input;        /* Input record. */
761   };
762
763 /* MATCH FILES procedure. */
764 struct mtf_proc 
765   {
766     struct mtf_file *head;      /* First file mentioned on FILE or TABLE. */
767     struct mtf_file *tail;      /* Last file mentioned on FILE or TABLE. */
768
769     bool ok;                    /* False if I/O error occurs. */
770
771     size_t by_cnt;              /* Number of variables on BY subcommand. */
772
773     /* Names of FIRST, LAST variables. */
774     char first[LONG_NAME_LEN + 1], last[LONG_NAME_LEN + 1];
775     
776     struct dictionary *dict;    /* Dictionary of output file. */
777     struct casefile *output;    /* MATCH FILES output. */
778     struct ccase mtf_case;      /* Case used for output. */
779
780     unsigned seq_num;           /* Have we initialized this variable? */
781     unsigned *seq_nums;         /* Sequence numbers for each var in dict. */
782   };
783
784 static bool mtf_free (struct mtf_proc *);
785 static bool mtf_close_file (struct mtf_file *);
786 static int mtf_merge_dictionary (struct dictionary *const, struct mtf_file *);
787 static bool mtf_read_records (struct mtf_proc *, struct dataset *);
788 static bool mtf_delete_file_in_place (struct mtf_proc *, struct mtf_file **);
789
790 static bool mtf_processing (struct mtf_proc *, struct dataset *);
791
792 static char *var_type_description (struct variable *);
793
794 static void set_master (struct variable *, struct variable *master);
795 static struct variable *get_master (struct variable *);
796
797 /* Parse and execute the MATCH FILES command. */
798 int
799 cmd_match_files (struct lexer *lexer, struct dataset *ds)
800 {
801   struct mtf_proc mtf;
802   struct mtf_file *first_table = NULL;
803   struct mtf_file *iter;
804   
805   bool used_active_file = false;
806   bool saw_table = false;
807   bool saw_in = false;
808
809   mtf.head = mtf.tail = NULL;
810   mtf.by_cnt = 0;
811   mtf.first[0] = '\0';
812   mtf.last[0] = '\0';
813   mtf.dict = dict_create ();
814   mtf.output = NULL;
815   case_nullify (&mtf.mtf_case);
816   mtf.seq_num = 0;
817   mtf.seq_nums = NULL;
818   dict_set_case_limit (mtf.dict, dict_get_case_limit (dataset_dict (ds)));
819
820   lex_match (lexer, '/');
821   while (lex_token (lexer) == T_ID
822          && (lex_id_match (ss_cstr ("FILE"), ss_cstr (lex_tokid (lexer)))
823              || lex_id_match (ss_cstr ("TABLE"), ss_cstr (lex_tokid (lexer)))))
824     {
825       struct mtf_file *file = xmalloc (sizeof *file);
826
827       if (lex_match_id (lexer, "FILE"))
828         file->type = MTF_FILE;
829       else if (lex_match_id (lexer, "TABLE"))
830         {
831           file->type = MTF_TABLE;
832           saw_table = true;
833         }
834       else
835         NOT_REACHED ();
836       lex_match (lexer, '=');
837
838       file->by = NULL;
839       file->handle = NULL;
840       file->reader = NULL;
841       file->dict = NULL;
842       file->in_name = NULL;
843       file->in_var = NULL;
844       case_nullify (&file->input_storage);
845       file->input = &file->input_storage;
846
847       /* FILEs go first, then TABLEs. */
848       if (file->type == MTF_TABLE || first_table == NULL)
849         {
850           file->next = NULL;
851           file->prev = mtf.tail;
852           if (mtf.tail)
853             mtf.tail->next = file;
854           mtf.tail = file;
855           if (mtf.head == NULL)
856             mtf.head = file;
857           if (file->type == MTF_TABLE && first_table == NULL)
858             first_table = file;
859         }
860       else 
861         {
862           assert (file->type == MTF_FILE);
863           file->next = first_table;
864           file->prev = first_table->prev;
865           if (first_table->prev)
866             first_table->prev->next = file;
867           else
868             mtf.head = file;
869           first_table->prev = file;
870         }
871
872       if (lex_match (lexer, '*'))
873         {
874           file->handle = NULL;
875           file->reader = NULL;
876               
877           if (used_active_file)
878             {
879               msg (SE, _("The active file may not be specified more "
880                          "than once."));
881               goto error;
882             }
883           used_active_file = true;
884
885           if (!proc_has_source (ds))
886             {
887               msg (SE, _("Cannot specify the active file since no active "
888                          "file has been defined."));
889               goto error;
890             }
891
892           if (proc_make_temporary_transformations_permanent (ds))
893             msg (SE,
894                  _("MATCH FILES may not be used after TEMPORARY when "
895                    "the active file is an input source.  "
896                    "Temporary transformations will be made permanent."));
897
898           file->dict = dataset_dict (ds);
899         }
900       else
901         {
902           file->handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
903           if (file->handle == NULL)
904             goto error;
905
906           file->reader = any_reader_open (file->handle, &file->dict);
907           if (file->reader == NULL)
908             goto error;
909
910           case_create (&file->input_storage,
911                        dict_get_next_value_idx (file->dict));
912         }
913
914       while (lex_match (lexer, '/'))
915         if (lex_match_id (lexer, "RENAME")) 
916           {
917             if (!rename_variables (lexer, file->dict))
918               goto error; 
919           }
920         else if (lex_match_id (lexer, "IN"))
921           {
922             lex_match (lexer, '=');
923             if (lex_token (lexer) != T_ID)
924               {
925                 lex_error (lexer, NULL);
926                 goto error;
927               }
928
929             if (file->in_name != NULL)
930               {
931                 msg (SE, _("Multiple IN subcommands for a single FILE or "
932                            "TABLE."));
933                 goto error;
934               }
935             file->in_name = xstrdup (lex_tokid (lexer));
936             lex_get (lexer);
937             saw_in = true;
938           }
939
940       mtf_merge_dictionary (mtf.dict, file);
941     }
942   
943   while (lex_token (lexer) != '.')
944     {
945       if (lex_match (lexer, T_BY))
946         {
947           struct variable **by;
948           
949           if (mtf.by_cnt)
950             {
951               msg (SE, _("BY may appear at most once."));
952               goto error;
953             }
954               
955           lex_match (lexer, '=');
956           if (!parse_variables (lexer, mtf.dict, &by, &mtf.by_cnt,
957                                 PV_NO_DUPLICATE | PV_NO_SCRATCH))
958             goto error;
959
960           for (iter = mtf.head; iter != NULL; iter = iter->next)
961             {
962               size_t i;
963           
964               iter->by = xnmalloc (mtf.by_cnt, sizeof *iter->by);
965
966               for (i = 0; i < mtf.by_cnt; i++)
967                 {
968                   iter->by[i] = dict_lookup_var (iter->dict,
969                                                  var_get_name (by[i]));
970                   if (iter->by[i] == NULL)
971                     {
972                       msg (SE, _("File %s lacks BY variable %s."),
973                            iter->handle ? fh_get_name (iter->handle) : "*",
974                            var_get_name (by[i]));
975                       free (by);
976                       goto error;
977                     }
978                 }
979             }
980           free (by);
981         }
982       else if (lex_match_id (lexer, "FIRST")) 
983         {
984           if (mtf.first[0] != '\0')
985             {
986               msg (SE, _("FIRST may appear at most once."));
987               goto error;
988             }
989               
990           lex_match (lexer, '=');
991           if (!lex_force_id (lexer))
992             goto error;
993           strcpy (mtf.first, lex_tokid (lexer));
994           lex_get (lexer);
995         }
996       else if (lex_match_id (lexer, "LAST")) 
997         {
998           if (mtf.last[0] != '\0')
999             {
1000               msg (SE, _("LAST may appear at most once."));
1001               goto error;
1002             }
1003               
1004           lex_match (lexer, '=');
1005           if (!lex_force_id (lexer))
1006             goto error;
1007           strcpy (mtf.last, lex_tokid (lexer));
1008           lex_get (lexer);
1009         }
1010       else if (lex_match_id (lexer, "MAP"))
1011         {
1012           /* FIXME. */
1013         }
1014       else if (lex_match_id (lexer, "DROP")) 
1015         {
1016           if (!drop_variables (lexer, mtf.dict))
1017             goto error;
1018         }
1019       else if (lex_match_id (lexer, "KEEP")) 
1020         {
1021           if (!keep_variables (lexer, mtf.dict))
1022             goto error;
1023         }
1024       else
1025         {
1026           lex_error (lexer, NULL);
1027           goto error;
1028         }
1029
1030       if (!lex_match (lexer, '/') && lex_token (lexer) != '.') 
1031         {
1032           lex_end_of_command (lexer);
1033           goto error;
1034         }
1035     }
1036
1037   if (mtf.by_cnt == 0)
1038     {
1039       if (saw_table)
1040         {
1041           msg (SE, _("BY is required when TABLE is specified."));
1042           goto error;
1043         }
1044       if (saw_in)
1045         {
1046           msg (SE, _("BY is required when IN is specified."));
1047           goto error;
1048         }
1049     }
1050
1051   /* Set up mapping from each file's variables to master
1052      variables. */
1053   for (iter = mtf.head; iter != NULL; iter = iter->next)
1054     {
1055       struct dictionary *d = iter->dict;
1056       int i;
1057
1058       for (i = 0; i < dict_get_var_cnt (d); i++)
1059         {
1060           struct variable *v = dict_get_var (d, i);
1061           struct variable *mv = dict_lookup_var (mtf.dict, var_get_name (v));
1062           if (mv != NULL)
1063             set_master (v, mv);
1064         }
1065     }
1066
1067   /* Add IN variables to master dictionary. */
1068   for (iter = mtf.head; iter != NULL; iter = iter->next) 
1069     if (iter->in_name != NULL)
1070       {
1071         struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
1072         iter->in_var = dict_create_var (mtf.dict, iter->in_name, 0);
1073         if (iter->in_var == NULL)
1074           {
1075             msg (SE, _("IN variable name %s duplicates an "
1076                        "existing variable name."),
1077                  var_get_name (iter->in_var));
1078             goto error;
1079           }
1080         var_set_both_formats (iter->in_var, &format);
1081       }
1082     
1083   /* MATCH FILES performs an n-way merge on all its input files.
1084      Abstract algorithm:
1085
1086      1. Read one input record from every input FILE.
1087
1088      2. If no FILEs are left, stop.  Otherwise, proceed to step 3.
1089
1090      3. Find the FILE input record(s) that have minimum BY
1091      values.  Store all the values from these input records into
1092      the output record.
1093
1094      4. For every TABLE, read another record as long as the BY values
1095      on the TABLE's input record are less than the FILEs' BY values.
1096      If an exact match is found, store all the values from the TABLE
1097      input record into the output record.
1098
1099      5. Write the output record.
1100
1101      6. Read another record from each input file FILE and TABLE that
1102      we stored values from above.  If we come to the end of one of the
1103      input files, remove it from the list of input files.
1104
1105      7. Repeat from step 2.
1106
1107      FIXME: For merging large numbers of files (more than 10?) a
1108      better algorithm would use a heap for finding minimum
1109      values. */
1110
1111   if (used_active_file) 
1112     {
1113       proc_set_sink (ds, create_case_sink (&null_sink_class, 
1114                                            dataset_dict (ds), NULL));
1115       proc_open (ds); 
1116     }
1117   else
1118     discard_variables (ds);
1119
1120   dict_compact_values (mtf.dict);
1121   mtf.output = fastfile_create (dict_get_next_value_idx (mtf.dict));
1122   mtf.seq_nums = xcalloc (dict_get_var_cnt (mtf.dict), sizeof *mtf.seq_nums);
1123   case_create (&mtf.mtf_case, dict_get_next_value_idx (mtf.dict));
1124
1125   if (!mtf_read_records (&mtf, ds))
1126     goto error;
1127   while (mtf.head && mtf.head->type == MTF_FILE)
1128     if (!mtf_processing (&mtf, ds))
1129       goto error;
1130   if (!proc_close (ds))
1131     goto error;
1132
1133   discard_variables (ds);
1134
1135   dict_destroy (dataset_dict (ds));
1136   dataset_set_dict (ds, mtf.dict);
1137   mtf.dict = NULL;
1138   proc_set_source (ds, storage_source_create (mtf.output));
1139   mtf.output = NULL;
1140   
1141   return mtf_free (&mtf) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
1142   
1143  error:
1144   proc_close (ds);
1145   mtf_free (&mtf);
1146   return CMD_CASCADING_FAILURE;
1147 }
1148
1149 /* Return a string in a static buffer describing V's variable type and
1150    width. */
1151 static char *
1152 var_type_description (struct variable *v)
1153 {
1154   static char buf[2][32];
1155   static int x = 0;
1156   char *s;
1157
1158   x ^= 1;
1159   s = buf[x];
1160
1161   if (var_is_numeric (v))
1162     strcpy (s, "numeric");
1163   else
1164     sprintf (s, "string with width %d", var_get_width (v));
1165   return s;
1166 }
1167
1168 /* Closes FILE and frees its associated data.
1169    Returns true if successful, false if an I/O error
1170    occurred on FILE. */
1171 static bool
1172 mtf_close_file (struct mtf_file *file)
1173 {
1174   bool ok = file->reader == NULL || !any_reader_error (file->reader);
1175   free (file->by);
1176   any_reader_close (file->reader);
1177   if (file->handle != NULL)
1178     dict_destroy (file->dict);
1179   case_destroy (&file->input_storage);
1180   free (file->in_name);
1181   free (file);
1182   return ok;
1183 }
1184
1185 /* Free all the data for the MATCH FILES procedure.
1186    Returns true if successful, false if an I/O error
1187    occurred. */
1188 static bool
1189 mtf_free (struct mtf_proc *mtf)
1190 {
1191   struct mtf_file *iter, *next;
1192   bool ok = true;
1193
1194   for (iter = mtf->head; iter; iter = next)
1195     {
1196       next = iter->next;
1197       assert (iter->dict != mtf->dict);
1198       if (!mtf_close_file (iter))
1199         ok = false;
1200     }
1201   
1202   if (mtf->dict)
1203     dict_destroy (mtf->dict);
1204   case_destroy (&mtf->mtf_case);
1205   free (mtf->seq_nums);
1206
1207   return ok;
1208 }
1209
1210 /* Remove *FILE from the mtf_file chain.  Make *FILE point to the next
1211    file in the chain, or to NULL if was the last in the chain.
1212    Returns true if successful, false if an I/O error occurred. */
1213 static bool
1214 mtf_delete_file_in_place (struct mtf_proc *mtf, struct mtf_file **file)
1215 {
1216   struct mtf_file *f = *file;
1217   int i;
1218
1219   if (f->prev)
1220     f->prev->next = f->next;
1221   if (f->next)
1222     f->next->prev = f->prev;
1223   if (f == mtf->head)
1224     mtf->head = f->next;
1225   if (f == mtf->tail)
1226     mtf->tail = f->prev;
1227   *file = f->next;
1228
1229   if (f->in_var != NULL)
1230     case_data_rw (&mtf->mtf_case, f->in_var)->f = 0.;
1231   for (i = 0; i < dict_get_var_cnt (f->dict); i++)
1232     {
1233       struct variable *v = dict_get_var (f->dict, i);
1234       struct variable *mv = get_master (v);
1235       if (mv != NULL) 
1236         {
1237           union value *out = case_data_rw (&mtf->mtf_case, mv);
1238           
1239           if (var_is_numeric (v))
1240             out->f = SYSMIS;
1241           else
1242             memset (out->s, ' ', var_get_width (v));
1243         } 
1244     }
1245
1246   return mtf_close_file (f);
1247 }
1248
1249 /* Read a record from every input file.
1250    Returns true if successful, false if an I/O error occurred. */
1251 static bool
1252 mtf_read_records (struct mtf_proc *mtf, struct dataset *ds)
1253 {
1254   struct mtf_file *iter, *next;
1255   bool ok = true;
1256
1257   for (iter = mtf->head; ok && iter != NULL; iter = next)
1258     {
1259       next = iter->next;
1260       if (iter->handle
1261           ? !any_reader_read (iter->reader, iter->input)
1262           : !proc_read (ds, &iter->input)) 
1263         {
1264           if (!mtf_delete_file_in_place (mtf, &iter))
1265             ok = false; 
1266         }
1267     }
1268   return ok;
1269 }
1270
1271 /* Compare the BY variables for files A and B; return -1 if A < B, 0
1272    if A == B, 1 if A > B. */
1273 static inline int
1274 mtf_compare_BY_values (struct mtf_proc *mtf,
1275                        struct mtf_file *a, struct mtf_file *b)
1276 {
1277   return case_compare_2dict (a->input, b->input, a->by, b->by, mtf->by_cnt);
1278 }
1279
1280 /* Perform one iteration of steps 3...7 above.
1281    Returns true if successful, false if an I/O error occurred. */
1282 static bool
1283 mtf_processing (struct mtf_proc *mtf, struct dataset *ds)
1284 {
1285   struct mtf_file *min_head, *min_tail; /* Files with minimum BY values. */
1286   struct mtf_file *max_head, *max_tail; /* Files with non-minimum BYs. */
1287   struct mtf_file *iter, *next;
1288
1289   /* 3. Find the FILE input record(s) that have minimum BY
1290      values.  Store all the values from these input records into
1291      the output record. */
1292   min_head = min_tail = mtf->head;
1293   max_head = max_tail = NULL;
1294   for (iter = mtf->head->next; iter && iter->type == MTF_FILE;
1295        iter = iter->next) 
1296     {
1297       int cmp = mtf_compare_BY_values (mtf, min_head, iter);
1298       if (cmp < 0) 
1299         {
1300           if (max_head)
1301             max_tail = max_tail->next_min = iter;
1302           else
1303             max_head = max_tail = iter;
1304         }
1305       else if (cmp == 0) 
1306         min_tail = min_tail->next_min = iter;
1307       else /* cmp > 0 */
1308         {
1309           if (max_head)
1310             {
1311               max_tail->next_min = min_head;
1312               max_tail = min_tail;
1313             }
1314           else
1315             {
1316               max_head = min_head;
1317               max_tail = min_tail;
1318             }
1319           min_head = min_tail = iter;
1320         }
1321     }
1322       
1323   /* 4. For every TABLE, read another record as long as the BY
1324      values on the TABLE's input record are less than the FILEs'
1325      BY values.  If an exact match is found, store all the values
1326      from the TABLE input record into the output record. */
1327   for (; iter != NULL; iter = next)
1328     {
1329       assert (iter->type == MTF_TABLE);
1330       
1331       next = iter->next;
1332       for (;;) 
1333         {
1334           int cmp = mtf_compare_BY_values (mtf, min_head, iter);
1335           if (cmp < 0) 
1336             {
1337               if (max_head)
1338                 max_tail = max_tail->next_min = iter;
1339               else
1340                 max_head = max_tail = iter;
1341             }
1342           else if (cmp == 0)
1343             min_tail = min_tail->next_min = iter;
1344           else /* cmp > 0 */
1345             {
1346               if (iter->handle
1347                   ? any_reader_read (iter->reader, iter->input)
1348                   : proc_read (ds, &iter->input))
1349                 continue;
1350               if (!mtf_delete_file_in_place (mtf, &iter))
1351                 return false;
1352             }
1353           break;
1354         }
1355     }
1356
1357   /* Next sequence number. */
1358   mtf->seq_num++;
1359
1360   /* Store data to all the records we are using. */
1361   if (min_tail)
1362     min_tail->next_min = NULL;
1363   for (iter = min_head; iter; iter = iter->next_min)
1364     {
1365       int i;
1366
1367       for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
1368         {
1369           struct variable *v = dict_get_var (iter->dict, i);
1370           struct variable *mv = get_master (v);
1371           size_t mv_index = mv ? var_get_dict_index (mv) : 0;
1372           
1373           if (mv != NULL && mtf->seq_nums[mv_index] != mtf->seq_num) 
1374             {
1375               const struct ccase *record = iter->input;
1376               union value *out = case_data_rw (&mtf->mtf_case, mv);
1377
1378               mtf->seq_nums[mv_index] = mtf->seq_num;
1379               if (var_is_numeric (v))
1380                 out->f = case_num (record, v);
1381               else
1382                 memcpy (out->s, case_str (record, v), var_get_width (v));
1383             } 
1384         }
1385       if (iter->in_var != NULL)
1386         case_data_rw (&mtf->mtf_case, iter->in_var)->f = 1.;
1387     }
1388
1389   /* Store missing values to all the records we're not using. */
1390   if (max_tail)
1391     max_tail->next_min = NULL;
1392   for (iter = max_head; iter; iter = iter->next_min)
1393     {
1394       int i;
1395
1396       for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
1397         {
1398           struct variable *v = dict_get_var (iter->dict, i);
1399           struct variable *mv = get_master (v);
1400           size_t mv_index = mv ? var_get_dict_index (mv) : 0;
1401
1402           if (mv != NULL && mtf->seq_nums[mv_index] != mtf->seq_num) 
1403             {
1404               union value *out = case_data_rw (&mtf->mtf_case, mv);
1405               mtf->seq_nums[mv_index] = mtf->seq_num;
1406
1407               if (var_is_numeric (v))
1408                 out->f = SYSMIS;
1409               else
1410                 memset (out->s, ' ', var_get_width (v));
1411             }
1412         }
1413       if (iter->in_var != NULL)
1414         case_data_rw (&mtf->mtf_case, iter->in_var)->f = 0.;
1415     }
1416
1417   /* 5. Write the output record. */
1418   casefile_append (mtf->output, &mtf->mtf_case);
1419
1420   /* 6. Read another record from each input file FILE and TABLE
1421      that we stored values from above.  If we come to the end of
1422      one of the input files, remove it from the list of input
1423      files. */
1424   for (iter = min_head; iter && iter->type == MTF_FILE; iter = next)
1425     {
1426       next = iter->next_min;
1427       if (iter->reader != NULL
1428           ? !any_reader_read (iter->reader, iter->input)
1429           : !proc_read (ds, &iter->input))
1430         if (!mtf_delete_file_in_place (mtf, &iter))
1431           return false;
1432     }
1433   return true;
1434 }
1435
1436 /* Merge the dictionary for file F into master dictionary M. */
1437 static int
1438 mtf_merge_dictionary (struct dictionary *const m, struct mtf_file *f)
1439 {
1440   struct dictionary *d = f->dict;
1441   const char *d_docs, *m_docs;
1442   int i;
1443
1444   if (dict_get_label (m) == NULL)
1445     dict_set_label (m, dict_get_label (d));
1446
1447   d_docs = dict_get_documents (d);
1448   m_docs = dict_get_documents (m);
1449   if (d_docs != NULL) 
1450     {
1451       if (m_docs == NULL)
1452         dict_set_documents (m, d_docs);
1453       else
1454         {
1455           char *new_docs;
1456           size_t new_len;
1457
1458           new_len = strlen (m_docs) + strlen (d_docs);
1459           new_docs = xmalloc (new_len + 1);
1460           strcpy (new_docs, m_docs);
1461           strcat (new_docs, d_docs);
1462           dict_set_documents (m, new_docs);
1463           free (new_docs);
1464         }
1465     }
1466   
1467   for (i = 0; i < dict_get_var_cnt (d); i++)
1468     {
1469       struct variable *dv = dict_get_var (d, i);
1470       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
1471
1472       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
1473         continue;
1474
1475       if (mv != NULL)
1476         {
1477           if (var_get_width (mv) != var_get_width (dv)) 
1478             {
1479               msg (SE, _("Variable %s in file %s (%s) has different "
1480                          "type or width from the same variable in "
1481                          "earlier file (%s)."),
1482                    var_get_name (dv), fh_get_name (f->handle),
1483                    var_type_description (dv), var_type_description (mv));
1484               return 0;
1485             }
1486         
1487           if (var_get_width (dv) == var_get_width (mv))
1488             {
1489               if (var_has_value_labels (dv) && !var_has_value_labels (mv))
1490                 var_set_value_labels (mv, var_get_value_labels (dv));
1491               if (var_has_missing_values (dv) && !var_has_missing_values (mv))
1492                 var_set_missing_values (mv, var_get_missing_values (dv));
1493             }
1494
1495           if (var_get_label (dv) && !var_get_label (mv))
1496             var_set_label (mv, var_get_label (dv));
1497         }
1498       else
1499         mv = dict_clone_var_assert (m, dv, var_get_name (dv));
1500     }
1501
1502   return 1;
1503 }
1504
1505 /* Marks V's master variable as MASTER. */
1506 static void
1507 set_master (struct variable *v, struct variable *master) 
1508 {
1509   var_attach_aux (v, master, NULL);
1510 }
1511
1512 /* Returns the master variable corresponding to V,
1513    as set with set_master(). */
1514 static struct variable *
1515 get_master (struct variable *v) 
1516 {
1517   return var_get_aux (v);
1518 }
1519 \f
1520 /* Case map.
1521
1522    A case map copies data from a case that corresponds for one
1523    dictionary to a case that corresponds to a second dictionary
1524    derived from the first by, optionally, deleting, reordering,
1525    or renaming variables.  (No new variables may be created.)
1526    */
1527
1528 /* A case map. */
1529 struct case_map
1530   {
1531     size_t value_cnt;   /* Number of values in map. */
1532     int *map;           /* For each destination index, the
1533                            corresponding source index. */
1534   };
1535
1536 /* Prepares dictionary D for producing a case map.  Afterward,
1537    the caller may delete, reorder, or rename variables within D
1538    at will before using finish_case_map() to produce the case
1539    map.
1540
1541    Uses D's aux members, which must otherwise not be in use. */
1542 static void
1543 start_case_map (struct dictionary *d) 
1544 {
1545   size_t var_cnt = dict_get_var_cnt (d);
1546   size_t i;
1547   
1548   for (i = 0; i < var_cnt; i++)
1549     {
1550       struct variable *v = dict_get_var (d, i);
1551       int *src_fv = xmalloc (sizeof *src_fv);
1552       *src_fv = var_get_case_index (v);
1553       var_attach_aux (v, src_fv, var_dtor_free);
1554     }
1555 }
1556
1557 /* Produces a case map from dictionary D, which must have been
1558    previously prepared with start_case_map().
1559
1560    Does not retain any reference to D, and clears the aux members
1561    set up by start_case_map().
1562
1563    Returns the new case map, or a null pointer if no mapping is
1564    required (that is, no data has changed position). */
1565 static struct case_map *
1566 finish_case_map (struct dictionary *d) 
1567 {
1568   struct case_map *map;
1569   size_t var_cnt = dict_get_var_cnt (d);
1570   size_t i;
1571   int identity_map;
1572
1573   map = xmalloc (sizeof *map);
1574   map->value_cnt = dict_get_next_value_idx (d);
1575   map->map = xnmalloc (map->value_cnt, sizeof *map->map);
1576   for (i = 0; i < map->value_cnt; i++)
1577     map->map[i] = -1;
1578
1579   identity_map = 1;
1580   for (i = 0; i < var_cnt; i++) 
1581     {
1582       struct variable *v = dict_get_var (d, i);
1583       size_t value_cnt = var_get_value_cnt (v);
1584       int *src_fv = (int *) var_detach_aux (v);
1585       size_t idx;
1586
1587       if (var_get_case_index (v) != *src_fv)
1588         identity_map = 0;
1589       
1590       for (idx = 0; idx < value_cnt; idx++)
1591         {
1592           int src_idx = *src_fv + idx;
1593           int dst_idx = var_get_case_index (v) + idx;
1594           
1595           assert (map->map[dst_idx] == -1);
1596           map->map[dst_idx] = src_idx;
1597         }
1598       free (src_fv);
1599     }
1600
1601   if (identity_map) 
1602     {
1603       destroy_case_map (map);
1604       return NULL;
1605     }
1606
1607   while (map->value_cnt > 0 && map->map[map->value_cnt - 1] == -1)
1608     map->value_cnt--;
1609
1610   return map;
1611 }
1612
1613 /* Maps from SRC to DST, applying case map MAP. */
1614 static void
1615 map_case (const struct case_map *map,
1616           const struct ccase *src, struct ccase *dst) 
1617 {
1618   size_t dst_idx;
1619
1620   assert (map != NULL);
1621   assert (src != NULL);
1622   assert (dst != NULL);
1623   assert (src != dst);
1624
1625   for (dst_idx = 0; dst_idx < map->value_cnt; dst_idx++)
1626     {
1627       int src_idx = map->map[dst_idx];
1628       if (src_idx != -1)
1629         *case_data_rw_idx (dst, dst_idx) = *case_data_idx (src, src_idx);
1630     }
1631 }
1632
1633 /* Destroys case map MAP. */
1634 static void
1635 destroy_case_map (struct case_map *map) 
1636 {
1637   if (map != NULL) 
1638     {
1639       free (map->map);
1640       free (map);
1641     }
1642 }