Added abstract factory to create casefiles. Updated procedures to use
[pspp-builds.git] / src / language / data-io / get.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <stdlib.h>
22
23 #include <data/any-reader.h>
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case-source.h>
27 #include <data/case.h>
28 #include <data/casefile.h>
29 #include <data/fastfile.h>
30 #include <data/format.h>
31 #include <data/dictionary.h>
32 #include <data/por-file-writer.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/storage-stream.h>
36 #include <data/sys-file-writer.h>
37 #include <data/transformations.h>
38 #include <data/value-labels.h>
39 #include <data/variable.h>
40 #include <language/command.h>
41 #include <language/data-io/file-handle.h>
42 #include <language/lexer/lexer.h>
43 #include <language/lexer/variable-parser.h>
44 #include <libpspp/alloc.h>
45 #include <libpspp/assertion.h>
46 #include <libpspp/compiler.h>
47 #include <libpspp/hash.h>
48 #include <libpspp/message.h>
49 #include <libpspp/message.h>
50 #include <libpspp/misc.h>
51 #include <libpspp/str.h>
52
53 #include "gettext.h"
54 #define _(msgid) gettext (msgid)
55
56 /* Rearranging and reducing a dictionary. */
57 static void start_case_map (struct dictionary *);
58 static struct case_map *finish_case_map (struct dictionary *);
59 static void map_case (const struct case_map *,
60                       const struct ccase *, struct ccase *);
61 static void destroy_case_map (struct case_map *);
62
63 static bool parse_dict_trim (struct lexer *, struct dictionary *);
64 \f
65 /* Reading system and portable files. */
66
67 /* Type of command. */
68 enum reader_command 
69   {
70     GET_CMD,
71     IMPORT_CMD
72   };
73
74 /* Case reader input program. */
75 struct case_reader_pgm 
76   {
77     struct any_reader *reader;  /* File reader. */
78     struct case_map *map;       /* Map from file dict to active file dict. */
79     struct ccase bounce;        /* Bounce buffer. */
80   };
81
82 static const struct case_source_class case_reader_source_class;
83
84 static void case_reader_pgm_free (struct case_reader_pgm *);
85
86 /* Parses a GET or IMPORT command. */
87 static int
88 parse_read_command (struct lexer *lexer, struct dataset *ds, enum reader_command type)
89 {
90   struct case_reader_pgm *pgm = NULL;
91   struct file_handle *fh = NULL;
92   struct dictionary *dict = NULL;
93
94   for (;;)
95     {
96       lex_match (lexer, '/');
97
98       if (lex_match_id (lexer, "FILE") || lex_token (lexer) == T_STRING)
99         {
100           lex_match (lexer, '=');
101
102           fh = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
103           if (fh == NULL)
104             goto error;
105         }
106       else if (type == IMPORT_CMD && lex_match_id (lexer, "TYPE"))
107         {
108           lex_match (lexer, '=');
109
110           if (lex_match_id (lexer, "COMM"))
111             type = PFM_COMM;
112           else if (lex_match_id (lexer, "TAPE"))
113             type = PFM_TAPE;
114           else
115             {
116               lex_error (lexer, _("expecting COMM or TAPE"));
117               goto error;
118             }
119         }
120       else
121         break; 
122     }
123   
124   if (fh == NULL) 
125     {
126       lex_sbc_missing (lexer, "FILE");
127       goto error;
128     }
129               
130   discard_variables (ds);
131
132   pgm = xmalloc (sizeof *pgm);
133   pgm->reader = any_reader_open (fh, &dict);
134   pgm->map = NULL;
135   case_nullify (&pgm->bounce);
136   if (pgm->reader == NULL)
137     goto error;
138
139   case_create (&pgm->bounce, dict_get_next_value_idx (dict));
140   
141   start_case_map (dict);
142
143   while (lex_token (lexer) != '.')
144     {
145       lex_match (lexer, '/');
146       if (!parse_dict_trim (lexer, dict))
147         goto error;
148     }
149
150   pgm->map = finish_case_map (dict);
151   
152   dict_destroy (dataset_dict (ds));
153   dataset_set_dict (ds, dict);
154
155   proc_set_source (ds, 
156                    create_case_source (&case_reader_source_class, pgm));
157
158   return CMD_SUCCESS;
159
160  error:
161   case_reader_pgm_free (pgm);
162   if (dict != NULL)
163     dict_destroy (dict);
164   return CMD_CASCADING_FAILURE;
165 }
166
167 /* Frees a struct case_reader_pgm. */
168 static void
169 case_reader_pgm_free (struct case_reader_pgm *pgm) 
170 {
171   if (pgm != NULL) 
172     {
173       any_reader_close (pgm->reader);
174       destroy_case_map (pgm->map);
175       case_destroy (&pgm->bounce);
176       free (pgm);
177     }
178 }
179
180 /* Reads one case into C.
181    Returns true if successful, false at end of file or if an
182    I/O error occurred. */
183 static bool
184 case_reader_source_read (struct case_source *source, struct ccase *c)
185 {
186   struct case_reader_pgm *pgm = source->aux;
187   if (any_reader_read (pgm->reader, pgm->map == NULL ? c : &pgm->bounce)) 
188     {
189       if (pgm->map != NULL)
190         map_case (pgm->map, &pgm->bounce, c);
191       return true;
192     }
193   else  
194     return false;
195 }
196
197 /* Destroys the source.
198    Returns true if successful read, false if an I/O occurred
199    during destruction or previously. */
200 static bool
201 case_reader_source_destroy (struct case_source *source)
202 {
203   struct case_reader_pgm *pgm = source->aux;
204   bool ok = !any_reader_error (pgm->reader); 
205   case_reader_pgm_free (pgm);
206   return ok;
207 }
208
209 static const struct case_source_class case_reader_source_class =
210   {
211     "case reader",
212     NULL,
213     case_reader_source_read,
214     case_reader_source_destroy,
215   };
216 \f
217 /* GET. */
218 int
219 cmd_get (struct lexer *lexer, struct dataset *ds) 
220 {
221   return parse_read_command (lexer, ds, GET_CMD);
222 }
223
224 /* IMPORT. */
225 int
226 cmd_import (struct lexer *lexer, struct dataset *ds) 
227 {
228   return parse_read_command (lexer, ds, IMPORT_CMD);
229 }
230 \f
231 /* Writing system and portable files. */ 
232
233 /* Type of output file. */
234 enum writer_type
235   {
236     SYSFILE_WRITER,     /* System file. */
237     PORFILE_WRITER      /* Portable file. */
238   };
239
240 /* Type of a command. */
241 enum command_type 
242   {
243     XFORM_CMD,          /* Transformation. */
244     PROC_CMD            /* Procedure. */
245   };
246
247 /* File writer plus a case map. */
248 struct case_writer
249   {
250     struct any_writer *writer;  /* File writer. */
251     struct case_map *map;       /* Map to output file dictionary
252                                    (null pointer for identity mapping). */
253     struct ccase bounce;        /* Bounce buffer for mapping (if needed). */
254   };
255
256 /* Destroys AW. */
257 static bool
258 case_writer_destroy (struct case_writer *aw)
259 {
260   bool ok = true;
261   if (aw != NULL) 
262     {
263       ok = any_writer_close (aw->writer);
264       destroy_case_map (aw->map);
265       case_destroy (&aw->bounce);
266       free (aw);
267     }
268   return ok;
269 }
270
271 /* Parses SAVE or XSAVE or EXPORT or XEXPORT command.
272    WRITER_TYPE identifies the type of file to write,
273    and COMMAND_TYPE identifies the type of command.
274
275    On success, returns a writer.
276    For procedures only, sets *RETAIN_UNSELECTED to true if cases
277    that would otherwise be excluded by FILTER or USE should be
278    included.
279
280    On failure, returns a null pointer. */
281 static struct case_writer *
282 parse_write_command (struct lexer *lexer, struct dataset *ds, 
283                      enum writer_type writer_type,
284                      enum command_type command_type,
285                      bool *retain_unselected)
286 {
287   /* Common data. */
288   struct file_handle *handle; /* Output file. */
289   struct dictionary *dict;    /* Dictionary for output file. */
290   struct case_writer *aw;      /* Writer. */  
291
292   /* Common options. */
293   bool print_map;             /* Print map?  TODO. */
294   bool print_short_names;     /* Print long-to-short name map.  TODO. */
295   struct sfm_write_options sysfile_opts;
296   struct pfm_write_options porfile_opts;
297
298   assert (writer_type == SYSFILE_WRITER || writer_type == PORFILE_WRITER);
299   assert (command_type == XFORM_CMD || command_type == PROC_CMD);
300   assert ((retain_unselected != NULL) == (command_type == PROC_CMD));
301
302   if (command_type == PROC_CMD)
303     *retain_unselected = true;
304
305   handle = NULL;
306   dict = dict_clone (dataset_dict (ds));
307   aw = xmalloc (sizeof *aw);
308   aw->writer = NULL;
309   aw->map = NULL;
310   case_nullify (&aw->bounce);
311   print_map = false;
312   print_short_names = false;
313   sysfile_opts = sfm_writer_default_options ();
314   porfile_opts = pfm_writer_default_options ();
315
316   start_case_map (dict);
317   dict_delete_scratch_vars (dict);
318
319   lex_match (lexer, '/');
320   for (;;)
321     {
322       if (lex_match_id (lexer, "OUTFILE"))
323         {
324           if (handle != NULL) 
325             {
326               lex_sbc_only_once ("OUTFILE");
327               goto error; 
328             }
329           
330           lex_match (lexer, '=');
331       
332           handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
333           if (handle == NULL)
334             goto error;
335         }
336       else if (lex_match_id (lexer, "NAMES"))
337         print_short_names = true;
338       else if (lex_match_id (lexer, "PERMISSIONS")) 
339         {
340           bool cw;
341           
342           lex_match (lexer, '=');
343           if (lex_match_id (lexer, "READONLY"))
344             cw = false;
345           else if (lex_match_id (lexer, "WRITEABLE"))
346             cw = true;
347           else
348             {
349               lex_error (lexer, _("expecting %s or %s"), "READONLY", "WRITEABLE");
350               goto error;
351             }
352           sysfile_opts.create_writeable = porfile_opts.create_writeable = cw;
353         }
354       else if (command_type == PROC_CMD && lex_match_id (lexer, "UNSELECTED")) 
355         {
356           lex_match (lexer, '=');
357           if (lex_match_id (lexer, "RETAIN"))
358             *retain_unselected = true;
359           else if (lex_match_id (lexer, "DELETE"))
360             *retain_unselected = false;
361           else
362             {
363               lex_error (lexer, _("expecting %s or %s"), "RETAIN", "DELETE");
364               goto error;
365             }
366         }
367       else if (writer_type == SYSFILE_WRITER && lex_match_id (lexer, "COMPRESSED"))
368         sysfile_opts.compress = true;
369       else if (writer_type == SYSFILE_WRITER && lex_match_id (lexer, "UNCOMPRESSED"))
370         sysfile_opts.compress = false;
371       else if (writer_type == SYSFILE_WRITER && lex_match_id (lexer, "VERSION"))
372         {
373           lex_match (lexer, '=');
374           if (!lex_force_int (lexer))
375             goto error;
376           sysfile_opts.version = lex_integer (lexer);
377           lex_get (lexer);
378         }
379       else if (writer_type == PORFILE_WRITER && lex_match_id (lexer, "TYPE")) 
380         {
381           lex_match (lexer, '=');
382           if (lex_match_id (lexer, "COMMUNICATIONS"))
383             porfile_opts.type = PFM_COMM;
384           else if (lex_match_id (lexer, "TAPE"))
385             porfile_opts.type = PFM_TAPE;
386           else
387             {
388               lex_error (lexer, _("expecting %s or %s"), "COMM", "TAPE");
389               goto error;
390             }
391         }
392       else if (writer_type == PORFILE_WRITER && lex_match_id (lexer, "DIGITS")) 
393         {
394           lex_match (lexer, '=');
395           if (!lex_force_int (lexer))
396             goto error;
397           porfile_opts.digits = lex_integer (lexer);
398           lex_get (lexer);
399         }
400       else if (!parse_dict_trim (lexer, dict))
401         goto error;
402       
403       if (!lex_match (lexer, '/'))
404         break;
405     }
406   if (lex_end_of_command (lexer) != CMD_SUCCESS)
407     goto error;
408
409   if (handle == NULL) 
410     {
411       lex_sbc_missing (lexer, "OUTFILE");
412       goto error;
413     }
414
415   dict_compact_values (dict);
416   aw->map = finish_case_map (dict);
417   if (aw->map != NULL)
418     case_create (&aw->bounce, dict_get_next_value_idx (dict));
419
420   if (fh_get_referent (handle) == FH_REF_FILE) 
421     {
422       switch (writer_type) 
423         {
424         case SYSFILE_WRITER:
425           aw->writer = any_writer_from_sfm_writer (
426             sfm_open_writer (handle, dict, sysfile_opts));
427           break;
428         case PORFILE_WRITER:
429           aw->writer = any_writer_from_pfm_writer (
430             pfm_open_writer (handle, dict, porfile_opts));
431           break;
432         }
433     }
434   else
435     aw->writer = any_writer_open (handle, dict);
436   if (aw->writer == NULL)
437     goto error;
438   dict_destroy (dict);
439   
440   return aw;
441
442  error:
443   case_writer_destroy (aw);
444   dict_destroy (dict);
445   return NULL;
446 }
447
448 /* Writes case C to writer AW. */
449 static bool
450 case_writer_write_case (struct case_writer *aw, const struct ccase *c) 
451 {
452   if (aw->map != NULL) 
453     {
454       map_case (aw->map, c, &aw->bounce);
455       c = &aw->bounce; 
456     }
457   return any_writer_write (aw->writer, c);
458 }
459 \f
460 /* SAVE and EXPORT. */
461
462 /* Parses and performs the SAVE or EXPORT procedure. */
463 static int
464 parse_output_proc (struct lexer *lexer, struct dataset *ds, enum writer_type writer_type)
465 {
466   bool retain_unselected;
467   struct variable *saved_filter_variable;
468   struct case_writer *aw;
469   struct ccase *c;
470   bool ok = true;
471
472   aw = parse_write_command (lexer, ds, writer_type, PROC_CMD, &retain_unselected);
473   if (aw == NULL) 
474     return CMD_CASCADING_FAILURE;
475
476   saved_filter_variable = dict_get_filter (dataset_dict (ds));
477   if (retain_unselected) 
478     dict_set_filter (dataset_dict (ds), NULL);
479
480   proc_open (ds);
481   while (ok && proc_read (ds, &c))
482     ok = case_writer_write_case (aw, c);
483   ok = proc_close (ds) && ok;
484
485   dict_set_filter (dataset_dict (ds), saved_filter_variable);
486
487   case_writer_destroy (aw);
488   return ok ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
489 }
490
491 int
492 cmd_save (struct lexer *lexer, struct dataset *ds) 
493 {
494   return parse_output_proc (lexer, ds, SYSFILE_WRITER);
495 }
496
497 int
498 cmd_export (struct lexer *lexer, struct dataset *ds) 
499 {
500   return parse_output_proc (lexer, ds, PORFILE_WRITER);
501 }
502 \f
503 /* XSAVE and XEXPORT. */
504
505 /* Transformation. */
506 struct output_trns 
507   {
508     struct case_writer *aw;      /* Writer. */
509   };
510
511 static trns_proc_func output_trns_proc;
512 static trns_free_func output_trns_free;
513
514 /* Parses the XSAVE or XEXPORT transformation command. */
515 static int
516 parse_output_trns (struct lexer *lexer, struct dataset *ds, enum writer_type writer_type) 
517 {
518   struct output_trns *t = xmalloc (sizeof *t);
519   t->aw = parse_write_command (lexer, ds, writer_type, XFORM_CMD, NULL);
520   if (t->aw == NULL) 
521     {
522       free (t);
523       return CMD_CASCADING_FAILURE;
524     }
525
526   add_transformation (ds, output_trns_proc, output_trns_free, t);
527   return CMD_SUCCESS;
528 }
529
530 /* Writes case C to the system file specified on XSAVE or XEXPORT. */
531 static int
532 output_trns_proc (void *trns_, struct ccase *c, casenumber case_num UNUSED)
533 {
534   struct output_trns *t = trns_;
535   case_writer_write_case (t->aw, c);
536   return TRNS_CONTINUE;
537 }
538
539 /* Frees an XSAVE or XEXPORT transformation.
540    Returns true if successful, false if an I/O error occurred. */
541 static bool
542 output_trns_free (void *trns_)
543 {
544   struct output_trns *t = trns_;
545   bool ok = true;
546
547   if (t != NULL)
548     {
549       ok = case_writer_destroy (t->aw);
550       free (t);
551     }
552   return ok;
553 }
554
555 /* XSAVE command. */
556 int
557 cmd_xsave (struct lexer *lexer, struct dataset *ds) 
558 {
559   return parse_output_trns (lexer, ds, SYSFILE_WRITER);
560 }
561
562 /* XEXPORT command. */
563 int
564 cmd_xexport (struct lexer *lexer, struct dataset *ds) 
565 {
566   return parse_output_trns (lexer, ds, PORFILE_WRITER);
567 }
568 \f
569 static bool rename_variables (struct lexer *lexer, struct dictionary *dict);
570 static bool drop_variables (struct lexer *, struct dictionary *dict);
571 static bool keep_variables (struct lexer *, struct dictionary *dict);
572
573 /* Commands that read and write system files share a great deal
574    of common syntactic structure for rearranging and dropping
575    variables.  This function parses this syntax and modifies DICT
576    appropriately.  Returns true on success, false on failure. */
577 static bool
578 parse_dict_trim (struct lexer *lexer, struct dictionary *dict)
579 {
580   if (lex_match_id (lexer, "MAP")) 
581     {
582       /* FIXME. */
583       return true;
584     }
585   else if (lex_match_id (lexer, "DROP"))
586     return drop_variables (lexer, dict);
587   else if (lex_match_id (lexer, "KEEP"))
588     return keep_variables (lexer, dict);
589   else if (lex_match_id (lexer, "RENAME"))
590     return rename_variables (lexer, dict);
591   else
592     {
593       lex_error (lexer, _("expecting a valid subcommand"));
594       return false;
595     }
596 }
597
598 /* Parses and performs the RENAME subcommand of GET and SAVE. */
599 static bool
600 rename_variables (struct lexer *lexer, struct dictionary *dict)
601 {
602   size_t i;
603
604   int success = 0;
605
606   struct variable **v;
607   char **new_names;
608   size_t nv, nn;
609   char *err_name;
610
611   int group;
612
613   lex_match (lexer, '=');
614   if (lex_token (lexer) != '(')
615     {
616       struct variable *v;
617
618       v = parse_variable (lexer, dict);
619       if (v == NULL)
620         return 0;
621       if (!lex_force_match (lexer, '=')
622           || !lex_force_id (lexer))
623         return 0;
624       if (dict_lookup_var (dict, lex_tokid (lexer)) != NULL)
625         {
626           msg (SE, _("Cannot rename %s as %s because there already exists "
627                      "a variable named %s.  To rename variables with "
628                      "overlapping names, use a single RENAME subcommand "
629                      "such as \"/RENAME (A=B)(B=C)(C=A)\", or equivalently, "
630                      "\"/RENAME (A B C=B C A)\"."),
631                var_get_name (v), lex_tokid (lexer), lex_tokid (lexer));
632           return 0;
633         }
634       
635       dict_rename_var (dict, v, lex_tokid (lexer));
636       lex_get (lexer);
637       return 1;
638     }
639
640   nv = nn = 0;
641   v = NULL;
642   new_names = 0;
643   group = 1;
644   while (lex_match (lexer, '('))
645     {
646       size_t old_nv = nv;
647
648       if (!parse_variables (lexer, dict, &v, &nv, PV_NO_DUPLICATE | PV_APPEND))
649         goto done;
650       if (!lex_match (lexer, '='))
651         {
652           msg (SE, _("`=' expected after variable list."));
653           goto done;
654         }
655       if (!parse_DATA_LIST_vars (lexer, &new_names, &nn, PV_APPEND | PV_NO_SCRATCH))
656         goto done;
657       if (nn != nv)
658         {
659           msg (SE, _("Number of variables on left side of `=' (%d) does not "
660                      "match number of variables on right side (%d), in "
661                      "parenthesized group %d of RENAME subcommand."),
662                (unsigned) (nv - old_nv), (unsigned) (nn - old_nv), group);
663           goto done;
664         }
665       if (!lex_force_match (lexer, ')'))
666         goto done;
667       group++;
668     }
669
670   if (!dict_rename_vars (dict, v, new_names, nv, &err_name)) 
671     {
672       msg (SE, _("Requested renaming duplicates variable name %s."), err_name);
673       goto done;
674     }
675   success = 1;
676
677  done:
678   for (i = 0; i < nn; i++)
679     free (new_names[i]);
680   free (new_names);
681   free (v);
682
683   return success;
684 }
685
686 /* Parses and performs the DROP subcommand of GET and SAVE.
687    Returns true if successful, false on failure.*/
688 static bool
689 drop_variables (struct lexer *lexer, struct dictionary *dict)
690 {
691   struct variable **v;
692   size_t nv;
693
694   lex_match (lexer, '=');
695   if (!parse_variables (lexer, dict, &v, &nv, PV_NONE))
696     return false;
697   dict_delete_vars (dict, v, nv);
698   free (v);
699
700   if (dict_get_var_cnt (dict) == 0)
701     {
702       msg (SE, _("Cannot DROP all variables from dictionary."));
703       return false;
704     }
705   return true;
706 }
707
708 /* Parses and performs the KEEP subcommand of GET and SAVE.
709    Returns true if successful, false on failure.*/
710 static bool
711 keep_variables (struct lexer *lexer, struct dictionary *dict)
712 {
713   struct variable **v;
714   size_t nv;
715   size_t i;
716
717   lex_match (lexer, '=');
718   if (!parse_variables (lexer, dict, &v, &nv, PV_NONE))
719     return false;
720
721   /* Move the specified variables to the beginning. */
722   dict_reorder_vars (dict, v, nv);
723           
724   /* Delete the remaining variables. */
725   v = xnrealloc (v, dict_get_var_cnt (dict) - nv, sizeof *v);
726   for (i = nv; i < dict_get_var_cnt (dict); i++)
727     v[i - nv] = dict_get_var (dict, i);
728   dict_delete_vars (dict, v, dict_get_var_cnt (dict) - nv);
729   free (v);
730
731   return true;
732 }
733 \f
734 /* MATCH FILES. */
735
736 /* File types. */
737 enum
738   {
739     MTF_FILE,                   /* Specified on FILE= subcommand. */
740     MTF_TABLE                   /* Specified on TABLE= subcommand. */
741   };
742
743 /* One of the files on MATCH FILES. */
744 struct mtf_file
745   {
746     struct mtf_file *next, *prev; /* Next, previous in the list of files. */
747     struct mtf_file *next_min;  /* Next in the chain of minimums. */
748     
749     int type;                   /* One of MTF_*. */
750     struct variable **by;       /* List of BY variables for this file. */
751     struct file_handle *handle; /* File handle. */
752     struct any_reader *reader;  /* File reader. */
753     struct dictionary *dict;    /* Dictionary from system file. */
754
755     /* IN subcommand. */
756     char *in_name;              /* Variable name. */
757     struct variable *in_var;    /* Variable (in master dictionary). */
758
759     struct ccase input_storage; /* Input record storage. */
760     struct ccase *input;        /* Input record. */
761   };
762
763 /* MATCH FILES procedure. */
764 struct mtf_proc 
765   {
766     struct mtf_file *head;      /* First file mentioned on FILE or TABLE. */
767     struct mtf_file *tail;      /* Last file mentioned on FILE or TABLE. */
768
769     bool ok;                    /* False if I/O error occurs. */
770
771     size_t by_cnt;              /* Number of variables on BY subcommand. */
772
773     /* Names of FIRST, LAST variables. */
774     char first[LONG_NAME_LEN + 1], last[LONG_NAME_LEN + 1];
775     
776     struct dictionary *dict;    /* Dictionary of output file. */
777     struct casefile *output;    /* MATCH FILES output. */
778     struct ccase mtf_case;      /* Case used for output. */
779
780     unsigned seq_num;           /* Have we initialized this variable? */
781     unsigned *seq_nums;         /* Sequence numbers for each var in dict. */
782   };
783
784 static bool mtf_free (struct mtf_proc *);
785 static bool mtf_close_file (struct mtf_file *);
786 static int mtf_merge_dictionary (struct dictionary *const, struct mtf_file *);
787 static bool mtf_read_records (struct mtf_proc *, struct dataset *);
788 static bool mtf_delete_file_in_place (struct mtf_proc *, struct mtf_file **);
789
790 static bool mtf_processing (struct mtf_proc *, struct dataset *);
791
792 static char *var_type_description (struct variable *);
793
794 static void set_master (struct variable *, struct variable *master);
795 static struct variable *get_master (struct variable *);
796
797 /* Parse and execute the MATCH FILES command. */
798 int
799 cmd_match_files (struct lexer *lexer, struct dataset *ds)
800 {
801   struct mtf_proc mtf;
802   struct mtf_file *first_table = NULL;
803   struct mtf_file *iter;
804   
805   bool used_active_file = false;
806   bool saw_table = false;
807   bool saw_in = false;
808
809   mtf.head = mtf.tail = NULL;
810   mtf.by_cnt = 0;
811   mtf.first[0] = '\0';
812   mtf.last[0] = '\0';
813   mtf.dict = dict_create ();
814   mtf.output = NULL;
815   case_nullify (&mtf.mtf_case);
816   mtf.seq_num = 0;
817   mtf.seq_nums = NULL;
818   dict_set_case_limit (mtf.dict, dict_get_case_limit (dataset_dict (ds)));
819
820   lex_match (lexer, '/');
821   while (lex_token (lexer) == T_ID
822          && (lex_id_match (ss_cstr ("FILE"), ss_cstr (lex_tokid (lexer)))
823              || lex_id_match (ss_cstr ("TABLE"), ss_cstr (lex_tokid (lexer)))))
824     {
825       struct mtf_file *file = xmalloc (sizeof *file);
826
827       if (lex_match_id (lexer, "FILE"))
828         file->type = MTF_FILE;
829       else if (lex_match_id (lexer, "TABLE"))
830         {
831           file->type = MTF_TABLE;
832           saw_table = true;
833         }
834       else
835         NOT_REACHED ();
836       lex_match (lexer, '=');
837
838       file->by = NULL;
839       file->handle = NULL;
840       file->reader = NULL;
841       file->dict = NULL;
842       file->in_name = NULL;
843       file->in_var = NULL;
844       case_nullify (&file->input_storage);
845       file->input = &file->input_storage;
846
847       /* FILEs go first, then TABLEs. */
848       if (file->type == MTF_TABLE || first_table == NULL)
849         {
850           file->next = NULL;
851           file->prev = mtf.tail;
852           if (mtf.tail)
853             mtf.tail->next = file;
854           mtf.tail = file;
855           if (mtf.head == NULL)
856             mtf.head = file;
857           if (file->type == MTF_TABLE && first_table == NULL)
858             first_table = file;
859         }
860       else 
861         {
862           assert (file->type == MTF_FILE);
863           file->next = first_table;
864           file->prev = first_table->prev;
865           if (first_table->prev)
866             first_table->prev->next = file;
867           else
868             mtf.head = file;
869           first_table->prev = file;
870         }
871
872       if (lex_match (lexer, '*'))
873         {
874           file->handle = NULL;
875           file->reader = NULL;
876               
877           if (used_active_file)
878             {
879               msg (SE, _("The active file may not be specified more "
880                          "than once."));
881               goto error;
882             }
883           used_active_file = true;
884
885           if (!proc_has_source (ds))
886             {
887               msg (SE, _("Cannot specify the active file since no active "
888                          "file has been defined."));
889               goto error;
890             }
891
892           if (proc_make_temporary_transformations_permanent (ds))
893             msg (SE,
894                  _("MATCH FILES may not be used after TEMPORARY when "
895                    "the active file is an input source.  "
896                    "Temporary transformations will be made permanent."));
897
898           file->dict = dataset_dict (ds);
899         }
900       else
901         {
902           file->handle = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
903           if (file->handle == NULL)
904             goto error;
905
906           file->reader = any_reader_open (file->handle, &file->dict);
907           if (file->reader == NULL)
908             goto error;
909
910           case_create (&file->input_storage,
911                        dict_get_next_value_idx (file->dict));
912         }
913
914       while (lex_match (lexer, '/'))
915         if (lex_match_id (lexer, "RENAME")) 
916           {
917             if (!rename_variables (lexer, file->dict))
918               goto error; 
919           }
920         else if (lex_match_id (lexer, "IN"))
921           {
922             lex_match (lexer, '=');
923             if (lex_token (lexer) != T_ID)
924               {
925                 lex_error (lexer, NULL);
926                 goto error;
927               }
928
929             if (file->in_name != NULL)
930               {
931                 msg (SE, _("Multiple IN subcommands for a single FILE or "
932                            "TABLE."));
933                 goto error;
934               }
935             file->in_name = xstrdup (lex_tokid (lexer));
936             lex_get (lexer);
937             saw_in = true;
938           }
939
940       mtf_merge_dictionary (mtf.dict, file);
941     }
942   
943   while (lex_token (lexer) != '.')
944     {
945       if (lex_match (lexer, T_BY))
946         {
947           struct variable **by;
948           
949           if (mtf.by_cnt)
950             {
951               msg (SE, _("BY may appear at most once."));
952               goto error;
953             }
954               
955           lex_match (lexer, '=');
956           if (!parse_variables (lexer, mtf.dict, &by, &mtf.by_cnt,
957                                 PV_NO_DUPLICATE | PV_NO_SCRATCH))
958             goto error;
959
960           for (iter = mtf.head; iter != NULL; iter = iter->next)
961             {
962               size_t i;
963           
964               iter->by = xnmalloc (mtf.by_cnt, sizeof *iter->by);
965
966               for (i = 0; i < mtf.by_cnt; i++)
967                 {
968                   iter->by[i] = dict_lookup_var (iter->dict,
969                                                  var_get_name (by[i]));
970                   if (iter->by[i] == NULL)
971                     {
972                       msg (SE, _("File %s lacks BY variable %s."),
973                            iter->handle ? fh_get_name (iter->handle) : "*",
974                            var_get_name (by[i]));
975                       free (by);
976                       goto error;
977                     }
978                 }
979             }
980           free (by);
981         }
982       else if (lex_match_id (lexer, "FIRST")) 
983         {
984           if (mtf.first[0] != '\0')
985             {
986               msg (SE, _("FIRST may appear at most once."));
987               goto error;
988             }
989               
990           lex_match (lexer, '=');
991           if (!lex_force_id (lexer))
992             goto error;
993           strcpy (mtf.first, lex_tokid (lexer));
994           lex_get (lexer);
995         }
996       else if (lex_match_id (lexer, "LAST")) 
997         {
998           if (mtf.last[0] != '\0')
999             {
1000               msg (SE, _("LAST may appear at most once."));
1001               goto error;
1002             }
1003               
1004           lex_match (lexer, '=');
1005           if (!lex_force_id (lexer))
1006             goto error;
1007           strcpy (mtf.last, lex_tokid (lexer));
1008           lex_get (lexer);
1009         }
1010       else if (lex_match_id (lexer, "MAP"))
1011         {
1012           /* FIXME. */
1013         }
1014       else if (lex_match_id (lexer, "DROP")) 
1015         {
1016           if (!drop_variables (lexer, mtf.dict))
1017             goto error;
1018         }
1019       else if (lex_match_id (lexer, "KEEP")) 
1020         {
1021           if (!keep_variables (lexer, mtf.dict))
1022             goto error;
1023         }
1024       else
1025         {
1026           lex_error (lexer, NULL);
1027           goto error;
1028         }
1029
1030       if (!lex_match (lexer, '/') && lex_token (lexer) != '.') 
1031         {
1032           lex_end_of_command (lexer);
1033           goto error;
1034         }
1035     }
1036
1037   if (mtf.by_cnt == 0)
1038     {
1039       if (saw_table)
1040         {
1041           msg (SE, _("BY is required when TABLE is specified."));
1042           goto error;
1043         }
1044       if (saw_in)
1045         {
1046           msg (SE, _("BY is required when IN is specified."));
1047           goto error;
1048         }
1049     }
1050
1051   /* Set up mapping from each file's variables to master
1052      variables. */
1053   for (iter = mtf.head; iter != NULL; iter = iter->next)
1054     {
1055       struct dictionary *d = iter->dict;
1056       int i;
1057
1058       for (i = 0; i < dict_get_var_cnt (d); i++)
1059         {
1060           struct variable *v = dict_get_var (d, i);
1061           struct variable *mv = dict_lookup_var (mtf.dict, var_get_name (v));
1062           if (mv != NULL)
1063             set_master (v, mv);
1064         }
1065     }
1066
1067   /* Add IN variables to master dictionary. */
1068   for (iter = mtf.head; iter != NULL; iter = iter->next) 
1069     if (iter->in_name != NULL)
1070       {
1071         struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
1072         iter->in_var = dict_create_var (mtf.dict, iter->in_name, 0);
1073         if (iter->in_var == NULL)
1074           {
1075             msg (SE, _("IN variable name %s duplicates an "
1076                        "existing variable name."),
1077                  var_get_name (iter->in_var));
1078             goto error;
1079           }
1080         var_set_both_formats (iter->in_var, &format);
1081       }
1082     
1083   /* MATCH FILES performs an n-way merge on all its input files.
1084      Abstract algorithm:
1085
1086      1. Read one input record from every input FILE.
1087
1088      2. If no FILEs are left, stop.  Otherwise, proceed to step 3.
1089
1090      3. Find the FILE input record(s) that have minimum BY
1091      values.  Store all the values from these input records into
1092      the output record.
1093
1094      4. For every TABLE, read another record as long as the BY values
1095      on the TABLE's input record are less than the FILEs' BY values.
1096      If an exact match is found, store all the values from the TABLE
1097      input record into the output record.
1098
1099      5. Write the output record.
1100
1101      6. Read another record from each input file FILE and TABLE that
1102      we stored values from above.  If we come to the end of one of the
1103      input files, remove it from the list of input files.
1104
1105      7. Repeat from step 2.
1106
1107      FIXME: For merging large numbers of files (more than 10?) a
1108      better algorithm would use a heap for finding minimum
1109      values. */
1110
1111   if (used_active_file) 
1112     {
1113       proc_set_sink (ds, create_case_sink (&null_sink_class, 
1114                                            dataset_dict (ds),
1115                                            dataset_get_casefile_factory (ds),
1116                                            NULL));
1117       proc_open (ds); 
1118     }
1119   else
1120     discard_variables (ds);
1121
1122   dict_compact_values (mtf.dict);
1123   mtf.output = dataset_get_casefile_factory (ds)->create_casefile
1124     (dataset_get_casefile_factory (ds),
1125      dict_get_next_value_idx (mtf.dict));
1126
1127   mtf.seq_nums = xcalloc (dict_get_var_cnt (mtf.dict), sizeof *mtf.seq_nums);
1128   case_create (&mtf.mtf_case, dict_get_next_value_idx (mtf.dict));
1129
1130   if (!mtf_read_records (&mtf, ds))
1131     goto error;
1132   while (mtf.head && mtf.head->type == MTF_FILE)
1133     if (!mtf_processing (&mtf, ds))
1134       goto error;
1135   if (!proc_close (ds))
1136     goto error;
1137
1138   discard_variables (ds);
1139
1140   dict_destroy (dataset_dict (ds));
1141   dataset_set_dict (ds, mtf.dict);
1142   mtf.dict = NULL;
1143   proc_set_source (ds, storage_source_create (mtf.output));
1144   mtf.output = NULL;
1145   
1146   return mtf_free (&mtf) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
1147   
1148  error:
1149   proc_close (ds);
1150   mtf_free (&mtf);
1151   return CMD_CASCADING_FAILURE;
1152 }
1153
1154 /* Return a string in a static buffer describing V's variable type and
1155    width. */
1156 static char *
1157 var_type_description (struct variable *v)
1158 {
1159   static char buf[2][32];
1160   static int x = 0;
1161   char *s;
1162
1163   x ^= 1;
1164   s = buf[x];
1165
1166   if (var_is_numeric (v))
1167     strcpy (s, "numeric");
1168   else
1169     sprintf (s, "string with width %d", var_get_width (v));
1170   return s;
1171 }
1172
1173 /* Closes FILE and frees its associated data.
1174    Returns true if successful, false if an I/O error
1175    occurred on FILE. */
1176 static bool
1177 mtf_close_file (struct mtf_file *file)
1178 {
1179   bool ok = file->reader == NULL || !any_reader_error (file->reader);
1180   free (file->by);
1181   any_reader_close (file->reader);
1182   if (file->handle != NULL)
1183     dict_destroy (file->dict);
1184   case_destroy (&file->input_storage);
1185   free (file->in_name);
1186   free (file);
1187   return ok;
1188 }
1189
1190 /* Free all the data for the MATCH FILES procedure.
1191    Returns true if successful, false if an I/O error
1192    occurred. */
1193 static bool
1194 mtf_free (struct mtf_proc *mtf)
1195 {
1196   struct mtf_file *iter, *next;
1197   bool ok = true;
1198
1199   for (iter = mtf->head; iter; iter = next)
1200     {
1201       next = iter->next;
1202       assert (iter->dict != mtf->dict);
1203       if (!mtf_close_file (iter))
1204         ok = false;
1205     }
1206   
1207   if (mtf->dict)
1208     dict_destroy (mtf->dict);
1209   case_destroy (&mtf->mtf_case);
1210   free (mtf->seq_nums);
1211
1212   return ok;
1213 }
1214
1215 /* Remove *FILE from the mtf_file chain.  Make *FILE point to the next
1216    file in the chain, or to NULL if was the last in the chain.
1217    Returns true if successful, false if an I/O error occurred. */
1218 static bool
1219 mtf_delete_file_in_place (struct mtf_proc *mtf, struct mtf_file **file)
1220 {
1221   struct mtf_file *f = *file;
1222   int i;
1223
1224   if (f->prev)
1225     f->prev->next = f->next;
1226   if (f->next)
1227     f->next->prev = f->prev;
1228   if (f == mtf->head)
1229     mtf->head = f->next;
1230   if (f == mtf->tail)
1231     mtf->tail = f->prev;
1232   *file = f->next;
1233
1234   if (f->in_var != NULL)
1235     case_data_rw (&mtf->mtf_case, f->in_var)->f = 0.;
1236   for (i = 0; i < dict_get_var_cnt (f->dict); i++)
1237     {
1238       struct variable *v = dict_get_var (f->dict, i);
1239       struct variable *mv = get_master (v);
1240       if (mv != NULL) 
1241         {
1242           union value *out = case_data_rw (&mtf->mtf_case, mv);
1243           
1244           if (var_is_numeric (v))
1245             out->f = SYSMIS;
1246           else
1247             memset (out->s, ' ', var_get_width (v));
1248         } 
1249     }
1250
1251   return mtf_close_file (f);
1252 }
1253
1254 /* Read a record from every input file.
1255    Returns true if successful, false if an I/O error occurred. */
1256 static bool
1257 mtf_read_records (struct mtf_proc *mtf, struct dataset *ds)
1258 {
1259   struct mtf_file *iter, *next;
1260   bool ok = true;
1261
1262   for (iter = mtf->head; ok && iter != NULL; iter = next)
1263     {
1264       next = iter->next;
1265       if (iter->handle
1266           ? !any_reader_read (iter->reader, iter->input)
1267           : !proc_read (ds, &iter->input)) 
1268         {
1269           if (!mtf_delete_file_in_place (mtf, &iter))
1270             ok = false; 
1271         }
1272     }
1273   return ok;
1274 }
1275
1276 /* Compare the BY variables for files A and B; return -1 if A < B, 0
1277    if A == B, 1 if A > B. */
1278 static inline int
1279 mtf_compare_BY_values (struct mtf_proc *mtf,
1280                        struct mtf_file *a, struct mtf_file *b)
1281 {
1282   return case_compare_2dict (a->input, b->input, a->by, b->by, mtf->by_cnt);
1283 }
1284
1285 /* Perform one iteration of steps 3...7 above.
1286    Returns true if successful, false if an I/O error occurred. */
1287 static bool
1288 mtf_processing (struct mtf_proc *mtf, struct dataset *ds)
1289 {
1290   struct mtf_file *min_head, *min_tail; /* Files with minimum BY values. */
1291   struct mtf_file *max_head, *max_tail; /* Files with non-minimum BYs. */
1292   struct mtf_file *iter, *next;
1293
1294   /* 3. Find the FILE input record(s) that have minimum BY
1295      values.  Store all the values from these input records into
1296      the output record. */
1297   min_head = min_tail = mtf->head;
1298   max_head = max_tail = NULL;
1299   for (iter = mtf->head->next; iter && iter->type == MTF_FILE;
1300        iter = iter->next) 
1301     {
1302       int cmp = mtf_compare_BY_values (mtf, min_head, iter);
1303       if (cmp < 0) 
1304         {
1305           if (max_head)
1306             max_tail = max_tail->next_min = iter;
1307           else
1308             max_head = max_tail = iter;
1309         }
1310       else if (cmp == 0) 
1311         min_tail = min_tail->next_min = iter;
1312       else /* cmp > 0 */
1313         {
1314           if (max_head)
1315             {
1316               max_tail->next_min = min_head;
1317               max_tail = min_tail;
1318             }
1319           else
1320             {
1321               max_head = min_head;
1322               max_tail = min_tail;
1323             }
1324           min_head = min_tail = iter;
1325         }
1326     }
1327       
1328   /* 4. For every TABLE, read another record as long as the BY
1329      values on the TABLE's input record are less than the FILEs'
1330      BY values.  If an exact match is found, store all the values
1331      from the TABLE input record into the output record. */
1332   for (; iter != NULL; iter = next)
1333     {
1334       assert (iter->type == MTF_TABLE);
1335       
1336       next = iter->next;
1337       for (;;) 
1338         {
1339           int cmp = mtf_compare_BY_values (mtf, min_head, iter);
1340           if (cmp < 0) 
1341             {
1342               if (max_head)
1343                 max_tail = max_tail->next_min = iter;
1344               else
1345                 max_head = max_tail = iter;
1346             }
1347           else if (cmp == 0)
1348             min_tail = min_tail->next_min = iter;
1349           else /* cmp > 0 */
1350             {
1351               if (iter->handle
1352                   ? any_reader_read (iter->reader, iter->input)
1353                   : proc_read (ds, &iter->input))
1354                 continue;
1355               if (!mtf_delete_file_in_place (mtf, &iter))
1356                 return false;
1357             }
1358           break;
1359         }
1360     }
1361
1362   /* Next sequence number. */
1363   mtf->seq_num++;
1364
1365   /* Store data to all the records we are using. */
1366   if (min_tail)
1367     min_tail->next_min = NULL;
1368   for (iter = min_head; iter; iter = iter->next_min)
1369     {
1370       int i;
1371
1372       for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
1373         {
1374           struct variable *v = dict_get_var (iter->dict, i);
1375           struct variable *mv = get_master (v);
1376           size_t mv_index = mv ? var_get_dict_index (mv) : 0;
1377           
1378           if (mv != NULL && mtf->seq_nums[mv_index] != mtf->seq_num) 
1379             {
1380               const struct ccase *record = iter->input;
1381               union value *out = case_data_rw (&mtf->mtf_case, mv);
1382
1383               mtf->seq_nums[mv_index] = mtf->seq_num;
1384               if (var_is_numeric (v))
1385                 out->f = case_num (record, v);
1386               else
1387                 memcpy (out->s, case_str (record, v), var_get_width (v));
1388             } 
1389         }
1390       if (iter->in_var != NULL)
1391         case_data_rw (&mtf->mtf_case, iter->in_var)->f = 1.;
1392     }
1393
1394   /* Store missing values to all the records we're not using. */
1395   if (max_tail)
1396     max_tail->next_min = NULL;
1397   for (iter = max_head; iter; iter = iter->next_min)
1398     {
1399       int i;
1400
1401       for (i = 0; i < dict_get_var_cnt (iter->dict); i++)
1402         {
1403           struct variable *v = dict_get_var (iter->dict, i);
1404           struct variable *mv = get_master (v);
1405           size_t mv_index = mv ? var_get_dict_index (mv) : 0;
1406
1407           if (mv != NULL && mtf->seq_nums[mv_index] != mtf->seq_num) 
1408             {
1409               union value *out = case_data_rw (&mtf->mtf_case, mv);
1410               mtf->seq_nums[mv_index] = mtf->seq_num;
1411
1412               if (var_is_numeric (v))
1413                 out->f = SYSMIS;
1414               else
1415                 memset (out->s, ' ', var_get_width (v));
1416             }
1417         }
1418       if (iter->in_var != NULL)
1419         case_data_rw (&mtf->mtf_case, iter->in_var)->f = 0.;
1420     }
1421
1422   /* 5. Write the output record. */
1423   casefile_append (mtf->output, &mtf->mtf_case);
1424
1425   /* 6. Read another record from each input file FILE and TABLE
1426      that we stored values from above.  If we come to the end of
1427      one of the input files, remove it from the list of input
1428      files. */
1429   for (iter = min_head; iter && iter->type == MTF_FILE; iter = next)
1430     {
1431       next = iter->next_min;
1432       if (iter->reader != NULL
1433           ? !any_reader_read (iter->reader, iter->input)
1434           : !proc_read (ds, &iter->input))
1435         if (!mtf_delete_file_in_place (mtf, &iter))
1436           return false;
1437     }
1438   return true;
1439 }
1440
1441 /* Merge the dictionary for file F into master dictionary M. */
1442 static int
1443 mtf_merge_dictionary (struct dictionary *const m, struct mtf_file *f)
1444 {
1445   struct dictionary *d = f->dict;
1446   const char *d_docs, *m_docs;
1447   int i;
1448
1449   if (dict_get_label (m) == NULL)
1450     dict_set_label (m, dict_get_label (d));
1451
1452   d_docs = dict_get_documents (d);
1453   m_docs = dict_get_documents (m);
1454   if (d_docs != NULL) 
1455     {
1456       if (m_docs == NULL)
1457         dict_set_documents (m, d_docs);
1458       else
1459         {
1460           char *new_docs;
1461           size_t new_len;
1462
1463           new_len = strlen (m_docs) + strlen (d_docs);
1464           new_docs = xmalloc (new_len + 1);
1465           strcpy (new_docs, m_docs);
1466           strcat (new_docs, d_docs);
1467           dict_set_documents (m, new_docs);
1468           free (new_docs);
1469         }
1470     }
1471   
1472   for (i = 0; i < dict_get_var_cnt (d); i++)
1473     {
1474       struct variable *dv = dict_get_var (d, i);
1475       struct variable *mv = dict_lookup_var (m, var_get_name (dv));
1476
1477       if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
1478         continue;
1479
1480       if (mv != NULL)
1481         {
1482           if (var_get_width (mv) != var_get_width (dv)) 
1483             {
1484               msg (SE, _("Variable %s in file %s (%s) has different "
1485                          "type or width from the same variable in "
1486                          "earlier file (%s)."),
1487                    var_get_name (dv), fh_get_name (f->handle),
1488                    var_type_description (dv), var_type_description (mv));
1489               return 0;
1490             }
1491         
1492           if (var_get_width (dv) == var_get_width (mv))
1493             {
1494               if (var_has_value_labels (dv) && !var_has_value_labels (mv))
1495                 var_set_value_labels (mv, var_get_value_labels (dv));
1496               if (var_has_missing_values (dv) && !var_has_missing_values (mv))
1497                 var_set_missing_values (mv, var_get_missing_values (dv));
1498             }
1499
1500           if (var_get_label (dv) && !var_get_label (mv))
1501             var_set_label (mv, var_get_label (dv));
1502         }
1503       else
1504         mv = dict_clone_var_assert (m, dv, var_get_name (dv));
1505     }
1506
1507   return 1;
1508 }
1509
1510 /* Marks V's master variable as MASTER. */
1511 static void
1512 set_master (struct variable *v, struct variable *master) 
1513 {
1514   var_attach_aux (v, master, NULL);
1515 }
1516
1517 /* Returns the master variable corresponding to V,
1518    as set with set_master(). */
1519 static struct variable *
1520 get_master (struct variable *v) 
1521 {
1522   return var_get_aux (v);
1523 }
1524 \f
1525 /* Case map.
1526
1527    A case map copies data from a case that corresponds for one
1528    dictionary to a case that corresponds to a second dictionary
1529    derived from the first by, optionally, deleting, reordering,
1530    or renaming variables.  (No new variables may be created.)
1531    */
1532
1533 /* A case map. */
1534 struct case_map
1535   {
1536     size_t value_cnt;   /* Number of values in map. */
1537     int *map;           /* For each destination index, the
1538                            corresponding source index. */
1539   };
1540
1541 /* Prepares dictionary D for producing a case map.  Afterward,
1542    the caller may delete, reorder, or rename variables within D
1543    at will before using finish_case_map() to produce the case
1544    map.
1545
1546    Uses D's aux members, which must otherwise not be in use. */
1547 static void
1548 start_case_map (struct dictionary *d) 
1549 {
1550   size_t var_cnt = dict_get_var_cnt (d);
1551   size_t i;
1552   
1553   for (i = 0; i < var_cnt; i++)
1554     {
1555       struct variable *v = dict_get_var (d, i);
1556       int *src_fv = xmalloc (sizeof *src_fv);
1557       *src_fv = var_get_case_index (v);
1558       var_attach_aux (v, src_fv, var_dtor_free);
1559     }
1560 }
1561
1562 /* Produces a case map from dictionary D, which must have been
1563    previously prepared with start_case_map().
1564
1565    Does not retain any reference to D, and clears the aux members
1566    set up by start_case_map().
1567
1568    Returns the new case map, or a null pointer if no mapping is
1569    required (that is, no data has changed position). */
1570 static struct case_map *
1571 finish_case_map (struct dictionary *d) 
1572 {
1573   struct case_map *map;
1574   size_t var_cnt = dict_get_var_cnt (d);
1575   size_t i;
1576   int identity_map;
1577
1578   map = xmalloc (sizeof *map);
1579   map->value_cnt = dict_get_next_value_idx (d);
1580   map->map = xnmalloc (map->value_cnt, sizeof *map->map);
1581   for (i = 0; i < map->value_cnt; i++)
1582     map->map[i] = -1;
1583
1584   identity_map = 1;
1585   for (i = 0; i < var_cnt; i++) 
1586     {
1587       struct variable *v = dict_get_var (d, i);
1588       size_t value_cnt = var_get_value_cnt (v);
1589       int *src_fv = (int *) var_detach_aux (v);
1590       size_t idx;
1591
1592       if (var_get_case_index (v) != *src_fv)
1593         identity_map = 0;
1594       
1595       for (idx = 0; idx < value_cnt; idx++)
1596         {
1597           int src_idx = *src_fv + idx;
1598           int dst_idx = var_get_case_index (v) + idx;
1599           
1600           assert (map->map[dst_idx] == -1);
1601           map->map[dst_idx] = src_idx;
1602         }
1603       free (src_fv);
1604     }
1605
1606   if (identity_map) 
1607     {
1608       destroy_case_map (map);
1609       return NULL;
1610     }
1611
1612   while (map->value_cnt > 0 && map->map[map->value_cnt - 1] == -1)
1613     map->value_cnt--;
1614
1615   return map;
1616 }
1617
1618 /* Maps from SRC to DST, applying case map MAP. */
1619 static void
1620 map_case (const struct case_map *map,
1621           const struct ccase *src, struct ccase *dst) 
1622 {
1623   size_t dst_idx;
1624
1625   assert (map != NULL);
1626   assert (src != NULL);
1627   assert (dst != NULL);
1628   assert (src != dst);
1629
1630   for (dst_idx = 0; dst_idx < map->value_cnt; dst_idx++)
1631     {
1632       int src_idx = map->map[dst_idx];
1633       if (src_idx != -1)
1634         *case_data_rw_idx (dst, dst_idx) = *case_data_idx (src, src_idx);
1635     }
1636 }
1637
1638 /* Destroys case map MAP. */
1639 static void
1640 destroy_case_map (struct case_map *map) 
1641 {
1642   if (map != NULL) 
1643     {
1644       free (map->map);
1645       free (map);
1646     }
1647 }