Refactored to make it easier to abstract a dictionary from the rest of PSPP.
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "command.h"
34 #include "dictionary.h"
35 #include "do-ifP.h"
36 #include "error.h"
37 #include "expressions/public.h"
38 #include "misc.h"
39 #include "settings.h"
40 #include "som.h"
41 #include "str.h"
42 #include "tab.h"
43 #include "var.h"
44 #include "value-labels.h"
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 /*
50    Virtual File Manager (vfm):
51
52    vfm is used to process data files.  It uses the model that
53    data is read from one stream (the data source), processed,
54    then written to another (the data sink).  The data source is
55    then deleted and the data sink becomes the data source for the
56    next procedure. */
57
58 /* Procedure execution data. */
59 struct write_case_data
60   {
61     /* Function to call for each case. */
62     int (*proc_func) (struct ccase *, void *); /* Function. */
63     void *aux;                                 /* Auxiliary data. */ 
64
65     struct ccase trns_case;     /* Case used for transformations. */
66     struct ccase sink_case;     /* Case written to sink, if
67                                    compaction is necessary. */
68     size_t cases_written;       /* Cases output so far. */
69     size_t cases_analyzed;      /* Cases passed to procedure so far. */
70   };
71
72 /* The current active file, from which cases are read. */
73 struct case_source *vfm_source;
74
75 /* The replacement active file, to which cases are written. */
76 struct case_sink *vfm_sink;
77
78 /* Nonzero if the case needs to have values deleted before being
79    stored, zero otherwise. */
80 static int compaction_necessary;
81
82 /* Time at which vfm was last invoked. */
83 time_t last_vfm_invocation;
84
85 /* Lag queue. */
86 int n_lag;                      /* Number of cases to lag. */
87 static int lag_count;           /* Number of cases in lag_queue so far. */
88 static int lag_head;            /* Index where next case will be added. */
89 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
90
91 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
92                                 void *aux);
93 static void create_trns_case (struct ccase *, struct dictionary *);
94 static void open_active_file (void);
95 static int write_case (struct write_case_data *wc_data);
96 static int execute_transformations (struct ccase *c,
97                                     struct trns_header **trns,
98                                     int first_idx, int last_idx,
99                                     int case_num);
100 static int filter_case (const struct ccase *c, int case_num);
101 static void lag_case (const struct ccase *c);
102 static void clear_case (struct ccase *c);
103 static void close_active_file (void);
104 \f
105 /* Public functions. */
106
107 /* Reads the data from the input program and writes it to a new
108    active file.  For each case we read from the input program, we
109    do the following
110
111    1. Execute permanent transformations.  If these drop the case,
112       start the next case from step 1.
113
114    2. N OF CASES.  If we have already written N cases, start the
115       next case from step 1.
116    
117    3. Write case to replacement active file.
118    
119    4. Execute temporary transformations.  If these drop the case,
120       start the next case from step 1.
121       
122    5. FILTER, PROCESS IF.  If these drop the case, start the next
123       case from step 1.
124    
125    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
126       cases, start the next case from step 1.
127       
128    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
129 void
130 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
131 {
132   if (proc_func == NULL
133       && case_source_is_class (vfm_source, &storage_source_class)
134       && vfm_sink == NULL
135       && !temporary
136       && n_trns == 0)
137     {
138       /* Nothing to do. */
139       return;
140     }
141
142   open_active_file ();
143   internal_procedure (proc_func, aux);
144   close_active_file ();
145 }
146
147 /* Executes a procedure, as procedure(), except that the caller
148    is responsible for calling open_active_file() and
149    close_active_file(). */
150 static void
151 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
152 {
153   static int recursive_call;
154
155   struct write_case_data wc_data;
156
157   assert (++recursive_call == 1);
158
159   wc_data.proc_func = proc_func;
160   wc_data.aux = aux;
161   create_trns_case (&wc_data.trns_case, default_dict);
162   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
163   wc_data.cases_written = 0;
164
165   last_vfm_invocation = time (NULL);
166
167   if (vfm_source != NULL) 
168     vfm_source->class->read (vfm_source,
169                              &wc_data.trns_case,
170                              write_case, &wc_data);
171
172   case_destroy (&wc_data.sink_case);
173   case_destroy (&wc_data.trns_case);
174
175   assert (--recursive_call == 0);
176 }
177
178 /* Creates and returns a case, initializing it from the vectors
179    that say which `value's need to be initialized just once, and
180    which ones need to be re-initialized before every case. */
181 static void
182 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
183 {
184   size_t var_cnt = dict_get_var_cnt (dict);
185   size_t i;
186
187   case_create (trns_case, dict_get_next_value_idx (dict));
188   for (i = 0; i < var_cnt; i++) 
189     {
190       struct variable *v = dict_get_var (dict, i);
191       union value *value = case_data_rw (trns_case, v->fv);
192
193       if (v->type == NUMERIC)
194         value->f = v->reinit ? 0.0 : SYSMIS;
195       else
196         memset (value->s, ' ', v->width);
197     }
198 }
199
200 /* Makes all preparations for reading from the data source and writing
201    to the data sink. */
202 static void
203 open_active_file (void)
204 {
205   /* Make temp_dict refer to the dictionary right before data
206      reaches the sink */
207   if (!temporary)
208     {
209       temp_trns = n_trns;
210       temp_dict = default_dict;
211     }
212
213   /* Figure out compaction. */
214   compaction_necessary = (dict_get_next_value_idx (temp_dict)
215                           != dict_get_compacted_value_cnt (temp_dict));
216
217   /* Prepare sink. */
218   if (vfm_sink == NULL)
219     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
220   if (vfm_sink->class->open != NULL)
221     vfm_sink->class->open (vfm_sink);
222
223   /* Allocate memory for lag queue. */
224   if (n_lag > 0)
225     {
226       int i;
227   
228       lag_count = 0;
229       lag_head = 0;
230       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
231       for (i = 0; i < n_lag; i++)
232         case_nullify (&lag_queue[i]);
233     }
234
235   /* Close any unclosed DO IF or LOOP constructs. */
236   discard_ctl_stack ();
237 }
238
239 /* Transforms trns_case and writes it to the replacement active
240    file if advisable.  Returns nonzero if more cases can be
241    accepted, zero otherwise.  Do not call this function again
242    after it has returned zero once.  */
243 static int
244 write_case (struct write_case_data *wc_data)
245 {
246   /* Execute permanent transformations.  */
247   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
248                                 wc_data->cases_written + 1))
249     goto done;
250
251   /* N OF CASES. */
252   if (dict_get_case_limit (default_dict)
253       && wc_data->cases_written >= dict_get_case_limit (default_dict))
254     goto done;
255   wc_data->cases_written++;
256
257   /* Write case to LAG queue. */
258   if (n_lag)
259     lag_case (&wc_data->trns_case);
260
261   /* Write case to replacement active file. */
262   if (vfm_sink->class->write != NULL) 
263     {
264       if (compaction_necessary) 
265         {
266           dict_compact_case (temp_dict, &wc_data->sink_case,
267                              &wc_data->trns_case);
268           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
269         }
270       else
271         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
272     }
273   
274   /* Execute temporary transformations. */
275   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
276                                 wc_data->cases_written))
277     goto done;
278   
279   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
280   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
281       || (dict_get_case_limit (temp_dict)
282           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
283     goto done;
284   wc_data->cases_analyzed++;
285
286   /* Pass case to procedure. */
287   if (wc_data->proc_func != NULL)
288     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
289
290  done:
291   clear_case (&wc_data->trns_case);
292   return 1;
293 }
294
295 /* Transforms case C using the transformations in TRNS[] with
296    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
297    become case CASE_NUM (1-based) in the output file.  Returns
298    zero if the case was filtered out by one of the
299    transformations, nonzero otherwise. */
300 static int
301 execute_transformations (struct ccase *c,
302                          struct trns_header **trns,
303                          int first_idx, int last_idx,
304                          int case_num) 
305 {
306   int idx;
307
308   for (idx = first_idx; idx != last_idx; )
309     {
310       int retval = trns[idx]->proc (trns[idx], c, case_num);
311       switch (retval)
312         {
313         case -1:
314           idx++;
315           break;
316           
317         case -2:
318           return 0;
319           
320         default:
321           idx = retval;
322           break;
323         }
324     }
325
326   return 1;
327 }
328
329 /* Returns nonzero if case C with case number CASE_NUM should be
330    exclude as specified on FILTER or PROCESS IF, otherwise
331    zero. */
332 static int
333 filter_case (const struct ccase *c, int case_idx)
334 {
335   /* FILTER. */
336   struct variable *filter_var = dict_get_filter (default_dict);
337   if (filter_var != NULL) 
338     {
339       double f = case_num (c, filter_var->fv);
340       if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
341         return 1;
342     }
343
344   /* PROCESS IF. */
345   if (process_if_expr != NULL
346       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
347     return 1;
348
349   return 0;
350 }
351
352 /* Add C to the lag queue. */
353 static void
354 lag_case (const struct ccase *c)
355 {
356   if (lag_count < n_lag)
357     lag_count++;
358   case_destroy (&lag_queue[lag_head]);
359   case_clone (&lag_queue[lag_head], c);
360   if (++lag_head >= n_lag)
361     lag_head = 0;
362 }
363
364 /* Clears the variables in C that need to be cleared between
365    processing cases.  */
366 static void
367 clear_case (struct ccase *c)
368 {
369   size_t var_cnt = dict_get_var_cnt (default_dict);
370   size_t i;
371   
372   for (i = 0; i < var_cnt; i++) 
373     {
374       struct variable *v = dict_get_var (default_dict, i);
375       if (v->init && v->reinit) 
376         {
377           if (v->type == NUMERIC)
378             case_data_rw (c, v->fv)->f = SYSMIS;
379           else
380             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
381         } 
382     }
383 }
384
385 /* Closes the active file. */
386 static void
387 close_active_file (void)
388 {
389   /* Free memory for lag queue, and turn off lagging. */
390   if (n_lag > 0)
391     {
392       int i;
393       
394       for (i = 0; i < n_lag; i++)
395         case_destroy (&lag_queue[i]);
396       free (lag_queue);
397       n_lag = 0;
398     }
399   
400   /* Dictionary from before TEMPORARY becomes permanent.. */
401   if (temporary)
402     {
403       dict_destroy (default_dict);
404       default_dict = temp_dict;
405       temp_dict = NULL;
406     }
407
408   /* Finish compaction. */
409   if (compaction_necessary)
410     dict_compact_values (default_dict);
411     
412   /* Free data source. */
413   free_case_source (vfm_source);
414   vfm_source = NULL;
415
416   /* Old data sink becomes new data source. */
417   if (vfm_sink->class->make_source != NULL)
418     vfm_source = vfm_sink->class->make_source (vfm_sink);
419   free_case_sink (vfm_sink);
420   vfm_sink = NULL;
421
422   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
423      and get rid of all the transformations. */
424   cancel_temporary ();
425   expr_free (process_if_expr);
426   process_if_expr = NULL;
427   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
428     dict_set_filter (default_dict, NULL);
429   dict_set_case_limit (default_dict, 0);
430   dict_clear_vectors (default_dict);
431   cancel_transformations ();
432 }
433 \f
434 /* Storage case stream. */
435
436 /* Information about storage sink or source. */
437 struct storage_stream_info 
438   {
439     struct casefile *casefile;  /* Storage. */
440   };
441
442 /* Initializes a storage sink. */
443 static void
444 storage_sink_open (struct case_sink *sink)
445 {
446   struct storage_stream_info *info;
447
448   sink->aux = info = xmalloc (sizeof *info);
449   info->casefile = casefile_create (sink->value_cnt);
450 }
451
452 /* Destroys storage stream represented by INFO. */
453 static void
454 destroy_storage_stream_info (struct storage_stream_info *info) 
455 {
456   if (info != NULL) 
457     {
458       casefile_destroy (info->casefile);
459       free (info); 
460     }
461 }
462
463 /* Writes case C to the storage sink SINK. */
464 static void
465 storage_sink_write (struct case_sink *sink, const struct ccase *c)
466 {
467   struct storage_stream_info *info = sink->aux;
468
469   casefile_append (info->casefile, c);
470 }
471
472 /* Destroys internal data in SINK. */
473 static void
474 storage_sink_destroy (struct case_sink *sink)
475 {
476   destroy_storage_stream_info (sink->aux);
477 }
478
479 /* Closes the sink and returns a storage source to read back the
480    written data. */
481 static struct case_source *
482 storage_sink_make_source (struct case_sink *sink) 
483 {
484   struct case_source *source
485     = create_case_source (&storage_source_class, sink->aux);
486   sink->aux = NULL;
487   return source;
488 }
489
490 /* Storage sink. */
491 const struct case_sink_class storage_sink_class = 
492   {
493     "storage",
494     storage_sink_open,
495     storage_sink_write,
496     storage_sink_destroy,
497     storage_sink_make_source,
498   };
499 \f
500 /* Storage source. */
501
502 /* Returns the number of cases that will be read by
503    storage_source_read(). */
504 static int
505 storage_source_count (const struct case_source *source) 
506 {
507   struct storage_stream_info *info = source->aux;
508
509   return casefile_get_case_cnt (info->casefile);
510 }
511
512 /* Reads all cases from the storage source and passes them one by one to
513    write_case(). */
514 static void
515 storage_source_read (struct case_source *source,
516                      struct ccase *output_case,
517                      write_case_func *write_case, write_case_data wc_data)
518 {
519   struct storage_stream_info *info = source->aux;
520   struct ccase casefile_case;
521   struct casereader *reader;
522
523   for (reader = casefile_get_reader (info->casefile);
524        casereader_read (reader, &casefile_case);
525        case_destroy (&casefile_case))
526     {
527       case_copy (output_case, 0,
528                  &casefile_case, 0,
529                  casefile_get_value_cnt (info->casefile));
530       write_case (wc_data);
531     }
532   casereader_destroy (reader);
533 }
534
535 /* Destroys the source's internal data. */
536 static void
537 storage_source_destroy (struct case_source *source)
538 {
539   destroy_storage_stream_info (source->aux);
540 }
541
542 /* Storage source. */
543 const struct case_source_class storage_source_class = 
544   {
545     "storage",
546     storage_source_count,
547     storage_source_read,
548     storage_source_destroy,
549   };
550
551 struct casefile *
552 storage_source_get_casefile (struct case_source *source) 
553 {
554   struct storage_stream_info *info = source->aux;
555
556   assert (source->class == &storage_source_class);
557   return info->casefile;
558 }
559
560 struct case_source *
561 storage_source_create (struct casefile *cf)
562 {
563   struct storage_stream_info *info;
564
565   info = xmalloc (sizeof *info);
566   info->casefile = cf;
567
568   return create_case_source (&storage_source_class, info);
569 }
570 \f
571 /* Null sink.  Used by a few procedures that keep track of output
572    themselves and would throw away anything that the sink
573    contained anyway. */
574
575 const struct case_sink_class null_sink_class = 
576   {
577     "null",
578     NULL,
579     NULL,
580     NULL,
581     NULL,
582   };
583 \f
584 /* Returns a pointer to the lagged case from N_BEFORE cases before the
585    current one, or NULL if there haven't been that many cases yet. */
586 struct ccase *
587 lagged_case (int n_before)
588 {
589   assert (n_before >= 1 );
590   assert (n_before <= n_lag);
591
592   if (n_before <= lag_count)
593     {
594       int index = lag_head - n_before;
595       if (index < 0)
596         index += n_lag;
597       return &lag_queue[index];
598     }
599   else
600     return NULL;
601 }
602    
603 /* Appends TRNS to t_trns[], the list of all transformations to be
604    performed on data as it is read from the active file. */
605 void
606 add_transformation (struct trns_header * trns)
607 {
608   if (n_trns >= m_trns)
609     {
610       m_trns += 16;
611       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
612     }
613   t_trns[n_trns] = trns;
614   trns->index = n_trns++;
615 }
616
617 /* Cancels all active transformations, including any transformations
618    created by the input program. */
619 void
620 cancel_transformations (void)
621 {
622   int i;
623   for (i = 0; i < n_trns; i++)
624     {
625       if (t_trns[i]->free)
626         t_trns[i]->free (t_trns[i]);
627       free (t_trns[i]);
628     }
629   n_trns = f_trns = 0;
630   free (t_trns);
631   t_trns = NULL;
632   m_trns = 0;
633 }
634 \f
635 /* Creates a case source with class CLASS and auxiliary data AUX
636    and based on dictionary DICT. */
637 struct case_source *
638 create_case_source (const struct case_source_class *class,
639                     void *aux) 
640 {
641   struct case_source *source = xmalloc (sizeof *source);
642   source->class = class;
643   source->aux = aux;
644   return source;
645 }
646
647 /* Destroys case source SOURCE.  It is the caller's responsible to
648    call the source's destroy function, if any. */
649 void
650 free_case_source (struct case_source *source) 
651 {
652   if (source != NULL) 
653     {
654       if (source->class->destroy != NULL)
655         source->class->destroy (source);
656       free (source);
657     }
658 }
659
660 /* Returns nonzero if a case source is "complex". */
661 int
662 case_source_is_complex (const struct case_source *source) 
663 {
664   return source != NULL && (source->class == &input_program_source_class
665                             || source->class == &file_type_source_class);
666 }
667
668 /* Returns nonzero if CLASS is the class of SOURCE. */
669 int
670 case_source_is_class (const struct case_source *source,
671                       const struct case_source_class *class) 
672 {
673   return source != NULL && source->class == class;
674 }
675
676 /* Creates a case sink to accept cases from the given DICT with
677    class CLASS and auxiliary data AUX. */
678 struct case_sink *
679 create_case_sink (const struct case_sink_class *class,
680                   const struct dictionary *dict,
681                   void *aux) 
682 {
683   struct case_sink *sink = xmalloc (sizeof *sink);
684   sink->class = class;
685   sink->value_cnt = dict_get_compacted_value_cnt (dict);
686   sink->aux = aux;
687   return sink;
688 }
689
690 /* Destroys case sink SINK.  */
691 void
692 free_case_sink (struct case_sink *sink) 
693 {
694   if (sink != NULL) 
695     {
696       if (sink->class->destroy != NULL)
697         sink->class->destroy (sink);
698       free (sink); 
699     }
700 }
701 \f
702 /* Represents auxiliary data for handling SPLIT FILE. */
703 struct split_aux_data 
704   {
705     size_t case_count;          /* Number of cases so far. */
706     struct ccase prev_case;     /* Data in previous case. */
707
708     /* Functions to call... */
709     void (*begin_func) (void *);               /* ...before data. */
710     int (*proc_func) (struct ccase *, void *); /* ...with data. */
711     void (*end_func) (void *);                 /* ...after data. */
712     void *func_aux;                            /* Auxiliary data. */ 
713   };
714
715 static int equal_splits (const struct ccase *, const struct ccase *);
716 static int procedure_with_splits_callback (struct ccase *, void *);
717 static void dump_splits (struct ccase *);
718
719 /* Like procedure(), but it automatically breaks the case stream
720    into SPLIT FILE break groups.  Before each group of cases with
721    identical SPLIT FILE variable values, BEGIN_FUNC is called.
722    Then PROC_FUNC is called with each case in the group.  
723    END_FUNC is called when the group is finished.  FUNC_AUX is
724    passed to each of the functions as auxiliary data.
725
726    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
727    and END_FUNC will be called at all. 
728
729    If SPLIT FILE is not in effect, then there is one break group
730    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
731    will be called once. */
732 void
733 procedure_with_splits (void (*begin_func) (void *aux),
734                        int (*proc_func) (struct ccase *, void *aux),
735                        void (*end_func) (void *aux),
736                        void *func_aux) 
737 {
738   struct split_aux_data split_aux;
739
740   split_aux.case_count = 0;
741   case_nullify (&split_aux.prev_case);
742   split_aux.begin_func = begin_func;
743   split_aux.proc_func = proc_func;
744   split_aux.end_func = end_func;
745   split_aux.func_aux = func_aux;
746
747   open_active_file ();
748   internal_procedure (procedure_with_splits_callback, &split_aux);
749   if (split_aux.case_count > 0 && end_func != NULL)
750     end_func (func_aux);
751   close_active_file ();
752
753   case_destroy (&split_aux.prev_case);
754 }
755
756 /* procedure() callback used by procedure_with_splits(). */
757 static int
758 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
759 {
760   struct split_aux_data *split_aux = split_aux_;
761
762   /* Start a new series if needed. */
763   if (split_aux->case_count == 0
764       || !equal_splits (c, &split_aux->prev_case))
765     {
766       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
767         split_aux->end_func (split_aux->func_aux);
768
769       dump_splits (c);
770       case_destroy (&split_aux->prev_case);
771       case_clone (&split_aux->prev_case, c);
772
773       if (split_aux->begin_func != NULL)
774         split_aux->begin_func (split_aux->func_aux);
775     }
776
777   split_aux->case_count++;
778   if (split_aux->proc_func != NULL)
779     return split_aux->proc_func (c, split_aux->func_aux);
780   else
781     return 1;
782 }
783
784 /* Compares the SPLIT FILE variables in cases A and B and returns
785    nonzero only if they differ. */
786 static int
787 equal_splits (const struct ccase *a, const struct ccase *b) 
788 {
789   return case_compare (a, b,
790                        dict_get_split_vars (default_dict),
791                        dict_get_split_cnt (default_dict)) == 0;
792 }
793
794 /* Dumps out the values of all the split variables for the case C. */
795 static void
796 dump_splits (struct ccase *c)
797 {
798   struct variable *const *split;
799   struct tab_table *t;
800   size_t split_cnt;
801   int i;
802
803   split_cnt = dict_get_split_cnt (default_dict);
804   if (split_cnt == 0)
805     return;
806
807   t = tab_create (3, split_cnt + 1, 0);
808   tab_dim (t, tab_natural_dimensions);
809   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
810   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
811   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
812   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
813   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
814   split = dict_get_split_vars (default_dict);
815   for (i = 0; i < split_cnt; i++)
816     {
817       struct variable *v = split[i];
818       char temp_buf[80];
819       const char *val_lab;
820
821       assert (v->type == NUMERIC || v->type == ALPHA);
822       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
823       
824       data_out (temp_buf, &v->print, case_data (c, v->fv));
825       
826       temp_buf[v->print.w] = 0;
827       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
828
829       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
830       if (val_lab)
831         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
832     }
833   tab_flags (t, SOMF_NO_TITLE);
834   tab_submit (t);
835 }
836 \f
837 /* Represents auxiliary data for handling SPLIT FILE in a
838    multipass procedure. */
839 struct multipass_split_aux_data 
840   {
841     struct ccase prev_case;     /* Data in previous case. */
842     struct casefile *casefile;  /* Accumulates data for a split. */
843
844     /* Function to call with the accumulated data. */
845     void (*split_func) (const struct casefile *, void *);
846     void *func_aux;                            /* Auxiliary data. */ 
847   };
848
849 static int multipass_split_callback (struct ccase *c, void *aux_);
850 static void multipass_split_output (struct multipass_split_aux_data *);
851
852 void
853 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
854                                                      void *),
855                                  void *func_aux) 
856 {
857   struct multipass_split_aux_data aux;
858
859   assert (split_func != NULL);
860
861   open_active_file ();
862
863   case_nullify (&aux.prev_case);
864   aux.casefile = NULL;
865   aux.split_func = split_func;
866   aux.func_aux = func_aux;
867
868   internal_procedure (multipass_split_callback, &aux);
869   if (aux.casefile != NULL)
870     multipass_split_output (&aux);
871   case_destroy (&aux.prev_case);
872
873   close_active_file ();
874 }
875
876 /* procedure() callback used by multipass_procedure_with_splits(). */
877 static int
878 multipass_split_callback (struct ccase *c, void *aux_)
879 {
880   struct multipass_split_aux_data *aux = aux_;
881
882   /* Start a new series if needed. */
883   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
884     {
885       /* Pass any cases to split_func. */
886       if (aux->casefile != NULL)
887         multipass_split_output (aux);
888
889       /* Start a new casefile. */
890       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
891
892       /* Record split values. */
893       dump_splits (c);
894       case_destroy (&aux->prev_case);
895       case_clone (&aux->prev_case, c);
896     }
897
898   casefile_append (aux->casefile, c);
899
900   return 1;
901 }
902
903 static void
904 multipass_split_output (struct multipass_split_aux_data *aux)
905 {
906   assert (aux->casefile != NULL);
907   aux->split_func (aux->casefile, aux->func_aux);
908   casefile_destroy (aux->casefile);
909   aux->casefile = NULL;
910 }
911
912
913 /* Discards all the current state in preparation for a data-input
914    command like DATA LIST or GET. */
915 void
916 discard_variables (void)
917 {
918   dict_clear (default_dict);
919   default_handle = NULL;
920
921   n_lag = 0;
922   
923   if (vfm_source != NULL)
924     {
925       free_case_source (vfm_source);
926       vfm_source = NULL;
927     }
928
929   cancel_transformations ();
930
931   ctl_stack = NULL;
932
933   expr_free (process_if_expr);
934   process_if_expr = NULL;
935
936   cancel_temporary ();
937
938   pgm_state = STATE_INIT;
939 }