Continue work on bug 12859, plus some code cleanup.
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "dictionary.h"
34 #include "do-ifP.h"
35 #include "error.h"
36 #include "expressions/public.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "som.h"
40 #include "str.h"
41 #include "tab.h"
42 #include "var.h"
43 #include "value-labels.h"
44
45 /*
46    Virtual File Manager (vfm):
47
48    vfm is used to process data files.  It uses the model that
49    data is read from one stream (the data source), processed,
50    then written to another (the data sink).  The data source is
51    then deleted and the data sink becomes the data source for the
52    next procedure. */
53
54 /* Procedure execution data. */
55 struct write_case_data
56   {
57     /* Function to call for each case. */
58     int (*proc_func) (struct ccase *, void *); /* Function. */
59     void *aux;                                 /* Auxiliary data. */ 
60
61     struct ccase trns_case;     /* Case used for transformations. */
62     struct ccase sink_case;     /* Case written to sink, if
63                                    compaction is necessary. */
64     size_t cases_written;       /* Cases output so far. */
65     size_t cases_analyzed;      /* Cases passed to procedure so far. */
66   };
67
68 /* The current active file, from which cases are read. */
69 struct case_source *vfm_source;
70
71 /* The replacement active file, to which cases are written. */
72 struct case_sink *vfm_sink;
73
74 /* Nonzero if the case needs to have values deleted before being
75    stored, zero otherwise. */
76 static int compaction_necessary;
77
78 /* Time at which vfm was last invoked. */
79 time_t last_vfm_invocation;
80
81 /* Lag queue. */
82 int n_lag;                      /* Number of cases to lag. */
83 static int lag_count;           /* Number of cases in lag_queue so far. */
84 static int lag_head;            /* Index where next case will be added. */
85 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
86
87 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
88                                 void *aux);
89 static void create_trns_case (struct ccase *, struct dictionary *);
90 static void open_active_file (void);
91 static int write_case (struct write_case_data *wc_data);
92 static int execute_transformations (struct ccase *c,
93                                     struct trns_header **trns,
94                                     int first_idx, int last_idx,
95                                     int case_num);
96 static int filter_case (const struct ccase *c, int case_num);
97 static void lag_case (const struct ccase *c);
98 static void clear_case (struct ccase *c);
99 static void close_active_file (void);
100 \f
101 /* Public functions. */
102
103 /* Reads the data from the input program and writes it to a new
104    active file.  For each case we read from the input program, we
105    do the following
106
107    1. Execute permanent transformations.  If these drop the case,
108       start the next case from step 1.
109
110    2. N OF CASES.  If we have already written N cases, start the
111       next case from step 1.
112    
113    3. Write case to replacement active file.
114    
115    4. Execute temporary transformations.  If these drop the case,
116       start the next case from step 1.
117       
118    5. FILTER, PROCESS IF.  If these drop the case, start the next
119       case from step 1.
120    
121    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
122       cases, start the next case from step 1.
123       
124    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
125 void
126 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
127 {
128   if (proc_func == NULL
129       && case_source_is_class (vfm_source, &storage_source_class)
130       && vfm_sink == NULL
131       && !temporary
132       && n_trns == 0)
133     {
134       /* Nothing to do. */
135       return;
136     }
137
138   open_active_file ();
139   internal_procedure (proc_func, aux);
140   close_active_file ();
141 }
142
143 /* Executes a procedure, as procedure(), except that the caller
144    is responsible for calling open_active_file() and
145    close_active_file(). */
146 static void
147 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
148 {
149   static int recursive_call;
150
151   struct write_case_data wc_data;
152
153   assert (++recursive_call == 1);
154
155   wc_data.proc_func = proc_func;
156   wc_data.aux = aux;
157   create_trns_case (&wc_data.trns_case, default_dict);
158   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
159   wc_data.cases_written = 0;
160
161   last_vfm_invocation = time (NULL);
162
163   if (vfm_source != NULL) 
164     vfm_source->class->read (vfm_source,
165                              &wc_data.trns_case,
166                              write_case, &wc_data);
167
168   case_destroy (&wc_data.sink_case);
169   case_destroy (&wc_data.trns_case);
170
171   assert (--recursive_call == 0);
172 }
173
174 /* Creates and returns a case, initializing it from the vectors
175    that say which `value's need to be initialized just once, and
176    which ones need to be re-initialized before every case. */
177 static void
178 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
179 {
180   size_t var_cnt = dict_get_var_cnt (dict);
181   size_t i;
182
183   case_create (trns_case, dict_get_next_value_idx (dict));
184   for (i = 0; i < var_cnt; i++) 
185     {
186       struct variable *v = dict_get_var (dict, i);
187       union value *value = case_data_rw (trns_case, v->fv);
188
189       if (v->type == NUMERIC)
190         value->f = v->reinit ? 0.0 : SYSMIS;
191       else
192         memset (value->s, ' ', v->width);
193     }
194 }
195
196 /* Makes all preparations for reading from the data source and writing
197    to the data sink. */
198 static void
199 open_active_file (void)
200 {
201   /* Make temp_dict refer to the dictionary right before data
202      reaches the sink */
203   if (!temporary)
204     {
205       temp_trns = n_trns;
206       temp_dict = default_dict;
207     }
208
209   /* Figure out compaction. */
210   compaction_necessary = (dict_get_next_value_idx (temp_dict)
211                           != dict_get_compacted_value_cnt (temp_dict));
212
213   /* Prepare sink. */
214   if (vfm_sink == NULL)
215     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
216   if (vfm_sink->class->open != NULL)
217     vfm_sink->class->open (vfm_sink);
218
219   /* Allocate memory for lag queue. */
220   if (n_lag > 0)
221     {
222       int i;
223   
224       lag_count = 0;
225       lag_head = 0;
226       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
227       for (i = 0; i < n_lag; i++)
228         case_nullify (&lag_queue[i]);
229     }
230
231   /* Close any unclosed DO IF or LOOP constructs. */
232   discard_ctl_stack ();
233 }
234
235 /* Transforms trns_case and writes it to the replacement active
236    file if advisable.  Returns nonzero if more cases can be
237    accepted, zero otherwise.  Do not call this function again
238    after it has returned zero once.  */
239 static int
240 write_case (struct write_case_data *wc_data)
241 {
242   /* Execute permanent transformations.  */
243   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
244                                 wc_data->cases_written + 1))
245     goto done;
246
247   /* N OF CASES. */
248   if (dict_get_case_limit (default_dict)
249       && wc_data->cases_written >= dict_get_case_limit (default_dict))
250     goto done;
251   wc_data->cases_written++;
252
253   /* Write case to LAG queue. */
254   if (n_lag)
255     lag_case (&wc_data->trns_case);
256
257   /* Write case to replacement active file. */
258   if (vfm_sink->class->write != NULL) 
259     {
260       if (compaction_necessary) 
261         {
262           dict_compact_case (temp_dict, &wc_data->sink_case, &wc_data->trns_case);
263           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
264         }
265       else
266         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
267     }
268   
269   /* Execute temporary transformations. */
270   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
271                                 wc_data->cases_written))
272     goto done;
273   
274   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
275   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
276       || (dict_get_case_limit (temp_dict)
277           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
278     goto done;
279   wc_data->cases_analyzed++;
280
281   /* Pass case to procedure. */
282   if (wc_data->proc_func != NULL)
283     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
284
285  done:
286   clear_case (&wc_data->trns_case);
287   return 1;
288 }
289
290 /* Transforms case C using the transformations in TRNS[] with
291    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
292    become case CASE_NUM (1-based) in the output file.  Returns
293    zero if the case was filtered out by one of the
294    transformations, nonzero otherwise. */
295 static int
296 execute_transformations (struct ccase *c,
297                          struct trns_header **trns,
298                          int first_idx, int last_idx,
299                          int case_num) 
300 {
301   int idx;
302
303   for (idx = first_idx; idx != last_idx; )
304     {
305       int retval = trns[idx]->proc (trns[idx], c, case_num);
306       switch (retval)
307         {
308         case -1:
309           idx++;
310           break;
311           
312         case -2:
313           return 0;
314           
315         default:
316           idx = retval;
317           break;
318         }
319     }
320
321   return 1;
322 }
323
324 /* Returns nonzero if case C with case number CASE_NUM should be
325    exclude as specified on FILTER or PROCESS IF, otherwise
326    zero. */
327 static int
328 filter_case (const struct ccase *c, int case_idx)
329 {
330   /* FILTER. */
331   struct variable *filter_var = dict_get_filter (default_dict);
332   if (filter_var != NULL) 
333     {
334       double f = case_num (c, filter_var->fv);
335       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
336         return 1;
337     }
338
339   /* PROCESS IF. */
340   if (process_if_expr != NULL
341       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
342     return 1;
343
344   return 0;
345 }
346
347 /* Add C to the lag queue. */
348 static void
349 lag_case (const struct ccase *c)
350 {
351   if (lag_count < n_lag)
352     lag_count++;
353   case_destroy (&lag_queue[lag_head]);
354   case_clone (&lag_queue[lag_head], c);
355   if (++lag_head >= n_lag)
356     lag_head = 0;
357 }
358
359 /* Clears the variables in C that need to be cleared between
360    processing cases.  */
361 static void
362 clear_case (struct ccase *c)
363 {
364   size_t var_cnt = dict_get_var_cnt (default_dict);
365   size_t i;
366   
367   for (i = 0; i < var_cnt; i++) 
368     {
369       struct variable *v = dict_get_var (default_dict, i);
370       if (v->init && v->reinit) 
371         {
372           if (v->type == NUMERIC)
373             case_data_rw (c, v->fv)->f = SYSMIS;
374           else
375             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
376         } 
377     }
378 }
379
380 /* Closes the active file. */
381 static void
382 close_active_file (void)
383 {
384   /* Free memory for lag queue, and turn off lagging. */
385   if (n_lag > 0)
386     {
387       int i;
388       
389       for (i = 0; i < n_lag; i++)
390         case_destroy (&lag_queue[i]);
391       free (lag_queue);
392       n_lag = 0;
393     }
394   
395   /* Dictionary from before TEMPORARY becomes permanent.. */
396   if (temporary)
397     {
398       dict_destroy (default_dict);
399       default_dict = temp_dict;
400       temp_dict = NULL;
401     }
402
403   /* Finish compaction. */
404   if (compaction_necessary)
405     dict_compact_values (default_dict);
406     
407   /* Free data source. */
408   if (vfm_source != NULL) 
409     {
410       free_case_source (vfm_source);
411       vfm_source = NULL;
412     }
413
414   /* Old data sink becomes new data source. */
415   if (vfm_sink->class->make_source != NULL)
416     vfm_source = vfm_sink->class->make_source (vfm_sink);
417   free_case_sink (vfm_sink);
418   vfm_sink = NULL;
419
420   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
421      and get rid of all the transformations. */
422   cancel_temporary ();
423   expr_free (process_if_expr);
424   process_if_expr = NULL;
425   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
426     dict_set_filter (default_dict, NULL);
427   dict_set_case_limit (default_dict, 0);
428   dict_clear_vectors (default_dict);
429   cancel_transformations ();
430 }
431 \f
432 /* Storage case stream. */
433
434 /* Information about storage sink or source. */
435 struct storage_stream_info 
436   {
437     struct casefile *casefile;  /* Storage. */
438   };
439
440 /* Initializes a storage sink. */
441 static void
442 storage_sink_open (struct case_sink *sink)
443 {
444   struct storage_stream_info *info;
445
446   sink->aux = info = xmalloc (sizeof *info);
447   info->casefile = casefile_create (sink->value_cnt);
448 }
449
450 /* Destroys storage stream represented by INFO. */
451 static void
452 destroy_storage_stream_info (struct storage_stream_info *info) 
453 {
454   if (info != NULL) 
455     {
456       casefile_destroy (info->casefile);
457       free (info); 
458     }
459 }
460
461 /* Writes case C to the storage sink SINK. */
462 static void
463 storage_sink_write (struct case_sink *sink, const struct ccase *c)
464 {
465   struct storage_stream_info *info = sink->aux;
466
467   casefile_append (info->casefile, c);
468 }
469
470 /* Destroys internal data in SINK. */
471 static void
472 storage_sink_destroy (struct case_sink *sink)
473 {
474   destroy_storage_stream_info (sink->aux);
475 }
476
477 /* Closes the sink and returns a storage source to read back the
478    written data. */
479 static struct case_source *
480 storage_sink_make_source (struct case_sink *sink) 
481 {
482   struct case_source *source
483     = create_case_source (&storage_source_class, sink->aux);
484   sink->aux = NULL;
485   return source;
486 }
487
488 /* Storage sink. */
489 const struct case_sink_class storage_sink_class = 
490   {
491     "storage",
492     storage_sink_open,
493     storage_sink_write,
494     storage_sink_destroy,
495     storage_sink_make_source,
496   };
497 \f
498 /* Storage source. */
499
500 /* Returns the number of cases that will be read by
501    storage_source_read(). */
502 static int
503 storage_source_count (const struct case_source *source) 
504 {
505   struct storage_stream_info *info = source->aux;
506
507   return casefile_get_case_cnt (info->casefile);
508 }
509
510 /* Reads all cases from the storage source and passes them one by one to
511    write_case(). */
512 static void
513 storage_source_read (struct case_source *source,
514                      struct ccase *output_case,
515                      write_case_func *write_case, write_case_data wc_data)
516 {
517   struct storage_stream_info *info = source->aux;
518   struct ccase casefile_case;
519   struct casereader *reader;
520
521   for (reader = casefile_get_reader (info->casefile);
522        casereader_read (reader, &casefile_case);
523        case_destroy (&casefile_case))
524     {
525       case_copy (output_case, 0,
526                  &casefile_case, 0,
527                  casefile_get_value_cnt (info->casefile));
528       write_case (wc_data);
529     }
530   casereader_destroy (reader);
531 }
532
533 /* Destroys the source's internal data. */
534 static void
535 storage_source_destroy (struct case_source *source)
536 {
537   destroy_storage_stream_info (source->aux);
538 }
539
540 /* Storage source. */
541 const struct case_source_class storage_source_class = 
542   {
543     "storage",
544     storage_source_count,
545     storage_source_read,
546     storage_source_destroy,
547   };
548
549 struct casefile *
550 storage_source_get_casefile (struct case_source *source) 
551 {
552   struct storage_stream_info *info = source->aux;
553
554   assert (source->class == &storage_source_class);
555   return info->casefile;
556 }
557
558 struct case_source *
559 storage_source_create (struct casefile *cf)
560 {
561   struct storage_stream_info *info;
562
563   info = xmalloc (sizeof *info);
564   info->casefile = cf;
565
566   return create_case_source (&storage_source_class, info);
567 }
568 \f
569 /* Null sink.  Used by a few procedures that keep track of output
570    themselves and would throw away anything that the sink
571    contained anyway. */
572
573 const struct case_sink_class null_sink_class = 
574   {
575     "null",
576     NULL,
577     NULL,
578     NULL,
579     NULL,
580   };
581 \f
582 /* Returns a pointer to the lagged case from N_BEFORE cases before the
583    current one, or NULL if there haven't been that many cases yet. */
584 struct ccase *
585 lagged_case (int n_before)
586 {
587   assert (n_before >= 1 );
588   assert (n_before <= n_lag);
589
590   if (n_before <= lag_count)
591     {
592       int index = lag_head - n_before;
593       if (index < 0)
594         index += n_lag;
595       return &lag_queue[index];
596     }
597   else
598     return NULL;
599 }
600    
601 /* Appends TRNS to t_trns[], the list of all transformations to be
602    performed on data as it is read from the active file. */
603 void
604 add_transformation (struct trns_header * trns)
605 {
606   if (n_trns >= m_trns)
607     {
608       m_trns += 16;
609       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
610     }
611   t_trns[n_trns] = trns;
612   trns->index = n_trns++;
613 }
614
615 /* Cancels all active transformations, including any transformations
616    created by the input program. */
617 void
618 cancel_transformations (void)
619 {
620   int i;
621   for (i = 0; i < n_trns; i++)
622     {
623       if (t_trns[i]->free)
624         t_trns[i]->free (t_trns[i]);
625       free (t_trns[i]);
626     }
627   n_trns = f_trns = 0;
628   free (t_trns);
629   t_trns = NULL;
630   m_trns = 0;
631 }
632 \f
633 /* Creates a case source with class CLASS and auxiliary data AUX
634    and based on dictionary DICT. */
635 struct case_source *
636 create_case_source (const struct case_source_class *class,
637                     void *aux) 
638 {
639   struct case_source *source = xmalloc (sizeof *source);
640   source->class = class;
641   source->aux = aux;
642   return source;
643 }
644
645 /* Destroys case source SOURCE.  It is the caller's responsible to
646    call the source's destroy function, if any. */
647 void
648 free_case_source (struct case_source *source) 
649 {
650   if (source != NULL) 
651     {
652       if (source->class->destroy != NULL)
653         source->class->destroy (source);
654       free (source);
655     }
656 }
657
658 /* Returns nonzero if a case source is "complex". */
659 int
660 case_source_is_complex (const struct case_source *source) 
661 {
662   return source != NULL && (source->class == &input_program_source_class
663                             || source->class == &file_type_source_class);
664 }
665
666 /* Returns nonzero if CLASS is the class of SOURCE. */
667 int
668 case_source_is_class (const struct case_source *source,
669                       const struct case_source_class *class) 
670 {
671   return source != NULL && source->class == class;
672 }
673
674 /* Creates a case sink to accept cases from the given DICT with
675    class CLASS and auxiliary data AUX. */
676 struct case_sink *
677 create_case_sink (const struct case_sink_class *class,
678                   const struct dictionary *dict,
679                   void *aux) 
680 {
681   struct case_sink *sink = xmalloc (sizeof *sink);
682   sink->class = class;
683   sink->value_cnt = dict_get_compacted_value_cnt (dict);
684   sink->aux = aux;
685   return sink;
686 }
687
688 /* Destroys case sink SINK.  */
689 void
690 free_case_sink (struct case_sink *sink) 
691 {
692   if (sink != NULL) 
693     {
694       if (sink->class->destroy != NULL)
695         sink->class->destroy (sink);
696       free (sink); 
697     }
698 }
699 \f
700 /* Represents auxiliary data for handling SPLIT FILE. */
701 struct split_aux_data 
702   {
703     size_t case_count;          /* Number of cases so far. */
704     struct ccase prev_case;     /* Data in previous case. */
705
706     /* Functions to call... */
707     void (*begin_func) (void *);               /* ...before data. */
708     int (*proc_func) (struct ccase *, void *); /* ...with data. */
709     void (*end_func) (void *);                 /* ...after data. */
710     void *func_aux;                            /* Auxiliary data. */ 
711   };
712
713 static int equal_splits (const struct ccase *, const struct ccase *);
714 static int procedure_with_splits_callback (struct ccase *, void *);
715 static void dump_splits (struct ccase *);
716
717 /* Like procedure(), but it automatically breaks the case stream
718    into SPLIT FILE break groups.  Before each group of cases with
719    identical SPLIT FILE variable values, BEGIN_FUNC is called.
720    Then PROC_FUNC is called with each case in the group.  
721    END_FUNC is called when the group is finished.  FUNC_AUX is
722    passed to each of the functions as auxiliary data.
723
724    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
725    and END_FUNC will be called at all. 
726
727    If SPLIT FILE is not in effect, then there is one break group
728    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
729    will be called once. */
730 void
731 procedure_with_splits (void (*begin_func) (void *aux),
732                        int (*proc_func) (struct ccase *, void *aux),
733                        void (*end_func) (void *aux),
734                        void *func_aux) 
735 {
736   struct split_aux_data split_aux;
737
738   split_aux.case_count = 0;
739   case_nullify (&split_aux.prev_case);
740   split_aux.begin_func = begin_func;
741   split_aux.proc_func = proc_func;
742   split_aux.end_func = end_func;
743   split_aux.func_aux = func_aux;
744
745   open_active_file ();
746   internal_procedure (procedure_with_splits_callback, &split_aux);
747   if (split_aux.case_count > 0 && end_func != NULL)
748     end_func (func_aux);
749   close_active_file ();
750
751   case_destroy (&split_aux.prev_case);
752 }
753
754 /* procedure() callback used by procedure_with_splits(). */
755 static int
756 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
757 {
758   struct split_aux_data *split_aux = split_aux_;
759
760   /* Start a new series if needed. */
761   if (split_aux->case_count == 0
762       || !equal_splits (c, &split_aux->prev_case))
763     {
764       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
765         split_aux->end_func (split_aux->func_aux);
766
767       dump_splits (c);
768       case_destroy (&split_aux->prev_case);
769       case_clone (&split_aux->prev_case, c);
770
771       if (split_aux->begin_func != NULL)
772         split_aux->begin_func (split_aux->func_aux);
773     }
774
775   split_aux->case_count++;
776   if (split_aux->proc_func != NULL)
777     return split_aux->proc_func (c, split_aux->func_aux);
778   else
779     return 1;
780 }
781
782 /* Compares the SPLIT FILE variables in cases A and B and returns
783    nonzero only if they differ. */
784 static int
785 equal_splits (const struct ccase *a, const struct ccase *b) 
786 {
787   return case_compare (a, b,
788                        dict_get_split_vars (default_dict),
789                        dict_get_split_cnt (default_dict)) == 0;
790 }
791
792 /* Dumps out the values of all the split variables for the case C. */
793 static void
794 dump_splits (struct ccase *c)
795 {
796   struct variable *const *split;
797   struct tab_table *t;
798   size_t split_cnt;
799   int i;
800
801   split_cnt = dict_get_split_cnt (default_dict);
802   if (split_cnt == 0)
803     return;
804
805   t = tab_create (3, split_cnt + 1, 0);
806   tab_dim (t, tab_natural_dimensions);
807   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
808   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
809   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
810   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
811   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
812   split = dict_get_split_vars (default_dict);
813   for (i = 0; i < split_cnt; i++)
814     {
815       struct variable *v = split[i];
816       char temp_buf[80];
817       const char *val_lab;
818
819       assert (v->type == NUMERIC || v->type == ALPHA);
820       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
821       
822       data_out (temp_buf, &v->print, case_data (c, v->fv));
823       
824       temp_buf[v->print.w] = 0;
825       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
826
827       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
828       if (val_lab)
829         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
830     }
831   tab_flags (t, SOMF_NO_TITLE);
832   tab_submit (t);
833 }
834 \f
835 /* Represents auxiliary data for handling SPLIT FILE in a
836    multipass procedure. */
837 struct multipass_split_aux_data 
838   {
839     struct ccase prev_case;     /* Data in previous case. */
840     struct casefile *casefile;  /* Accumulates data for a split. */
841
842     /* Function to call with the accumulated data. */
843     void (*split_func) (const struct casefile *, void *);
844     void *func_aux;                            /* Auxiliary data. */ 
845   };
846
847 static int multipass_split_callback (struct ccase *c, void *aux_);
848 static void multipass_split_output (struct multipass_split_aux_data *);
849
850 void
851 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
852                                                      void *),
853                                  void *func_aux) 
854 {
855   struct multipass_split_aux_data aux;
856
857   assert (split_func != NULL);
858
859   open_active_file ();
860
861   case_nullify (&aux.prev_case);
862   aux.casefile = NULL;
863   aux.split_func = split_func;
864   aux.func_aux = func_aux;
865
866   internal_procedure (multipass_split_callback, &aux);
867   if (aux.casefile != NULL)
868     multipass_split_output (&aux);
869   case_destroy (&aux.prev_case);
870
871   close_active_file ();
872 }
873
874 /* procedure() callback used by multipass_procedure_with_splits(). */
875 static int
876 multipass_split_callback (struct ccase *c, void *aux_)
877 {
878   struct multipass_split_aux_data *aux = aux_;
879
880   /* Start a new series if needed. */
881   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
882     {
883       /* Pass any cases to split_func. */
884       if (aux->casefile != NULL)
885         multipass_split_output (aux);
886
887       /* Start a new casefile. */
888       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
889
890       /* Record split values. */
891       dump_splits (c);
892       case_destroy (&aux->prev_case);
893       case_clone (&aux->prev_case, c);
894     }
895
896   casefile_append (aux->casefile, c);
897
898   return 1;
899 }
900
901 static void
902 multipass_split_output (struct multipass_split_aux_data *aux)
903 {
904   assert (aux->casefile != NULL);
905   aux->split_func (aux->casefile, aux->func_aux);
906   casefile_destroy (aux->casefile);
907   aux->casefile = NULL;
908 }