3d8f03ae56b97f27ef2c01fb1c8cbf0d1c195d6b
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "command.h"
34 #include "dictionary.h"
35 #include "ctl-stack.h"
36 #include "error.h"
37 #include "expressions/public.h"
38 #include "misc.h"
39 #include "settings.h"
40 #include "som.h"
41 #include "str.h"
42 #include "tab.h"
43 #include "var.h"
44 #include "value-labels.h"
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 /*
50    Virtual File Manager (vfm):
51
52    vfm is used to process data files.  It uses the model that
53    data is read from one stream (the data source), processed,
54    then written to another (the data sink).  The data source is
55    then deleted and the data sink becomes the data source for the
56    next procedure. */
57
58 /* Procedure execution data. */
59 struct write_case_data
60   {
61     /* Function to call for each case. */
62     int (*proc_func) (struct ccase *, void *); /* Function. */
63     void *aux;                                 /* Auxiliary data. */ 
64
65     struct ccase trns_case;     /* Case used for transformations. */
66     struct ccase sink_case;     /* Case written to sink, if
67                                    compaction is necessary. */
68     size_t cases_written;       /* Cases output so far. */
69     size_t cases_analyzed;      /* Cases passed to procedure so far. */
70   };
71
72 /* The current active file, from which cases are read. */
73 struct case_source *vfm_source;
74
75 /* The replacement active file, to which cases are written. */
76 struct case_sink *vfm_sink;
77
78 /* Nonzero if the case needs to have values deleted before being
79    stored, zero otherwise. */
80 static int compaction_necessary;
81
82 /* Time at which vfm was last invoked. */
83 static time_t last_vfm_invocation;
84
85 /* Lag queue. */
86 int n_lag;                      /* Number of cases to lag. */
87 static int lag_count;           /* Number of cases in lag_queue so far. */
88 static int lag_head;            /* Index where next case will be added. */
89 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
90
91 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
92                                 void *aux);
93 static void update_last_vfm_invocation (void);
94 static void create_trns_case (struct ccase *, struct dictionary *);
95 static void open_active_file (void);
96 static int write_case (struct write_case_data *wc_data);
97 static int execute_transformations (struct ccase *c,
98                                     struct transformation *trns,
99                                     int first_idx, int last_idx,
100                                     int case_num);
101 static int filter_case (const struct ccase *c, int case_num);
102 static void lag_case (const struct ccase *c);
103 static void clear_case (struct ccase *c);
104 static void close_active_file (void);
105 \f
106 /* Public functions. */
107
108 /* Returns the last time the data was read. */
109 time_t
110 vfm_last_invocation (void) 
111 {
112   if (last_vfm_invocation == 0)
113     update_last_vfm_invocation ();
114   return last_vfm_invocation;
115 }
116
117 /* Reads the data from the input program and writes it to a new
118    active file.  For each case we read from the input program, we
119    do the following
120
121    1. Execute permanent transformations.  If these drop the case,
122       start the next case from step 1.
123
124    2. N OF CASES.  If we have already written N cases, start the
125       next case from step 1.
126    
127    3. Write case to replacement active file.
128    
129    4. Execute temporary transformations.  If these drop the case,
130       start the next case from step 1.
131       
132    5. FILTER, PROCESS IF.  If these drop the case, start the next
133       case from step 1.
134    
135    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
136       cases, start the next case from step 1.
137       
138    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
139 void
140 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
141 {
142   if (proc_func == NULL
143       && case_source_is_class (vfm_source, &storage_source_class)
144       && vfm_sink == NULL
145       && !temporary
146       && n_trns == 0)
147     {
148       /* Nothing to do. */
149       update_last_vfm_invocation ();
150       return;
151     }
152
153   open_active_file ();
154   internal_procedure (proc_func, aux);
155   close_active_file ();
156 }
157
158 /* Executes a procedure, as procedure(), except that the caller
159    is responsible for calling open_active_file() and
160    close_active_file(). */
161 static void
162 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
163 {
164   static int recursive_call;
165
166   struct write_case_data wc_data;
167
168   assert (++recursive_call == 1);
169
170   wc_data.proc_func = proc_func;
171   wc_data.aux = aux;
172   create_trns_case (&wc_data.trns_case, default_dict);
173   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
174   wc_data.cases_written = 0;
175
176   update_last_vfm_invocation ();
177
178   if (vfm_source != NULL) 
179     vfm_source->class->read (vfm_source,
180                              &wc_data.trns_case,
181                              write_case, &wc_data);
182
183   case_destroy (&wc_data.sink_case);
184   case_destroy (&wc_data.trns_case);
185
186   assert (--recursive_call == 0);
187 }
188
189 /* Updates last_vfm_invocation. */
190 static void
191 update_last_vfm_invocation (void) 
192 {
193   last_vfm_invocation = time (NULL);
194 }
195
196 /* Creates and returns a case, initializing it from the vectors
197    that say which `value's need to be initialized just once, and
198    which ones need to be re-initialized before every case. */
199 static void
200 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
201 {
202   size_t var_cnt = dict_get_var_cnt (dict);
203   size_t i;
204
205   case_create (trns_case, dict_get_next_value_idx (dict));
206   for (i = 0; i < var_cnt; i++) 
207     {
208       struct variable *v = dict_get_var (dict, i);
209       union value *value = case_data_rw (trns_case, v->fv);
210
211       if (v->type == NUMERIC)
212         value->f = v->reinit ? 0.0 : SYSMIS;
213       else
214         memset (value->s, ' ', v->width);
215     }
216 }
217
218 /* Makes all preparations for reading from the data source and writing
219    to the data sink. */
220 static void
221 open_active_file (void)
222 {
223   /* Make temp_dict refer to the dictionary right before data
224      reaches the sink */
225   if (!temporary)
226     {
227       temp_trns = n_trns;
228       temp_dict = default_dict;
229     }
230
231   /* Figure out compaction. */
232   compaction_necessary = (dict_get_next_value_idx (temp_dict)
233                           != dict_get_compacted_value_cnt (temp_dict));
234
235   /* Prepare sink. */
236   if (vfm_sink == NULL)
237     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
238   if (vfm_sink->class->open != NULL)
239     vfm_sink->class->open (vfm_sink);
240
241   /* Allocate memory for lag queue. */
242   if (n_lag > 0)
243     {
244       int i;
245   
246       lag_count = 0;
247       lag_head = 0;
248       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
249       for (i = 0; i < n_lag; i++)
250         case_nullify (&lag_queue[i]);
251     }
252
253   /* Close any unclosed DO IF or LOOP constructs. */
254   ctl_stack_clear ();
255 }
256
257 /* Transforms trns_case and writes it to the replacement active
258    file if advisable.  Returns nonzero if more cases can be
259    accepted, zero otherwise.  Do not call this function again
260    after it has returned zero once.  */
261 static int
262 write_case (struct write_case_data *wc_data)
263 {
264   /* Execute permanent transformations.  */
265   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
266                                 wc_data->cases_written + 1))
267     goto done;
268
269   /* N OF CASES. */
270   if (dict_get_case_limit (default_dict)
271       && wc_data->cases_written >= dict_get_case_limit (default_dict))
272     goto done;
273   wc_data->cases_written++;
274
275   /* Write case to LAG queue. */
276   if (n_lag)
277     lag_case (&wc_data->trns_case);
278
279   /* Write case to replacement active file. */
280   if (vfm_sink->class->write != NULL) 
281     {
282       if (compaction_necessary) 
283         {
284           dict_compact_case (temp_dict, &wc_data->sink_case,
285                              &wc_data->trns_case);
286           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
287         }
288       else
289         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
290     }
291   
292   /* Execute temporary transformations. */
293   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
294                                 wc_data->cases_written))
295     goto done;
296   
297   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
298   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
299       || (dict_get_case_limit (temp_dict)
300           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
301     goto done;
302   wc_data->cases_analyzed++;
303
304   /* Pass case to procedure. */
305   if (wc_data->proc_func != NULL)
306     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
307
308  done:
309   clear_case (&wc_data->trns_case);
310   return 1;
311 }
312
313 /* Transforms case C using the transformations in TRNS[] with
314    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
315    become case CASE_NUM (1-based) in the output file.  Returns
316    zero if the case was filtered out by one of the
317    transformations, nonzero otherwise. */
318 static int
319 execute_transformations (struct ccase *c,
320                          struct transformation *trns,
321                          int first_idx, int last_idx,
322                          int case_num) 
323 {
324   int idx;
325
326   for (idx = first_idx; idx != last_idx; )
327     {
328       struct transformation *t = &trns[idx];
329       int retval = t->proc (t->private, c, case_num);
330       switch (retval)
331         {
332         case -1:
333           idx++;
334           break;
335           
336         case -2:
337           return 0;
338           
339         default:
340           idx = retval;
341           break;
342         }
343     }
344
345   return 1;
346 }
347
348 /* Returns nonzero if case C with case number CASE_NUM should be
349    exclude as specified on FILTER or PROCESS IF, otherwise
350    zero. */
351 static int
352 filter_case (const struct ccase *c, int case_idx)
353 {
354   /* FILTER. */
355   struct variable *filter_var = dict_get_filter (default_dict);
356   if (filter_var != NULL) 
357     {
358       double f = case_num (c, filter_var->fv);
359       if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
360         return 1;
361     }
362
363   /* PROCESS IF. */
364   if (process_if_expr != NULL
365       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
366     return 1;
367
368   return 0;
369 }
370
371 /* Add C to the lag queue. */
372 static void
373 lag_case (const struct ccase *c)
374 {
375   if (lag_count < n_lag)
376     lag_count++;
377   case_destroy (&lag_queue[lag_head]);
378   case_clone (&lag_queue[lag_head], c);
379   if (++lag_head >= n_lag)
380     lag_head = 0;
381 }
382
383 /* Clears the variables in C that need to be cleared between
384    processing cases.  */
385 static void
386 clear_case (struct ccase *c)
387 {
388   size_t var_cnt = dict_get_var_cnt (default_dict);
389   size_t i;
390   
391   for (i = 0; i < var_cnt; i++) 
392     {
393       struct variable *v = dict_get_var (default_dict, i);
394       if (v->init && v->reinit) 
395         {
396           if (v->type == NUMERIC)
397             case_data_rw (c, v->fv)->f = SYSMIS;
398           else
399             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
400         } 
401     }
402 }
403
404 /* Closes the active file. */
405 static void
406 close_active_file (void)
407 {
408   /* Free memory for lag queue, and turn off lagging. */
409   if (n_lag > 0)
410     {
411       int i;
412       
413       for (i = 0; i < n_lag; i++)
414         case_destroy (&lag_queue[i]);
415       free (lag_queue);
416       n_lag = 0;
417     }
418   
419   /* Dictionary from before TEMPORARY becomes permanent.. */
420   if (temporary)
421     {
422       dict_destroy (default_dict);
423       default_dict = temp_dict;
424       temp_dict = NULL;
425     }
426
427   /* Finish compaction. */
428   if (compaction_necessary)
429     dict_compact_values (default_dict);
430     
431   /* Free data source. */
432   free_case_source (vfm_source);
433   vfm_source = NULL;
434
435   /* Old data sink becomes new data source. */
436   if (vfm_sink->class->make_source != NULL)
437     vfm_source = vfm_sink->class->make_source (vfm_sink);
438   free_case_sink (vfm_sink);
439   vfm_sink = NULL;
440
441   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
442      and get rid of all the transformations. */
443   cancel_temporary ();
444   expr_free (process_if_expr);
445   process_if_expr = NULL;
446   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
447     dict_set_filter (default_dict, NULL);
448   dict_set_case_limit (default_dict, 0);
449   dict_clear_vectors (default_dict);
450   cancel_transformations ();
451 }
452 \f
453 /* Storage case stream. */
454
455 /* Information about storage sink or source. */
456 struct storage_stream_info 
457   {
458     struct casefile *casefile;  /* Storage. */
459   };
460
461 /* Initializes a storage sink. */
462 static void
463 storage_sink_open (struct case_sink *sink)
464 {
465   struct storage_stream_info *info;
466
467   sink->aux = info = xmalloc (sizeof *info);
468   info->casefile = casefile_create (sink->value_cnt);
469 }
470
471 /* Destroys storage stream represented by INFO. */
472 static void
473 destroy_storage_stream_info (struct storage_stream_info *info) 
474 {
475   if (info != NULL) 
476     {
477       casefile_destroy (info->casefile);
478       free (info); 
479     }
480 }
481
482 /* Writes case C to the storage sink SINK. */
483 static void
484 storage_sink_write (struct case_sink *sink, const struct ccase *c)
485 {
486   struct storage_stream_info *info = sink->aux;
487
488   casefile_append (info->casefile, c);
489 }
490
491 /* Destroys internal data in SINK. */
492 static void
493 storage_sink_destroy (struct case_sink *sink)
494 {
495   destroy_storage_stream_info (sink->aux);
496 }
497
498 /* Closes the sink and returns a storage source to read back the
499    written data. */
500 static struct case_source *
501 storage_sink_make_source (struct case_sink *sink) 
502 {
503   struct case_source *source
504     = create_case_source (&storage_source_class, sink->aux);
505   sink->aux = NULL;
506   return source;
507 }
508
509 /* Storage sink. */
510 const struct case_sink_class storage_sink_class = 
511   {
512     "storage",
513     storage_sink_open,
514     storage_sink_write,
515     storage_sink_destroy,
516     storage_sink_make_source,
517   };
518 \f
519 /* Storage source. */
520
521 /* Returns the number of cases that will be read by
522    storage_source_read(). */
523 static int
524 storage_source_count (const struct case_source *source) 
525 {
526   struct storage_stream_info *info = source->aux;
527
528   return casefile_get_case_cnt (info->casefile);
529 }
530
531 /* Reads all cases from the storage source and passes them one by one to
532    write_case(). */
533 static void
534 storage_source_read (struct case_source *source,
535                      struct ccase *output_case,
536                      write_case_func *write_case, write_case_data wc_data)
537 {
538   struct storage_stream_info *info = source->aux;
539   struct ccase casefile_case;
540   struct casereader *reader;
541
542   for (reader = casefile_get_reader (info->casefile);
543        casereader_read (reader, &casefile_case);
544        case_destroy (&casefile_case))
545     {
546       case_copy (output_case, 0,
547                  &casefile_case, 0,
548                  casefile_get_value_cnt (info->casefile));
549       write_case (wc_data);
550     }
551   casereader_destroy (reader);
552 }
553
554 /* Destroys the source's internal data. */
555 static void
556 storage_source_destroy (struct case_source *source)
557 {
558   destroy_storage_stream_info (source->aux);
559 }
560
561 /* Storage source. */
562 const struct case_source_class storage_source_class = 
563   {
564     "storage",
565     storage_source_count,
566     storage_source_read,
567     storage_source_destroy,
568   };
569
570 struct casefile *
571 storage_source_get_casefile (struct case_source *source) 
572 {
573   struct storage_stream_info *info = source->aux;
574
575   assert (source->class == &storage_source_class);
576   return info->casefile;
577 }
578
579 struct case_source *
580 storage_source_create (struct casefile *cf)
581 {
582   struct storage_stream_info *info;
583
584   info = xmalloc (sizeof *info);
585   info->casefile = cf;
586
587   return create_case_source (&storage_source_class, info);
588 }
589 \f
590 /* Null sink.  Used by a few procedures that keep track of output
591    themselves and would throw away anything that the sink
592    contained anyway. */
593
594 const struct case_sink_class null_sink_class = 
595   {
596     "null",
597     NULL,
598     NULL,
599     NULL,
600     NULL,
601   };
602 \f
603 /* Returns a pointer to the lagged case from N_BEFORE cases before the
604    current one, or NULL if there haven't been that many cases yet. */
605 struct ccase *
606 lagged_case (int n_before)
607 {
608   assert (n_before >= 1 );
609   assert (n_before <= n_lag);
610
611   if (n_before <= lag_count)
612     {
613       int index = lag_head - n_before;
614       if (index < 0)
615         index += n_lag;
616       return &lag_queue[index];
617     }
618   else
619     return NULL;
620 }
621    
622 /* Appends TRNS to t_trns[], the list of all transformations to be
623    performed on data as it is read from the active file. */
624 void
625 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
626 {
627   struct transformation *trns;
628   if (n_trns >= m_trns)
629     t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
630   trns = &t_trns[n_trns++];
631   trns->proc = proc;
632   trns->free = free;
633   trns->private = private;
634 }
635
636 /* Returns the index number that the next transformation added by
637    add_transformation() will receive.  A trns_proc_func that
638    returns this index causes control flow to jump to it. */
639 size_t
640 next_transformation (void) 
641 {
642   return n_trns;
643 }
644
645 /* Cancels all active transformations, including any transformations
646    created by the input program. */
647 void
648 cancel_transformations (void)
649 {
650   size_t i;
651   for (i = 0; i < n_trns; i++)
652     {
653       struct transformation *t = &t_trns[i];
654       if (t->free != NULL)
655         t->free (t->private);
656     }
657   n_trns = f_trns = 0;
658   free (t_trns);
659   t_trns = NULL;
660   m_trns = 0;
661 }
662 \f
663 /* Creates a case source with class CLASS and auxiliary data AUX
664    and based on dictionary DICT. */
665 struct case_source *
666 create_case_source (const struct case_source_class *class,
667                     void *aux) 
668 {
669   struct case_source *source = xmalloc (sizeof *source);
670   source->class = class;
671   source->aux = aux;
672   return source;
673 }
674
675 /* Destroys case source SOURCE.  It is the caller's responsible to
676    call the source's destroy function, if any. */
677 void
678 free_case_source (struct case_source *source) 
679 {
680   if (source != NULL) 
681     {
682       if (source->class->destroy != NULL)
683         source->class->destroy (source);
684       free (source);
685     }
686 }
687
688 /* Returns nonzero if a case source is "complex". */
689 int
690 case_source_is_complex (const struct case_source *source) 
691 {
692   return source != NULL && (source->class == &input_program_source_class
693                             || source->class == &file_type_source_class);
694 }
695
696 /* Returns nonzero if CLASS is the class of SOURCE. */
697 int
698 case_source_is_class (const struct case_source *source,
699                       const struct case_source_class *class) 
700 {
701   return source != NULL && source->class == class;
702 }
703
704 /* Creates a case sink to accept cases from the given DICT with
705    class CLASS and auxiliary data AUX. */
706 struct case_sink *
707 create_case_sink (const struct case_sink_class *class,
708                   const struct dictionary *dict,
709                   void *aux) 
710 {
711   struct case_sink *sink = xmalloc (sizeof *sink);
712   sink->class = class;
713   sink->value_cnt = dict_get_compacted_value_cnt (dict);
714   sink->aux = aux;
715   return sink;
716 }
717
718 /* Destroys case sink SINK.  */
719 void
720 free_case_sink (struct case_sink *sink) 
721 {
722   if (sink != NULL) 
723     {
724       if (sink->class->destroy != NULL)
725         sink->class->destroy (sink);
726       free (sink); 
727     }
728 }
729 \f
730 /* Represents auxiliary data for handling SPLIT FILE. */
731 struct split_aux_data 
732   {
733     size_t case_count;          /* Number of cases so far. */
734     struct ccase prev_case;     /* Data in previous case. */
735
736     /* Functions to call... */
737     void (*begin_func) (void *);               /* ...before data. */
738     int (*proc_func) (struct ccase *, void *); /* ...with data. */
739     void (*end_func) (void *);                 /* ...after data. */
740     void *func_aux;                            /* Auxiliary data. */ 
741   };
742
743 static int equal_splits (const struct ccase *, const struct ccase *);
744 static int procedure_with_splits_callback (struct ccase *, void *);
745 static void dump_splits (struct ccase *);
746
747 /* Like procedure(), but it automatically breaks the case stream
748    into SPLIT FILE break groups.  Before each group of cases with
749    identical SPLIT FILE variable values, BEGIN_FUNC is called.
750    Then PROC_FUNC is called with each case in the group.  
751    END_FUNC is called when the group is finished.  FUNC_AUX is
752    passed to each of the functions as auxiliary data.
753
754    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
755    and END_FUNC will be called at all. 
756
757    If SPLIT FILE is not in effect, then there is one break group
758    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
759    will be called once. */
760 void
761 procedure_with_splits (void (*begin_func) (void *aux),
762                        int (*proc_func) (struct ccase *, void *aux),
763                        void (*end_func) (void *aux),
764                        void *func_aux) 
765 {
766   struct split_aux_data split_aux;
767
768   split_aux.case_count = 0;
769   case_nullify (&split_aux.prev_case);
770   split_aux.begin_func = begin_func;
771   split_aux.proc_func = proc_func;
772   split_aux.end_func = end_func;
773   split_aux.func_aux = func_aux;
774
775   open_active_file ();
776   internal_procedure (procedure_with_splits_callback, &split_aux);
777   if (split_aux.case_count > 0 && end_func != NULL)
778     end_func (func_aux);
779   close_active_file ();
780
781   case_destroy (&split_aux.prev_case);
782 }
783
784 /* procedure() callback used by procedure_with_splits(). */
785 static int
786 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
787 {
788   struct split_aux_data *split_aux = split_aux_;
789
790   /* Start a new series if needed. */
791   if (split_aux->case_count == 0
792       || !equal_splits (c, &split_aux->prev_case))
793     {
794       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
795         split_aux->end_func (split_aux->func_aux);
796
797       dump_splits (c);
798       case_destroy (&split_aux->prev_case);
799       case_clone (&split_aux->prev_case, c);
800
801       if (split_aux->begin_func != NULL)
802         split_aux->begin_func (split_aux->func_aux);
803     }
804
805   split_aux->case_count++;
806   if (split_aux->proc_func != NULL)
807     return split_aux->proc_func (c, split_aux->func_aux);
808   else
809     return 1;
810 }
811
812 /* Compares the SPLIT FILE variables in cases A and B and returns
813    nonzero only if they differ. */
814 static int
815 equal_splits (const struct ccase *a, const struct ccase *b) 
816 {
817   return case_compare (a, b,
818                        dict_get_split_vars (default_dict),
819                        dict_get_split_cnt (default_dict)) == 0;
820 }
821
822 /* Dumps out the values of all the split variables for the case C. */
823 static void
824 dump_splits (struct ccase *c)
825 {
826   struct variable *const *split;
827   struct tab_table *t;
828   size_t split_cnt;
829   int i;
830
831   split_cnt = dict_get_split_cnt (default_dict);
832   if (split_cnt == 0)
833     return;
834
835   t = tab_create (3, split_cnt + 1, 0);
836   tab_dim (t, tab_natural_dimensions);
837   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
838   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
839   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
840   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
841   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
842   split = dict_get_split_vars (default_dict);
843   for (i = 0; i < split_cnt; i++)
844     {
845       struct variable *v = split[i];
846       char temp_buf[80];
847       const char *val_lab;
848
849       assert (v->type == NUMERIC || v->type == ALPHA);
850       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
851       
852       data_out (temp_buf, &v->print, case_data (c, v->fv));
853       
854       temp_buf[v->print.w] = 0;
855       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
856
857       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
858       if (val_lab)
859         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
860     }
861   tab_flags (t, SOMF_NO_TITLE);
862   tab_submit (t);
863 }
864 \f
865 /* Represents auxiliary data for handling SPLIT FILE in a
866    multipass procedure. */
867 struct multipass_split_aux_data 
868   {
869     struct ccase prev_case;     /* Data in previous case. */
870     struct casefile *casefile;  /* Accumulates data for a split. */
871
872     /* Function to call with the accumulated data. */
873     void (*split_func) (const struct casefile *, void *);
874     void *func_aux;                            /* Auxiliary data. */ 
875   };
876
877 static int multipass_split_callback (struct ccase *c, void *aux_);
878 static void multipass_split_output (struct multipass_split_aux_data *);
879
880 void
881 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
882                                                      void *),
883                                  void *func_aux) 
884 {
885   struct multipass_split_aux_data aux;
886
887   assert (split_func != NULL);
888
889   open_active_file ();
890
891   case_nullify (&aux.prev_case);
892   aux.casefile = NULL;
893   aux.split_func = split_func;
894   aux.func_aux = func_aux;
895
896   internal_procedure (multipass_split_callback, &aux);
897   if (aux.casefile != NULL)
898     multipass_split_output (&aux);
899   case_destroy (&aux.prev_case);
900
901   close_active_file ();
902 }
903
904 /* procedure() callback used by multipass_procedure_with_splits(). */
905 static int
906 multipass_split_callback (struct ccase *c, void *aux_)
907 {
908   struct multipass_split_aux_data *aux = aux_;
909
910   /* Start a new series if needed. */
911   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
912     {
913       /* Pass any cases to split_func. */
914       if (aux->casefile != NULL)
915         multipass_split_output (aux);
916
917       /* Start a new casefile. */
918       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
919
920       /* Record split values. */
921       dump_splits (c);
922       case_destroy (&aux->prev_case);
923       case_clone (&aux->prev_case, c);
924     }
925
926   casefile_append (aux->casefile, c);
927
928   return 1;
929 }
930
931 static void
932 multipass_split_output (struct multipass_split_aux_data *aux)
933 {
934   assert (aux->casefile != NULL);
935   aux->split_func (aux->casefile, aux->func_aux);
936   casefile_destroy (aux->casefile);
937   aux->casefile = NULL;
938 }
939
940
941 /* Discards all the current state in preparation for a data-input
942    command like DATA LIST or GET. */
943 void
944 discard_variables (void)
945 {
946   dict_clear (default_dict);
947   default_handle = NULL;
948
949   n_lag = 0;
950   
951   if (vfm_source != NULL)
952     {
953       free_case_source (vfm_source);
954       vfm_source = NULL;
955     }
956
957   cancel_transformations ();
958
959   ctl_stack_clear ();
960
961   expr_free (process_if_expr);
962   process_if_expr = NULL;
963
964   cancel_temporary ();
965
966   pgm_state = STATE_INIT;
967 }