3b403995e70ea0ce54ec6a39067c9a818010a12a
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "dictionary.h"
34 #include "do-ifP.h"
35 #include "error.h"
36 #include "expressions/public.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "som.h"
40 #include "str.h"
41 #include "tab.h"
42 #include "var.h"
43 #include "value-labels.h"
44
45 /*
46    Virtual File Manager (vfm):
47
48    vfm is used to process data files.  It uses the model that
49    data is read from one stream (the data source), processed,
50    then written to another (the data sink).  The data source is
51    then deleted and the data sink becomes the data source for the
52    next procedure. */
53
54 /* Procedure execution data. */
55 struct write_case_data
56   {
57     /* Function to call for each case. */
58     int (*proc_func) (struct ccase *, void *); /* Function. */
59     void *aux;                                 /* Auxiliary data. */ 
60
61     struct ccase trns_case;     /* Case used for transformations. */
62     struct ccase sink_case;     /* Case written to sink, if
63                                    compaction is necessary. */
64     size_t cases_written;       /* Cases output so far. */
65     size_t cases_analyzed;      /* Cases passed to procedure so far. */
66   };
67
68 /* The current active file, from which cases are read. */
69 struct case_source *vfm_source;
70
71 /* The replacement active file, to which cases are written. */
72 struct case_sink *vfm_sink;
73
74 /* Nonzero if the case needs to have values deleted before being
75    stored, zero otherwise. */
76 static int compaction_necessary;
77
78 /* Time at which vfm was last invoked. */
79 time_t last_vfm_invocation;
80
81 /* Lag queue. */
82 int n_lag;                      /* Number of cases to lag. */
83 static int lag_count;           /* Number of cases in lag_queue so far. */
84 static int lag_head;            /* Index where next case will be added. */
85 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
86
87 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
88                                 void *aux);
89 static void create_trns_case (struct ccase *, struct dictionary *);
90 static void open_active_file (void);
91 static int write_case (struct write_case_data *wc_data);
92 static int execute_transformations (struct ccase *c,
93                                     struct trns_header **trns,
94                                     int first_idx, int last_idx,
95                                     int case_num);
96 static int filter_case (const struct ccase *c, int case_num);
97 static void lag_case (const struct ccase *c);
98 static void clear_case (struct ccase *c);
99 static void close_active_file (void);
100 \f
101 /* Public functions. */
102
103 /* Reads the data from the input program and writes it to a new
104    active file.  For each case we read from the input program, we
105    do the following
106
107    1. Execute permanent transformations.  If these drop the case,
108       start the next case from step 1.
109
110    2. N OF CASES.  If we have already written N cases, start the
111       next case from step 1.
112    
113    3. Write case to replacement active file.
114    
115    4. Execute temporary transformations.  If these drop the case,
116       start the next case from step 1.
117       
118    5. FILTER, PROCESS IF.  If these drop the case, start the next
119       case from step 1.
120    
121    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
122       cases, start the next case from step 1.
123       
124    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
125 void
126 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
127 {
128   if (proc_func == NULL
129       && case_source_is_class (vfm_source, &storage_source_class)
130       && vfm_sink == NULL
131       && !temporary
132       && n_trns == 0)
133     {
134       /* Nothing to do. */
135       return;
136     }
137
138   open_active_file ();
139   internal_procedure (proc_func, aux);
140   close_active_file ();
141 }
142
143 /* Executes a procedure, as procedure(), except that the caller
144    is responsible for calling open_active_file() and
145    close_active_file(). */
146 static void
147 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
148 {
149   static int recursive_call;
150
151   struct write_case_data wc_data;
152
153   assert (++recursive_call == 1);
154
155   wc_data.proc_func = proc_func;
156   wc_data.aux = aux;
157   create_trns_case (&wc_data.trns_case, default_dict);
158   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
159   wc_data.cases_written = 0;
160
161   last_vfm_invocation = time (NULL);
162
163   if (vfm_source != NULL) 
164     vfm_source->class->read (vfm_source,
165                              &wc_data.trns_case,
166                              write_case, &wc_data);
167
168   case_destroy (&wc_data.sink_case);
169   case_destroy (&wc_data.trns_case);
170
171   assert (--recursive_call == 0);
172 }
173
174 /* Creates and returns a case, initializing it from the vectors
175    that say which `value's need to be initialized just once, and
176    which ones need to be re-initialized before every case. */
177 static void
178 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
179 {
180   size_t var_cnt = dict_get_var_cnt (dict);
181   size_t i;
182
183   case_create (trns_case, dict_get_next_value_idx (dict));
184   for (i = 0; i < var_cnt; i++) 
185     {
186       struct variable *v = dict_get_var (dict, i);
187       union value *value = case_data_rw (trns_case, v->fv);
188
189       if (v->type == NUMERIC)
190         value->f = v->reinit ? 0.0 : SYSMIS;
191       else
192         memset (value->s, ' ', v->width);
193     }
194 }
195
196 /* Makes all preparations for reading from the data source and writing
197    to the data sink. */
198 static void
199 open_active_file (void)
200 {
201   /* Make temp_dict refer to the dictionary right before data
202      reaches the sink */
203   if (!temporary)
204     {
205       temp_trns = n_trns;
206       temp_dict = default_dict;
207     }
208
209   /* Figure out compaction. */
210   compaction_necessary = (dict_get_next_value_idx (temp_dict)
211                           != dict_get_compacted_value_cnt (temp_dict));
212
213   /* Prepare sink. */
214   if (vfm_sink == NULL)
215     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
216   if (vfm_sink->class->open != NULL)
217     vfm_sink->class->open (vfm_sink);
218
219   /* Allocate memory for lag queue. */
220   if (n_lag > 0)
221     {
222       int i;
223   
224       lag_count = 0;
225       lag_head = 0;
226       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
227       for (i = 0; i < n_lag; i++)
228         case_nullify (&lag_queue[i]);
229     }
230
231   /* Close any unclosed DO IF or LOOP constructs. */
232   discard_ctl_stack ();
233 }
234
235 /* Transforms trns_case and writes it to the replacement active
236    file if advisable.  Returns nonzero if more cases can be
237    accepted, zero otherwise.  Do not call this function again
238    after it has returned zero once.  */
239 static int
240 write_case (struct write_case_data *wc_data)
241 {
242   /* Execute permanent transformations.  */
243   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
244                                 wc_data->cases_written + 1))
245     goto done;
246
247   /* N OF CASES. */
248   if (dict_get_case_limit (default_dict)
249       && wc_data->cases_written >= dict_get_case_limit (default_dict))
250     goto done;
251   wc_data->cases_written++;
252
253   /* Write case to LAG queue. */
254   if (n_lag)
255     lag_case (&wc_data->trns_case);
256
257   /* Write case to replacement active file. */
258   if (vfm_sink->class->write != NULL) 
259     {
260       if (compaction_necessary) 
261         {
262           dict_compact_case (temp_dict, &wc_data->sink_case,
263                              &wc_data->trns_case);
264           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
265         }
266       else
267         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
268     }
269   
270   /* Execute temporary transformations. */
271   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
272                                 wc_data->cases_written))
273     goto done;
274   
275   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
276   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
277       || (dict_get_case_limit (temp_dict)
278           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
279     goto done;
280   wc_data->cases_analyzed++;
281
282   /* Pass case to procedure. */
283   if (wc_data->proc_func != NULL)
284     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
285
286  done:
287   clear_case (&wc_data->trns_case);
288   return 1;
289 }
290
291 /* Transforms case C using the transformations in TRNS[] with
292    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
293    become case CASE_NUM (1-based) in the output file.  Returns
294    zero if the case was filtered out by one of the
295    transformations, nonzero otherwise. */
296 static int
297 execute_transformations (struct ccase *c,
298                          struct trns_header **trns,
299                          int first_idx, int last_idx,
300                          int case_num) 
301 {
302   int idx;
303
304   for (idx = first_idx; idx != last_idx; )
305     {
306       int retval = trns[idx]->proc (trns[idx], c, case_num);
307       switch (retval)
308         {
309         case -1:
310           idx++;
311           break;
312           
313         case -2:
314           return 0;
315           
316         default:
317           idx = retval;
318           break;
319         }
320     }
321
322   return 1;
323 }
324
325 /* Returns nonzero if case C with case number CASE_NUM should be
326    exclude as specified on FILTER or PROCESS IF, otherwise
327    zero. */
328 static int
329 filter_case (const struct ccase *c, int case_idx)
330 {
331   /* FILTER. */
332   struct variable *filter_var = dict_get_filter (default_dict);
333   if (filter_var != NULL) 
334     {
335       double f = case_num (c, filter_var->fv);
336       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
337         return 1;
338     }
339
340   /* PROCESS IF. */
341   if (process_if_expr != NULL
342       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
343     return 1;
344
345   return 0;
346 }
347
348 /* Add C to the lag queue. */
349 static void
350 lag_case (const struct ccase *c)
351 {
352   if (lag_count < n_lag)
353     lag_count++;
354   case_destroy (&lag_queue[lag_head]);
355   case_clone (&lag_queue[lag_head], c);
356   if (++lag_head >= n_lag)
357     lag_head = 0;
358 }
359
360 /* Clears the variables in C that need to be cleared between
361    processing cases.  */
362 static void
363 clear_case (struct ccase *c)
364 {
365   size_t var_cnt = dict_get_var_cnt (default_dict);
366   size_t i;
367   
368   for (i = 0; i < var_cnt; i++) 
369     {
370       struct variable *v = dict_get_var (default_dict, i);
371       if (v->init && v->reinit) 
372         {
373           if (v->type == NUMERIC)
374             case_data_rw (c, v->fv)->f = SYSMIS;
375           else
376             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
377         } 
378     }
379 }
380
381 /* Closes the active file. */
382 static void
383 close_active_file (void)
384 {
385   /* Free memory for lag queue, and turn off lagging. */
386   if (n_lag > 0)
387     {
388       int i;
389       
390       for (i = 0; i < n_lag; i++)
391         case_destroy (&lag_queue[i]);
392       free (lag_queue);
393       n_lag = 0;
394     }
395   
396   /* Dictionary from before TEMPORARY becomes permanent.. */
397   if (temporary)
398     {
399       dict_destroy (default_dict);
400       default_dict = temp_dict;
401       temp_dict = NULL;
402     }
403
404   /* Finish compaction. */
405   if (compaction_necessary)
406     dict_compact_values (default_dict);
407     
408   /* Free data source. */
409   free_case_source (vfm_source);
410   vfm_source = NULL;
411
412   /* Old data sink becomes new data source. */
413   if (vfm_sink->class->make_source != NULL)
414     vfm_source = vfm_sink->class->make_source (vfm_sink);
415   free_case_sink (vfm_sink);
416   vfm_sink = NULL;
417
418   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
419      and get rid of all the transformations. */
420   cancel_temporary ();
421   expr_free (process_if_expr);
422   process_if_expr = NULL;
423   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
424     dict_set_filter (default_dict, NULL);
425   dict_set_case_limit (default_dict, 0);
426   dict_clear_vectors (default_dict);
427   cancel_transformations ();
428 }
429 \f
430 /* Storage case stream. */
431
432 /* Information about storage sink or source. */
433 struct storage_stream_info 
434   {
435     struct casefile *casefile;  /* Storage. */
436   };
437
438 /* Initializes a storage sink. */
439 static void
440 storage_sink_open (struct case_sink *sink)
441 {
442   struct storage_stream_info *info;
443
444   sink->aux = info = xmalloc (sizeof *info);
445   info->casefile = casefile_create (sink->value_cnt);
446 }
447
448 /* Destroys storage stream represented by INFO. */
449 static void
450 destroy_storage_stream_info (struct storage_stream_info *info) 
451 {
452   if (info != NULL) 
453     {
454       casefile_destroy (info->casefile);
455       free (info); 
456     }
457 }
458
459 /* Writes case C to the storage sink SINK. */
460 static void
461 storage_sink_write (struct case_sink *sink, const struct ccase *c)
462 {
463   struct storage_stream_info *info = sink->aux;
464
465   casefile_append (info->casefile, c);
466 }
467
468 /* Destroys internal data in SINK. */
469 static void
470 storage_sink_destroy (struct case_sink *sink)
471 {
472   destroy_storage_stream_info (sink->aux);
473 }
474
475 /* Closes the sink and returns a storage source to read back the
476    written data. */
477 static struct case_source *
478 storage_sink_make_source (struct case_sink *sink) 
479 {
480   struct case_source *source
481     = create_case_source (&storage_source_class, sink->aux);
482   sink->aux = NULL;
483   return source;
484 }
485
486 /* Storage sink. */
487 const struct case_sink_class storage_sink_class = 
488   {
489     "storage",
490     storage_sink_open,
491     storage_sink_write,
492     storage_sink_destroy,
493     storage_sink_make_source,
494   };
495 \f
496 /* Storage source. */
497
498 /* Returns the number of cases that will be read by
499    storage_source_read(). */
500 static int
501 storage_source_count (const struct case_source *source) 
502 {
503   struct storage_stream_info *info = source->aux;
504
505   return casefile_get_case_cnt (info->casefile);
506 }
507
508 /* Reads all cases from the storage source and passes them one by one to
509    write_case(). */
510 static void
511 storage_source_read (struct case_source *source,
512                      struct ccase *output_case,
513                      write_case_func *write_case, write_case_data wc_data)
514 {
515   struct storage_stream_info *info = source->aux;
516   struct ccase casefile_case;
517   struct casereader *reader;
518
519   for (reader = casefile_get_reader (info->casefile);
520        casereader_read (reader, &casefile_case);
521        case_destroy (&casefile_case))
522     {
523       case_copy (output_case, 0,
524                  &casefile_case, 0,
525                  casefile_get_value_cnt (info->casefile));
526       write_case (wc_data);
527     }
528   casereader_destroy (reader);
529 }
530
531 /* Destroys the source's internal data. */
532 static void
533 storage_source_destroy (struct case_source *source)
534 {
535   destroy_storage_stream_info (source->aux);
536 }
537
538 /* Storage source. */
539 const struct case_source_class storage_source_class = 
540   {
541     "storage",
542     storage_source_count,
543     storage_source_read,
544     storage_source_destroy,
545   };
546
547 struct casefile *
548 storage_source_get_casefile (struct case_source *source) 
549 {
550   struct storage_stream_info *info = source->aux;
551
552   assert (source->class == &storage_source_class);
553   return info->casefile;
554 }
555
556 struct case_source *
557 storage_source_create (struct casefile *cf)
558 {
559   struct storage_stream_info *info;
560
561   info = xmalloc (sizeof *info);
562   info->casefile = cf;
563
564   return create_case_source (&storage_source_class, info);
565 }
566 \f
567 /* Null sink.  Used by a few procedures that keep track of output
568    themselves and would throw away anything that the sink
569    contained anyway. */
570
571 const struct case_sink_class null_sink_class = 
572   {
573     "null",
574     NULL,
575     NULL,
576     NULL,
577     NULL,
578   };
579 \f
580 /* Returns a pointer to the lagged case from N_BEFORE cases before the
581    current one, or NULL if there haven't been that many cases yet. */
582 struct ccase *
583 lagged_case (int n_before)
584 {
585   assert (n_before >= 1 );
586   assert (n_before <= n_lag);
587
588   if (n_before <= lag_count)
589     {
590       int index = lag_head - n_before;
591       if (index < 0)
592         index += n_lag;
593       return &lag_queue[index];
594     }
595   else
596     return NULL;
597 }
598    
599 /* Appends TRNS to t_trns[], the list of all transformations to be
600    performed on data as it is read from the active file. */
601 void
602 add_transformation (struct trns_header * trns)
603 {
604   if (n_trns >= m_trns)
605     {
606       m_trns += 16;
607       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
608     }
609   t_trns[n_trns] = trns;
610   trns->index = n_trns++;
611 }
612
613 /* Cancels all active transformations, including any transformations
614    created by the input program. */
615 void
616 cancel_transformations (void)
617 {
618   int i;
619   for (i = 0; i < n_trns; i++)
620     {
621       if (t_trns[i]->free)
622         t_trns[i]->free (t_trns[i]);
623       free (t_trns[i]);
624     }
625   n_trns = f_trns = 0;
626   free (t_trns);
627   t_trns = NULL;
628   m_trns = 0;
629 }
630 \f
631 /* Creates a case source with class CLASS and auxiliary data AUX
632    and based on dictionary DICT. */
633 struct case_source *
634 create_case_source (const struct case_source_class *class,
635                     void *aux) 
636 {
637   struct case_source *source = xmalloc (sizeof *source);
638   source->class = class;
639   source->aux = aux;
640   return source;
641 }
642
643 /* Destroys case source SOURCE.  It is the caller's responsible to
644    call the source's destroy function, if any. */
645 void
646 free_case_source (struct case_source *source) 
647 {
648   if (source != NULL) 
649     {
650       if (source->class->destroy != NULL)
651         source->class->destroy (source);
652       free (source);
653     }
654 }
655
656 /* Returns nonzero if a case source is "complex". */
657 int
658 case_source_is_complex (const struct case_source *source) 
659 {
660   return source != NULL && (source->class == &input_program_source_class
661                             || source->class == &file_type_source_class);
662 }
663
664 /* Returns nonzero if CLASS is the class of SOURCE. */
665 int
666 case_source_is_class (const struct case_source *source,
667                       const struct case_source_class *class) 
668 {
669   return source != NULL && source->class == class;
670 }
671
672 /* Creates a case sink to accept cases from the given DICT with
673    class CLASS and auxiliary data AUX. */
674 struct case_sink *
675 create_case_sink (const struct case_sink_class *class,
676                   const struct dictionary *dict,
677                   void *aux) 
678 {
679   struct case_sink *sink = xmalloc (sizeof *sink);
680   sink->class = class;
681   sink->value_cnt = dict_get_compacted_value_cnt (dict);
682   sink->aux = aux;
683   return sink;
684 }
685
686 /* Destroys case sink SINK.  */
687 void
688 free_case_sink (struct case_sink *sink) 
689 {
690   if (sink != NULL) 
691     {
692       if (sink->class->destroy != NULL)
693         sink->class->destroy (sink);
694       free (sink); 
695     }
696 }
697 \f
698 /* Represents auxiliary data for handling SPLIT FILE. */
699 struct split_aux_data 
700   {
701     size_t case_count;          /* Number of cases so far. */
702     struct ccase prev_case;     /* Data in previous case. */
703
704     /* Functions to call... */
705     void (*begin_func) (void *);               /* ...before data. */
706     int (*proc_func) (struct ccase *, void *); /* ...with data. */
707     void (*end_func) (void *);                 /* ...after data. */
708     void *func_aux;                            /* Auxiliary data. */ 
709   };
710
711 static int equal_splits (const struct ccase *, const struct ccase *);
712 static int procedure_with_splits_callback (struct ccase *, void *);
713 static void dump_splits (struct ccase *);
714
715 /* Like procedure(), but it automatically breaks the case stream
716    into SPLIT FILE break groups.  Before each group of cases with
717    identical SPLIT FILE variable values, BEGIN_FUNC is called.
718    Then PROC_FUNC is called with each case in the group.  
719    END_FUNC is called when the group is finished.  FUNC_AUX is
720    passed to each of the functions as auxiliary data.
721
722    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
723    and END_FUNC will be called at all. 
724
725    If SPLIT FILE is not in effect, then there is one break group
726    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
727    will be called once. */
728 void
729 procedure_with_splits (void (*begin_func) (void *aux),
730                        int (*proc_func) (struct ccase *, void *aux),
731                        void (*end_func) (void *aux),
732                        void *func_aux) 
733 {
734   struct split_aux_data split_aux;
735
736   split_aux.case_count = 0;
737   case_nullify (&split_aux.prev_case);
738   split_aux.begin_func = begin_func;
739   split_aux.proc_func = proc_func;
740   split_aux.end_func = end_func;
741   split_aux.func_aux = func_aux;
742
743   open_active_file ();
744   internal_procedure (procedure_with_splits_callback, &split_aux);
745   if (split_aux.case_count > 0 && end_func != NULL)
746     end_func (func_aux);
747   close_active_file ();
748
749   case_destroy (&split_aux.prev_case);
750 }
751
752 /* procedure() callback used by procedure_with_splits(). */
753 static int
754 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
755 {
756   struct split_aux_data *split_aux = split_aux_;
757
758   /* Start a new series if needed. */
759   if (split_aux->case_count == 0
760       || !equal_splits (c, &split_aux->prev_case))
761     {
762       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
763         split_aux->end_func (split_aux->func_aux);
764
765       dump_splits (c);
766       case_destroy (&split_aux->prev_case);
767       case_clone (&split_aux->prev_case, c);
768
769       if (split_aux->begin_func != NULL)
770         split_aux->begin_func (split_aux->func_aux);
771     }
772
773   split_aux->case_count++;
774   if (split_aux->proc_func != NULL)
775     return split_aux->proc_func (c, split_aux->func_aux);
776   else
777     return 1;
778 }
779
780 /* Compares the SPLIT FILE variables in cases A and B and returns
781    nonzero only if they differ. */
782 static int
783 equal_splits (const struct ccase *a, const struct ccase *b) 
784 {
785   return case_compare (a, b,
786                        dict_get_split_vars (default_dict),
787                        dict_get_split_cnt (default_dict)) == 0;
788 }
789
790 /* Dumps out the values of all the split variables for the case C. */
791 static void
792 dump_splits (struct ccase *c)
793 {
794   struct variable *const *split;
795   struct tab_table *t;
796   size_t split_cnt;
797   int i;
798
799   split_cnt = dict_get_split_cnt (default_dict);
800   if (split_cnt == 0)
801     return;
802
803   t = tab_create (3, split_cnt + 1, 0);
804   tab_dim (t, tab_natural_dimensions);
805   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
806   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
807   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
808   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
809   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
810   split = dict_get_split_vars (default_dict);
811   for (i = 0; i < split_cnt; i++)
812     {
813       struct variable *v = split[i];
814       char temp_buf[80];
815       const char *val_lab;
816
817       assert (v->type == NUMERIC || v->type == ALPHA);
818       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
819       
820       data_out (temp_buf, &v->print, case_data (c, v->fv));
821       
822       temp_buf[v->print.w] = 0;
823       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
824
825       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
826       if (val_lab)
827         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
828     }
829   tab_flags (t, SOMF_NO_TITLE);
830   tab_submit (t);
831 }
832 \f
833 /* Represents auxiliary data for handling SPLIT FILE in a
834    multipass procedure. */
835 struct multipass_split_aux_data 
836   {
837     struct ccase prev_case;     /* Data in previous case. */
838     struct casefile *casefile;  /* Accumulates data for a split. */
839
840     /* Function to call with the accumulated data. */
841     void (*split_func) (const struct casefile *, void *);
842     void *func_aux;                            /* Auxiliary data. */ 
843   };
844
845 static int multipass_split_callback (struct ccase *c, void *aux_);
846 static void multipass_split_output (struct multipass_split_aux_data *);
847
848 void
849 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
850                                                      void *),
851                                  void *func_aux) 
852 {
853   struct multipass_split_aux_data aux;
854
855   assert (split_func != NULL);
856
857   open_active_file ();
858
859   case_nullify (&aux.prev_case);
860   aux.casefile = NULL;
861   aux.split_func = split_func;
862   aux.func_aux = func_aux;
863
864   internal_procedure (multipass_split_callback, &aux);
865   if (aux.casefile != NULL)
866     multipass_split_output (&aux);
867   case_destroy (&aux.prev_case);
868
869   close_active_file ();
870 }
871
872 /* procedure() callback used by multipass_procedure_with_splits(). */
873 static int
874 multipass_split_callback (struct ccase *c, void *aux_)
875 {
876   struct multipass_split_aux_data *aux = aux_;
877
878   /* Start a new series if needed. */
879   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
880     {
881       /* Pass any cases to split_func. */
882       if (aux->casefile != NULL)
883         multipass_split_output (aux);
884
885       /* Start a new casefile. */
886       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
887
888       /* Record split values. */
889       dump_splits (c);
890       case_destroy (&aux->prev_case);
891       case_clone (&aux->prev_case, c);
892     }
893
894   casefile_append (aux->casefile, c);
895
896   return 1;
897 }
898
899 static void
900 multipass_split_output (struct multipass_split_aux_data *aux)
901 {
902   assert (aux->casefile != NULL);
903   aux->split_func (aux->casefile, aux->func_aux);
904   casefile_destroy (aux->casefile);
905   aux->casefile = NULL;
906 }