DO IF, LOOP cleanup.
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "command.h"
34 #include "dictionary.h"
35 #include "ctl-stack.h"
36 #include "error.h"
37 #include "expressions/public.h"
38 #include "misc.h"
39 #include "settings.h"
40 #include "som.h"
41 #include "str.h"
42 #include "tab.h"
43 #include "var.h"
44 #include "value-labels.h"
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 /*
50    Virtual File Manager (vfm):
51
52    vfm is used to process data files.  It uses the model that
53    data is read from one stream (the data source), processed,
54    then written to another (the data sink).  The data source is
55    then deleted and the data sink becomes the data source for the
56    next procedure. */
57
58 /* Procedure execution data. */
59 struct write_case_data
60   {
61     /* Function to call for each case. */
62     int (*proc_func) (struct ccase *, void *); /* Function. */
63     void *aux;                                 /* Auxiliary data. */ 
64
65     struct ccase trns_case;     /* Case used for transformations. */
66     struct ccase sink_case;     /* Case written to sink, if
67                                    compaction is necessary. */
68     size_t cases_written;       /* Cases output so far. */
69     size_t cases_analyzed;      /* Cases passed to procedure so far. */
70   };
71
72 /* The current active file, from which cases are read. */
73 struct case_source *vfm_source;
74
75 /* The replacement active file, to which cases are written. */
76 struct case_sink *vfm_sink;
77
78 /* Nonzero if the case needs to have values deleted before being
79    stored, zero otherwise. */
80 static int compaction_necessary;
81
82 /* Time at which vfm was last invoked. */
83 time_t last_vfm_invocation;
84
85 /* Lag queue. */
86 int n_lag;                      /* Number of cases to lag. */
87 static int lag_count;           /* Number of cases in lag_queue so far. */
88 static int lag_head;            /* Index where next case will be added. */
89 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
90
91 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
92                                 void *aux);
93 static void create_trns_case (struct ccase *, struct dictionary *);
94 static void open_active_file (void);
95 static int write_case (struct write_case_data *wc_data);
96 static int execute_transformations (struct ccase *c,
97                                     struct transformation *trns,
98                                     int first_idx, int last_idx,
99                                     int case_num);
100 static int filter_case (const struct ccase *c, int case_num);
101 static void lag_case (const struct ccase *c);
102 static void clear_case (struct ccase *c);
103 static void close_active_file (void);
104 \f
105 /* Public functions. */
106
107 /* Reads the data from the input program and writes it to a new
108    active file.  For each case we read from the input program, we
109    do the following
110
111    1. Execute permanent transformations.  If these drop the case,
112       start the next case from step 1.
113
114    2. N OF CASES.  If we have already written N cases, start the
115       next case from step 1.
116    
117    3. Write case to replacement active file.
118    
119    4. Execute temporary transformations.  If these drop the case,
120       start the next case from step 1.
121       
122    5. FILTER, PROCESS IF.  If these drop the case, start the next
123       case from step 1.
124    
125    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
126       cases, start the next case from step 1.
127       
128    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
129 void
130 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
131 {
132   if (proc_func == NULL
133       && case_source_is_class (vfm_source, &storage_source_class)
134       && vfm_sink == NULL
135       && !temporary
136       && n_trns == 0)
137     {
138       /* Nothing to do. */
139       return;
140     }
141
142   open_active_file ();
143   internal_procedure (proc_func, aux);
144   close_active_file ();
145 }
146
147 /* Executes a procedure, as procedure(), except that the caller
148    is responsible for calling open_active_file() and
149    close_active_file(). */
150 static void
151 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
152 {
153   static int recursive_call;
154
155   struct write_case_data wc_data;
156
157   assert (++recursive_call == 1);
158
159   wc_data.proc_func = proc_func;
160   wc_data.aux = aux;
161   create_trns_case (&wc_data.trns_case, default_dict);
162   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
163   wc_data.cases_written = 0;
164
165   last_vfm_invocation = time (NULL);
166
167   if (vfm_source != NULL) 
168     vfm_source->class->read (vfm_source,
169                              &wc_data.trns_case,
170                              write_case, &wc_data);
171
172   case_destroy (&wc_data.sink_case);
173   case_destroy (&wc_data.trns_case);
174
175   assert (--recursive_call == 0);
176 }
177
178 /* Creates and returns a case, initializing it from the vectors
179    that say which `value's need to be initialized just once, and
180    which ones need to be re-initialized before every case. */
181 static void
182 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
183 {
184   size_t var_cnt = dict_get_var_cnt (dict);
185   size_t i;
186
187   case_create (trns_case, dict_get_next_value_idx (dict));
188   for (i = 0; i < var_cnt; i++) 
189     {
190       struct variable *v = dict_get_var (dict, i);
191       union value *value = case_data_rw (trns_case, v->fv);
192
193       if (v->type == NUMERIC)
194         value->f = v->reinit ? 0.0 : SYSMIS;
195       else
196         memset (value->s, ' ', v->width);
197     }
198 }
199
200 /* Makes all preparations for reading from the data source and writing
201    to the data sink. */
202 static void
203 open_active_file (void)
204 {
205   /* Make temp_dict refer to the dictionary right before data
206      reaches the sink */
207   if (!temporary)
208     {
209       temp_trns = n_trns;
210       temp_dict = default_dict;
211     }
212
213   /* Figure out compaction. */
214   compaction_necessary = (dict_get_next_value_idx (temp_dict)
215                           != dict_get_compacted_value_cnt (temp_dict));
216
217   /* Prepare sink. */
218   if (vfm_sink == NULL)
219     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
220   if (vfm_sink->class->open != NULL)
221     vfm_sink->class->open (vfm_sink);
222
223   /* Allocate memory for lag queue. */
224   if (n_lag > 0)
225     {
226       int i;
227   
228       lag_count = 0;
229       lag_head = 0;
230       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
231       for (i = 0; i < n_lag; i++)
232         case_nullify (&lag_queue[i]);
233     }
234
235   /* Close any unclosed DO IF or LOOP constructs. */
236   ctl_stack_clear ();
237 }
238
239 /* Transforms trns_case and writes it to the replacement active
240    file if advisable.  Returns nonzero if more cases can be
241    accepted, zero otherwise.  Do not call this function again
242    after it has returned zero once.  */
243 static int
244 write_case (struct write_case_data *wc_data)
245 {
246   /* Execute permanent transformations.  */
247   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
248                                 wc_data->cases_written + 1))
249     goto done;
250
251   /* N OF CASES. */
252   if (dict_get_case_limit (default_dict)
253       && wc_data->cases_written >= dict_get_case_limit (default_dict))
254     goto done;
255   wc_data->cases_written++;
256
257   /* Write case to LAG queue. */
258   if (n_lag)
259     lag_case (&wc_data->trns_case);
260
261   /* Write case to replacement active file. */
262   if (vfm_sink->class->write != NULL) 
263     {
264       if (compaction_necessary) 
265         {
266           dict_compact_case (temp_dict, &wc_data->sink_case,
267                              &wc_data->trns_case);
268           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
269         }
270       else
271         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
272     }
273   
274   /* Execute temporary transformations. */
275   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
276                                 wc_data->cases_written))
277     goto done;
278   
279   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
280   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
281       || (dict_get_case_limit (temp_dict)
282           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
283     goto done;
284   wc_data->cases_analyzed++;
285
286   /* Pass case to procedure. */
287   if (wc_data->proc_func != NULL)
288     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
289
290  done:
291   clear_case (&wc_data->trns_case);
292   return 1;
293 }
294
295 /* Transforms case C using the transformations in TRNS[] with
296    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
297    become case CASE_NUM (1-based) in the output file.  Returns
298    zero if the case was filtered out by one of the
299    transformations, nonzero otherwise. */
300 static int
301 execute_transformations (struct ccase *c,
302                          struct transformation *trns,
303                          int first_idx, int last_idx,
304                          int case_num) 
305 {
306   int idx;
307
308   for (idx = first_idx; idx != last_idx; )
309     {
310       struct transformation *t = &trns[idx];
311       int retval = t->proc (t->private, c, case_num);
312       switch (retval)
313         {
314         case -1:
315           idx++;
316           break;
317           
318         case -2:
319           return 0;
320           
321         default:
322           idx = retval;
323           break;
324         }
325     }
326
327   return 1;
328 }
329
330 /* Returns nonzero if case C with case number CASE_NUM should be
331    exclude as specified on FILTER or PROCESS IF, otherwise
332    zero. */
333 static int
334 filter_case (const struct ccase *c, int case_idx)
335 {
336   /* FILTER. */
337   struct variable *filter_var = dict_get_filter (default_dict);
338   if (filter_var != NULL) 
339     {
340       double f = case_num (c, filter_var->fv);
341       if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
342         return 1;
343     }
344
345   /* PROCESS IF. */
346   if (process_if_expr != NULL
347       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
348     return 1;
349
350   return 0;
351 }
352
353 /* Add C to the lag queue. */
354 static void
355 lag_case (const struct ccase *c)
356 {
357   if (lag_count < n_lag)
358     lag_count++;
359   case_destroy (&lag_queue[lag_head]);
360   case_clone (&lag_queue[lag_head], c);
361   if (++lag_head >= n_lag)
362     lag_head = 0;
363 }
364
365 /* Clears the variables in C that need to be cleared between
366    processing cases.  */
367 static void
368 clear_case (struct ccase *c)
369 {
370   size_t var_cnt = dict_get_var_cnt (default_dict);
371   size_t i;
372   
373   for (i = 0; i < var_cnt; i++) 
374     {
375       struct variable *v = dict_get_var (default_dict, i);
376       if (v->init && v->reinit) 
377         {
378           if (v->type == NUMERIC)
379             case_data_rw (c, v->fv)->f = SYSMIS;
380           else
381             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
382         } 
383     }
384 }
385
386 /* Closes the active file. */
387 static void
388 close_active_file (void)
389 {
390   /* Free memory for lag queue, and turn off lagging. */
391   if (n_lag > 0)
392     {
393       int i;
394       
395       for (i = 0; i < n_lag; i++)
396         case_destroy (&lag_queue[i]);
397       free (lag_queue);
398       n_lag = 0;
399     }
400   
401   /* Dictionary from before TEMPORARY becomes permanent.. */
402   if (temporary)
403     {
404       dict_destroy (default_dict);
405       default_dict = temp_dict;
406       temp_dict = NULL;
407     }
408
409   /* Finish compaction. */
410   if (compaction_necessary)
411     dict_compact_values (default_dict);
412     
413   /* Free data source. */
414   free_case_source (vfm_source);
415   vfm_source = NULL;
416
417   /* Old data sink becomes new data source. */
418   if (vfm_sink->class->make_source != NULL)
419     vfm_source = vfm_sink->class->make_source (vfm_sink);
420   free_case_sink (vfm_sink);
421   vfm_sink = NULL;
422
423   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
424      and get rid of all the transformations. */
425   cancel_temporary ();
426   expr_free (process_if_expr);
427   process_if_expr = NULL;
428   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
429     dict_set_filter (default_dict, NULL);
430   dict_set_case_limit (default_dict, 0);
431   dict_clear_vectors (default_dict);
432   cancel_transformations ();
433 }
434 \f
435 /* Storage case stream. */
436
437 /* Information about storage sink or source. */
438 struct storage_stream_info 
439   {
440     struct casefile *casefile;  /* Storage. */
441   };
442
443 /* Initializes a storage sink. */
444 static void
445 storage_sink_open (struct case_sink *sink)
446 {
447   struct storage_stream_info *info;
448
449   sink->aux = info = xmalloc (sizeof *info);
450   info->casefile = casefile_create (sink->value_cnt);
451 }
452
453 /* Destroys storage stream represented by INFO. */
454 static void
455 destroy_storage_stream_info (struct storage_stream_info *info) 
456 {
457   if (info != NULL) 
458     {
459       casefile_destroy (info->casefile);
460       free (info); 
461     }
462 }
463
464 /* Writes case C to the storage sink SINK. */
465 static void
466 storage_sink_write (struct case_sink *sink, const struct ccase *c)
467 {
468   struct storage_stream_info *info = sink->aux;
469
470   casefile_append (info->casefile, c);
471 }
472
473 /* Destroys internal data in SINK. */
474 static void
475 storage_sink_destroy (struct case_sink *sink)
476 {
477   destroy_storage_stream_info (sink->aux);
478 }
479
480 /* Closes the sink and returns a storage source to read back the
481    written data. */
482 static struct case_source *
483 storage_sink_make_source (struct case_sink *sink) 
484 {
485   struct case_source *source
486     = create_case_source (&storage_source_class, sink->aux);
487   sink->aux = NULL;
488   return source;
489 }
490
491 /* Storage sink. */
492 const struct case_sink_class storage_sink_class = 
493   {
494     "storage",
495     storage_sink_open,
496     storage_sink_write,
497     storage_sink_destroy,
498     storage_sink_make_source,
499   };
500 \f
501 /* Storage source. */
502
503 /* Returns the number of cases that will be read by
504    storage_source_read(). */
505 static int
506 storage_source_count (const struct case_source *source) 
507 {
508   struct storage_stream_info *info = source->aux;
509
510   return casefile_get_case_cnt (info->casefile);
511 }
512
513 /* Reads all cases from the storage source and passes them one by one to
514    write_case(). */
515 static void
516 storage_source_read (struct case_source *source,
517                      struct ccase *output_case,
518                      write_case_func *write_case, write_case_data wc_data)
519 {
520   struct storage_stream_info *info = source->aux;
521   struct ccase casefile_case;
522   struct casereader *reader;
523
524   for (reader = casefile_get_reader (info->casefile);
525        casereader_read (reader, &casefile_case);
526        case_destroy (&casefile_case))
527     {
528       case_copy (output_case, 0,
529                  &casefile_case, 0,
530                  casefile_get_value_cnt (info->casefile));
531       write_case (wc_data);
532     }
533   casereader_destroy (reader);
534 }
535
536 /* Destroys the source's internal data. */
537 static void
538 storage_source_destroy (struct case_source *source)
539 {
540   destroy_storage_stream_info (source->aux);
541 }
542
543 /* Storage source. */
544 const struct case_source_class storage_source_class = 
545   {
546     "storage",
547     storage_source_count,
548     storage_source_read,
549     storage_source_destroy,
550   };
551
552 struct casefile *
553 storage_source_get_casefile (struct case_source *source) 
554 {
555   struct storage_stream_info *info = source->aux;
556
557   assert (source->class == &storage_source_class);
558   return info->casefile;
559 }
560
561 struct case_source *
562 storage_source_create (struct casefile *cf)
563 {
564   struct storage_stream_info *info;
565
566   info = xmalloc (sizeof *info);
567   info->casefile = cf;
568
569   return create_case_source (&storage_source_class, info);
570 }
571 \f
572 /* Null sink.  Used by a few procedures that keep track of output
573    themselves and would throw away anything that the sink
574    contained anyway. */
575
576 const struct case_sink_class null_sink_class = 
577   {
578     "null",
579     NULL,
580     NULL,
581     NULL,
582     NULL,
583   };
584 \f
585 /* Returns a pointer to the lagged case from N_BEFORE cases before the
586    current one, or NULL if there haven't been that many cases yet. */
587 struct ccase *
588 lagged_case (int n_before)
589 {
590   assert (n_before >= 1 );
591   assert (n_before <= n_lag);
592
593   if (n_before <= lag_count)
594     {
595       int index = lag_head - n_before;
596       if (index < 0)
597         index += n_lag;
598       return &lag_queue[index];
599     }
600   else
601     return NULL;
602 }
603    
604 /* Appends TRNS to t_trns[], the list of all transformations to be
605    performed on data as it is read from the active file. */
606 void
607 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
608 {
609   struct transformation *trns;
610   if (n_trns >= m_trns)
611     t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
612   trns = &t_trns[n_trns++];
613   trns->proc = proc;
614   trns->free = free;
615   trns->private = private;
616 }
617
618 /* Returns the index number that the next transformation added by
619    add_transformation() will receive.  A trns_proc_func that
620    returns this index causes control flow to jump to it. */
621 size_t
622 next_transformation (void) 
623 {
624   return n_trns;
625 }
626
627 /* Cancels all active transformations, including any transformations
628    created by the input program. */
629 void
630 cancel_transformations (void)
631 {
632   size_t i;
633   for (i = 0; i < n_trns; i++)
634     {
635       struct transformation *t = &t_trns[i];
636       if (t->free != NULL)
637         t->free (t->private);
638     }
639   n_trns = f_trns = 0;
640   free (t_trns);
641   t_trns = NULL;
642   m_trns = 0;
643 }
644 \f
645 /* Creates a case source with class CLASS and auxiliary data AUX
646    and based on dictionary DICT. */
647 struct case_source *
648 create_case_source (const struct case_source_class *class,
649                     void *aux) 
650 {
651   struct case_source *source = xmalloc (sizeof *source);
652   source->class = class;
653   source->aux = aux;
654   return source;
655 }
656
657 /* Destroys case source SOURCE.  It is the caller's responsible to
658    call the source's destroy function, if any. */
659 void
660 free_case_source (struct case_source *source) 
661 {
662   if (source != NULL) 
663     {
664       if (source->class->destroy != NULL)
665         source->class->destroy (source);
666       free (source);
667     }
668 }
669
670 /* Returns nonzero if a case source is "complex". */
671 int
672 case_source_is_complex (const struct case_source *source) 
673 {
674   return source != NULL && (source->class == &input_program_source_class
675                             || source->class == &file_type_source_class);
676 }
677
678 /* Returns nonzero if CLASS is the class of SOURCE. */
679 int
680 case_source_is_class (const struct case_source *source,
681                       const struct case_source_class *class) 
682 {
683   return source != NULL && source->class == class;
684 }
685
686 /* Creates a case sink to accept cases from the given DICT with
687    class CLASS and auxiliary data AUX. */
688 struct case_sink *
689 create_case_sink (const struct case_sink_class *class,
690                   const struct dictionary *dict,
691                   void *aux) 
692 {
693   struct case_sink *sink = xmalloc (sizeof *sink);
694   sink->class = class;
695   sink->value_cnt = dict_get_compacted_value_cnt (dict);
696   sink->aux = aux;
697   return sink;
698 }
699
700 /* Destroys case sink SINK.  */
701 void
702 free_case_sink (struct case_sink *sink) 
703 {
704   if (sink != NULL) 
705     {
706       if (sink->class->destroy != NULL)
707         sink->class->destroy (sink);
708       free (sink); 
709     }
710 }
711 \f
712 /* Represents auxiliary data for handling SPLIT FILE. */
713 struct split_aux_data 
714   {
715     size_t case_count;          /* Number of cases so far. */
716     struct ccase prev_case;     /* Data in previous case. */
717
718     /* Functions to call... */
719     void (*begin_func) (void *);               /* ...before data. */
720     int (*proc_func) (struct ccase *, void *); /* ...with data. */
721     void (*end_func) (void *);                 /* ...after data. */
722     void *func_aux;                            /* Auxiliary data. */ 
723   };
724
725 static int equal_splits (const struct ccase *, const struct ccase *);
726 static int procedure_with_splits_callback (struct ccase *, void *);
727 static void dump_splits (struct ccase *);
728
729 /* Like procedure(), but it automatically breaks the case stream
730    into SPLIT FILE break groups.  Before each group of cases with
731    identical SPLIT FILE variable values, BEGIN_FUNC is called.
732    Then PROC_FUNC is called with each case in the group.  
733    END_FUNC is called when the group is finished.  FUNC_AUX is
734    passed to each of the functions as auxiliary data.
735
736    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
737    and END_FUNC will be called at all. 
738
739    If SPLIT FILE is not in effect, then there is one break group
740    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
741    will be called once. */
742 void
743 procedure_with_splits (void (*begin_func) (void *aux),
744                        int (*proc_func) (struct ccase *, void *aux),
745                        void (*end_func) (void *aux),
746                        void *func_aux) 
747 {
748   struct split_aux_data split_aux;
749
750   split_aux.case_count = 0;
751   case_nullify (&split_aux.prev_case);
752   split_aux.begin_func = begin_func;
753   split_aux.proc_func = proc_func;
754   split_aux.end_func = end_func;
755   split_aux.func_aux = func_aux;
756
757   open_active_file ();
758   internal_procedure (procedure_with_splits_callback, &split_aux);
759   if (split_aux.case_count > 0 && end_func != NULL)
760     end_func (func_aux);
761   close_active_file ();
762
763   case_destroy (&split_aux.prev_case);
764 }
765
766 /* procedure() callback used by procedure_with_splits(). */
767 static int
768 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
769 {
770   struct split_aux_data *split_aux = split_aux_;
771
772   /* Start a new series if needed. */
773   if (split_aux->case_count == 0
774       || !equal_splits (c, &split_aux->prev_case))
775     {
776       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
777         split_aux->end_func (split_aux->func_aux);
778
779       dump_splits (c);
780       case_destroy (&split_aux->prev_case);
781       case_clone (&split_aux->prev_case, c);
782
783       if (split_aux->begin_func != NULL)
784         split_aux->begin_func (split_aux->func_aux);
785     }
786
787   split_aux->case_count++;
788   if (split_aux->proc_func != NULL)
789     return split_aux->proc_func (c, split_aux->func_aux);
790   else
791     return 1;
792 }
793
794 /* Compares the SPLIT FILE variables in cases A and B and returns
795    nonzero only if they differ. */
796 static int
797 equal_splits (const struct ccase *a, const struct ccase *b) 
798 {
799   return case_compare (a, b,
800                        dict_get_split_vars (default_dict),
801                        dict_get_split_cnt (default_dict)) == 0;
802 }
803
804 /* Dumps out the values of all the split variables for the case C. */
805 static void
806 dump_splits (struct ccase *c)
807 {
808   struct variable *const *split;
809   struct tab_table *t;
810   size_t split_cnt;
811   int i;
812
813   split_cnt = dict_get_split_cnt (default_dict);
814   if (split_cnt == 0)
815     return;
816
817   t = tab_create (3, split_cnt + 1, 0);
818   tab_dim (t, tab_natural_dimensions);
819   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
820   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
821   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
822   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
823   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
824   split = dict_get_split_vars (default_dict);
825   for (i = 0; i < split_cnt; i++)
826     {
827       struct variable *v = split[i];
828       char temp_buf[80];
829       const char *val_lab;
830
831       assert (v->type == NUMERIC || v->type == ALPHA);
832       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
833       
834       data_out (temp_buf, &v->print, case_data (c, v->fv));
835       
836       temp_buf[v->print.w] = 0;
837       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
838
839       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
840       if (val_lab)
841         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
842     }
843   tab_flags (t, SOMF_NO_TITLE);
844   tab_submit (t);
845 }
846 \f
847 /* Represents auxiliary data for handling SPLIT FILE in a
848    multipass procedure. */
849 struct multipass_split_aux_data 
850   {
851     struct ccase prev_case;     /* Data in previous case. */
852     struct casefile *casefile;  /* Accumulates data for a split. */
853
854     /* Function to call with the accumulated data. */
855     void (*split_func) (const struct casefile *, void *);
856     void *func_aux;                            /* Auxiliary data. */ 
857   };
858
859 static int multipass_split_callback (struct ccase *c, void *aux_);
860 static void multipass_split_output (struct multipass_split_aux_data *);
861
862 void
863 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
864                                                      void *),
865                                  void *func_aux) 
866 {
867   struct multipass_split_aux_data aux;
868
869   assert (split_func != NULL);
870
871   open_active_file ();
872
873   case_nullify (&aux.prev_case);
874   aux.casefile = NULL;
875   aux.split_func = split_func;
876   aux.func_aux = func_aux;
877
878   internal_procedure (multipass_split_callback, &aux);
879   if (aux.casefile != NULL)
880     multipass_split_output (&aux);
881   case_destroy (&aux.prev_case);
882
883   close_active_file ();
884 }
885
886 /* procedure() callback used by multipass_procedure_with_splits(). */
887 static int
888 multipass_split_callback (struct ccase *c, void *aux_)
889 {
890   struct multipass_split_aux_data *aux = aux_;
891
892   /* Start a new series if needed. */
893   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
894     {
895       /* Pass any cases to split_func. */
896       if (aux->casefile != NULL)
897         multipass_split_output (aux);
898
899       /* Start a new casefile. */
900       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
901
902       /* Record split values. */
903       dump_splits (c);
904       case_destroy (&aux->prev_case);
905       case_clone (&aux->prev_case, c);
906     }
907
908   casefile_append (aux->casefile, c);
909
910   return 1;
911 }
912
913 static void
914 multipass_split_output (struct multipass_split_aux_data *aux)
915 {
916   assert (aux->casefile != NULL);
917   aux->split_func (aux->casefile, aux->func_aux);
918   casefile_destroy (aux->casefile);
919   aux->casefile = NULL;
920 }
921
922
923 /* Discards all the current state in preparation for a data-input
924    command like DATA LIST or GET. */
925 void
926 discard_variables (void)
927 {
928   dict_clear (default_dict);
929   default_handle = NULL;
930
931   n_lag = 0;
932   
933   if (vfm_source != NULL)
934     {
935       free_case_source (vfm_source);
936       vfm_source = NULL;
937     }
938
939   cancel_transformations ();
940
941   ctl_stack_clear ();
942
943   expr_free (process_if_expr);
944   process_if_expr = NULL;
945
946   cancel_temporary ();
947
948   pgm_state = STATE_INIT;
949 }