49cd4a4ce7a01bccceeec16635a4a1987611beb7
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "dictionary.h"
34 #include "do-ifP.h"
35 #include "error.h"
36 #include "expressions/public.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "som.h"
40 #include "str.h"
41 #include "tab.h"
42 #include "var.h"
43 #include "value-labels.h"
44
45 /*
46    Virtual File Manager (vfm):
47
48    vfm is used to process data files.  It uses the model that
49    data is read from one stream (the data source), processed,
50    then written to another (the data sink).  The data source is
51    then deleted and the data sink becomes the data source for the
52    next procedure. */
53
54 /* Procedure execution data. */
55 struct write_case_data
56   {
57     /* Function to call for each case. */
58     int (*proc_func) (struct ccase *, void *); /* Function. */
59     void *aux;                                 /* Auxiliary data. */ 
60
61     struct ccase trns_case;     /* Case used for transformations. */
62     struct ccase sink_case;     /* Case written to sink, if
63                                    compaction is necessary. */
64     size_t cases_written;       /* Cases output so far. */
65     size_t cases_analyzed;      /* Cases passed to procedure so far. */
66   };
67
68 /* The current active file, from which cases are read. */
69 struct case_source *vfm_source;
70
71 /* The replacement active file, to which cases are written. */
72 struct case_sink *vfm_sink;
73
74 /* Nonzero if the case needs to have values deleted before being
75    stored, zero otherwise. */
76 static int compaction_necessary;
77
78 /* Time at which vfm was last invoked. */
79 time_t last_vfm_invocation;
80
81 /* Lag queue. */
82 int n_lag;                      /* Number of cases to lag. */
83 static int lag_count;           /* Number of cases in lag_queue so far. */
84 static int lag_head;            /* Index where next case will be added. */
85 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
86
87 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
88                                 void *aux);
89 static void create_trns_case (struct ccase *, struct dictionary *);
90 static void open_active_file (void);
91 static int write_case (struct write_case_data *wc_data);
92 static int execute_transformations (struct ccase *c,
93                                     struct trns_header **trns,
94                                     int first_idx, int last_idx,
95                                     int case_num);
96 static int filter_case (const struct ccase *c, int case_num);
97 static void lag_case (const struct ccase *c);
98 static void clear_case (struct ccase *c);
99 static void close_active_file (void);
100 \f
101 /* Public functions. */
102
103 /* Reads the data from the input program and writes it to a new
104    active file.  For each case we read from the input program, we
105    do the following
106
107    1. Execute permanent transformations.  If these drop the case,
108       start the next case from step 1.
109
110    2. N OF CASES.  If we have already written N cases, start the
111       next case from step 1.
112    
113    3. Write case to replacement active file.
114    
115    4. Execute temporary transformations.  If these drop the case,
116       start the next case from step 1.
117       
118    5. FILTER, PROCESS IF.  If these drop the case, start the next
119       case from step 1.
120    
121    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
122       cases, start the next case from step 1.
123       
124    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
125 void
126 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
127 {
128   if (proc_func == NULL
129       && case_source_is_class (vfm_source, &storage_source_class)
130       && vfm_sink == NULL
131       && !temporary
132       && n_trns == 0)
133     {
134       /* Nothing to do. */
135       return;
136     }
137
138   open_active_file ();
139   internal_procedure (proc_func, aux);
140   close_active_file ();
141 }
142
143 /* Executes a procedure, as procedure(), except that the caller
144    is responsible for calling open_active_file() and
145    close_active_file(). */
146 static void
147 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
148 {
149   static int recursive_call;
150
151   struct write_case_data wc_data;
152
153   assert (++recursive_call == 1);
154
155   wc_data.proc_func = proc_func;
156   wc_data.aux = aux;
157   create_trns_case (&wc_data.trns_case, default_dict);
158   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
159   wc_data.cases_written = 0;
160
161   last_vfm_invocation = time (NULL);
162
163   if (vfm_source != NULL) 
164     vfm_source->class->read (vfm_source,
165                              &wc_data.trns_case,
166                              write_case, &wc_data);
167
168   case_destroy (&wc_data.sink_case);
169   case_destroy (&wc_data.trns_case);
170
171   assert (--recursive_call == 0);
172 }
173
174 /* Creates and returns a case, initializing it from the vectors
175    that say which `value's need to be initialized just once, and
176    which ones need to be re-initialized before every case. */
177 static void
178 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
179 {
180   size_t var_cnt = dict_get_var_cnt (dict);
181   size_t i;
182
183   case_create (trns_case, dict_get_next_value_idx (dict));
184   for (i = 0; i < var_cnt; i++) 
185     {
186       struct variable *v = dict_get_var (dict, i);
187       union value *value = case_data_rw (trns_case, v->fv);
188
189       if (v->type == NUMERIC)
190         value->f = v->reinit ? 0.0 : SYSMIS;
191       else
192         memset (value->s, ' ', v->width);
193     }
194 }
195
196 /* Makes all preparations for reading from the data source and writing
197    to the data sink. */
198 static void
199 open_active_file (void)
200 {
201   /* Make temp_dict refer to the dictionary right before data
202      reaches the sink */
203   if (!temporary)
204     {
205       temp_trns = n_trns;
206       temp_dict = default_dict;
207     }
208
209   /* Figure out compaction. */
210   compaction_necessary = (dict_get_next_value_idx (temp_dict)
211                           != dict_get_compacted_value_cnt (temp_dict));
212
213   /* Prepare sink. */
214   if (vfm_sink == NULL)
215     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
216   if (vfm_sink->class->open != NULL)
217     vfm_sink->class->open (vfm_sink);
218
219   /* Allocate memory for lag queue. */
220   if (n_lag > 0)
221     {
222       int i;
223   
224       lag_count = 0;
225       lag_head = 0;
226       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
227       for (i = 0; i < n_lag; i++)
228         case_nullify (&lag_queue[i]);
229     }
230
231   /* Close any unclosed DO IF or LOOP constructs. */
232   discard_ctl_stack ();
233 }
234
235 /* Transforms trns_case and writes it to the replacement active
236    file if advisable.  Returns nonzero if more cases can be
237    accepted, zero otherwise.  Do not call this function again
238    after it has returned zero once.  */
239 static int
240 write_case (struct write_case_data *wc_data)
241 {
242   /* Execute permanent transformations.  */
243   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
244                                 wc_data->cases_written + 1))
245     goto done;
246
247   /* N OF CASES. */
248   if (dict_get_case_limit (default_dict)
249       && wc_data->cases_written >= dict_get_case_limit (default_dict))
250     goto done;
251   wc_data->cases_written++;
252
253   /* Write case to LAG queue. */
254   if (n_lag)
255     lag_case (&wc_data->trns_case);
256
257   /* Write case to replacement active file. */
258   if (vfm_sink->class->write != NULL) 
259     {
260       if (compaction_necessary) 
261         {
262           dict_compact_case (temp_dict, &wc_data->sink_case, &wc_data->trns_case);
263           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
264         }
265       else
266         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
267     }
268   
269   /* Execute temporary transformations. */
270   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
271                                 wc_data->cases_written))
272     goto done;
273   
274   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
275   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
276       || (dict_get_case_limit (temp_dict)
277           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
278     goto done;
279   wc_data->cases_analyzed++;
280
281   /* Pass case to procedure. */
282   if (wc_data->proc_func != NULL)
283     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
284
285  done:
286   clear_case (&wc_data->trns_case);
287   return 1;
288 }
289
290 /* Transforms case C using the transformations in TRNS[] with
291    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
292    become case CASE_NUM (1-based) in the output file.  Returns
293    zero if the case was filtered out by one of the
294    transformations, nonzero otherwise. */
295 static int
296 execute_transformations (struct ccase *c,
297                          struct trns_header **trns,
298                          int first_idx, int last_idx,
299                          int case_num) 
300 {
301   int idx;
302
303   for (idx = first_idx; idx != last_idx; )
304     {
305       int retval = trns[idx]->proc (trns[idx], c, case_num);
306       switch (retval)
307         {
308         case -1:
309           idx++;
310           break;
311           
312         case -2:
313           return 0;
314           
315         default:
316           idx = retval;
317           break;
318         }
319     }
320
321   return 1;
322 }
323
324 /* Returns nonzero if case C with case number CASE_NUM should be
325    exclude as specified on FILTER or PROCESS IF, otherwise
326    zero. */
327 static int
328 filter_case (const struct ccase *c, int case_idx)
329 {
330   /* FILTER. */
331   struct variable *filter_var = dict_get_filter (default_dict);
332   if (filter_var != NULL) 
333     {
334       double f = case_num (c, filter_var->fv);
335       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
336         return 1;
337     }
338
339   /* PROCESS IF. */
340   if (process_if_expr != NULL
341       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
342     return 1;
343
344   return 0;
345 }
346
347 /* Add C to the lag queue. */
348 static void
349 lag_case (const struct ccase *c)
350 {
351   if (lag_count < n_lag)
352     lag_count++;
353   case_destroy (&lag_queue[lag_head]);
354   case_clone (&lag_queue[lag_head], c);
355   if (++lag_head >= n_lag)
356     lag_head = 0;
357 }
358
359 /* Clears the variables in C that need to be cleared between
360    processing cases.  */
361 static void
362 clear_case (struct ccase *c)
363 {
364   size_t var_cnt = dict_get_var_cnt (default_dict);
365   size_t i;
366   
367   for (i = 0; i < var_cnt; i++) 
368     {
369       struct variable *v = dict_get_var (default_dict, i);
370       if (v->init && v->reinit) 
371         {
372           if (v->type == NUMERIC)
373             case_data_rw (c, v->fv)->f = SYSMIS;
374           else
375             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
376         } 
377     }
378 }
379
380 /* Closes the active file. */
381 static void
382 close_active_file (void)
383 {
384   /* Free memory for lag queue, and turn off lagging. */
385   if (n_lag > 0)
386     {
387       int i;
388       
389       for (i = 0; i < n_lag; i++)
390         case_destroy (&lag_queue[i]);
391       free (lag_queue);
392       n_lag = 0;
393     }
394   
395   /* Dictionary from before TEMPORARY becomes permanent.. */
396   if (temporary)
397     {
398       dict_destroy (default_dict);
399       default_dict = temp_dict;
400       temp_dict = NULL;
401     }
402
403   /* Finish compaction. */
404   if (compaction_necessary)
405     dict_compact_values (default_dict);
406     
407   /* Free data source. */
408   if (vfm_source != NULL) 
409     {
410       free_case_source (vfm_source);
411       vfm_source = NULL;
412     }
413
414   /* Old data sink becomes new data source. */
415   if (vfm_sink->class->make_source != NULL)
416     vfm_source = vfm_sink->class->make_source (vfm_sink);
417   free_case_sink (vfm_sink);
418   vfm_sink = NULL;
419
420   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
421      and get rid of all the transformations. */
422   cancel_temporary ();
423   expr_free (process_if_expr);
424   process_if_expr = NULL;
425   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
426     dict_set_filter (default_dict, NULL);
427   dict_set_case_limit (default_dict, 0);
428   dict_clear_vectors (default_dict);
429   cancel_transformations ();
430 }
431 \f
432 /* Storage case stream. */
433
434 /* Information about storage sink or source. */
435 struct storage_stream_info 
436   {
437     struct casefile *casefile;  /* Storage. */
438   };
439
440 /* Initializes a storage sink. */
441 static void
442 storage_sink_open (struct case_sink *sink)
443 {
444   struct storage_stream_info *info;
445
446   sink->aux = info = xmalloc (sizeof *info);
447   info->casefile = casefile_create (sink->value_cnt);
448 }
449
450 /* Destroys storage stream represented by INFO. */
451 static void
452 destroy_storage_stream_info (struct storage_stream_info *info) 
453 {
454   if (info != NULL) 
455     {
456       casefile_destroy (info->casefile);
457       free (info); 
458     }
459 }
460
461 /* Writes case C to the storage sink SINK. */
462 static void
463 storage_sink_write (struct case_sink *sink, const struct ccase *c)
464 {
465   struct storage_stream_info *info = sink->aux;
466
467   casefile_append (info->casefile, c);
468 }
469
470 /* Destroys internal data in SINK. */
471 static void
472 storage_sink_destroy (struct case_sink *sink)
473 {
474   destroy_storage_stream_info (sink->aux);
475 }
476
477 /* Closes the sink and returns a storage source to read back the
478    written data. */
479 static struct case_source *
480 storage_sink_make_source (struct case_sink *sink) 
481 {
482   struct case_source *source
483     = create_case_source (&storage_source_class, sink->aux);
484   sink->aux = NULL;
485   return source;
486 }
487
488 /* Storage sink. */
489 const struct case_sink_class storage_sink_class = 
490   {
491     "storage",
492     storage_sink_open,
493     storage_sink_write,
494     storage_sink_destroy,
495     storage_sink_make_source,
496   };
497 \f
498 /* Storage source. */
499
500 /* Returns the number of cases that will be read by
501    storage_source_read(). */
502 static int
503 storage_source_count (const struct case_source *source) 
504 {
505   struct storage_stream_info *info = source->aux;
506
507   return casefile_get_case_cnt (info->casefile);
508 }
509
510 /* Reads all cases from the storage source and passes them one by one to
511    write_case(). */
512 static void
513 storage_source_read (struct case_source *source,
514                      struct ccase *output_case,
515                      write_case_func *write_case, write_case_data wc_data)
516 {
517   struct storage_stream_info *info = source->aux;
518   struct ccase casefile_case;
519   struct casereader *reader;
520
521   for (reader = casefile_get_reader (info->casefile);
522        casereader_read (reader, &casefile_case);
523        case_destroy (&casefile_case))
524     {
525       case_copy (output_case, 0,
526                  &casefile_case, 0,
527                  casefile_get_value_cnt (info->casefile));
528       write_case (wc_data);
529     }
530   casereader_destroy (reader);
531 }
532
533 /* Destroys the source's internal data. */
534 static void
535 storage_source_destroy (struct case_source *source)
536 {
537   destroy_storage_stream_info (source->aux);
538 }
539
540 /* Storage source. */
541 const struct case_source_class storage_source_class = 
542   {
543     "storage",
544     storage_source_count,
545     storage_source_read,
546     storage_source_destroy,
547   };
548
549 struct casefile *
550 storage_source_get_casefile (struct case_source *source) 
551 {
552   struct storage_stream_info *info = source->aux;
553
554   assert (source->class == &storage_source_class);
555   return info->casefile;
556 }
557
558 struct case_source *
559 storage_source_create (struct casefile *cf)
560 {
561   struct storage_stream_info *info;
562
563   info = xmalloc (sizeof *info);
564   info->casefile = cf;
565
566   return create_case_source (&storage_source_class, info);
567 }
568 \f
569 /* Null sink.  Used by a few procedures that keep track of output
570    themselves and would throw away anything that the sink
571    contained anyway. */
572
573 const struct case_sink_class null_sink_class = 
574   {
575     "null",
576     NULL,
577     NULL,
578     NULL,
579     NULL,
580   };
581 \f
582 /* Returns a pointer to the lagged case from N_BEFORE cases before the
583    current one, or NULL if there haven't been that many cases yet. */
584 struct ccase *
585 lagged_case (int n_before)
586 {
587   assert (n_before >= 1 && n_before <= n_lag);
588   if (n_before <= lag_count)
589     {
590       int index = lag_head - n_before;
591       if (index < 0)
592         index += n_lag;
593       return &lag_queue[index];
594     }
595   else
596     return NULL;
597 }
598    
599 /* Appends TRNS to t_trns[], the list of all transformations to be
600    performed on data as it is read from the active file. */
601 void
602 add_transformation (struct trns_header * trns)
603 {
604   if (n_trns >= m_trns)
605     {
606       m_trns += 16;
607       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
608     }
609   t_trns[n_trns] = trns;
610   trns->index = n_trns++;
611 }
612
613 /* Cancels all active transformations, including any transformations
614    created by the input program. */
615 void
616 cancel_transformations (void)
617 {
618   int i;
619   for (i = 0; i < n_trns; i++)
620     {
621       if (t_trns[i]->free)
622         t_trns[i]->free (t_trns[i]);
623       free (t_trns[i]);
624     }
625   n_trns = f_trns = 0;
626   free (t_trns);
627   t_trns=NULL;
628   m_trns = 0;
629 }
630 \f
631 /* Creates a case source with class CLASS and auxiliary data AUX
632    and based on dictionary DICT. */
633 struct case_source *
634 create_case_source (const struct case_source_class *class,
635                     void *aux) 
636 {
637   struct case_source *source = xmalloc (sizeof *source);
638   source->class = class;
639   source->aux = aux;
640   return source;
641 }
642
643 /* Destroys case source SOURCE.  It is the caller's responsible to
644    call the source's destroy function, if any. */
645 void
646 free_case_source (struct case_source *source) 
647 {
648   if (source != NULL) 
649     {
650       if (source->class->destroy != NULL)
651         source->class->destroy (source);
652       free (source);
653     }
654 }
655
656 /* Returns nonzero if a case source is "complex". */
657 int
658 case_source_is_complex (const struct case_source *source) 
659 {
660   return source != NULL && (source->class == &input_program_source_class
661                             || source->class == &file_type_source_class);
662 }
663
664 /* Returns nonzero if CLASS is the class of SOURCE. */
665 int
666 case_source_is_class (const struct case_source *source,
667                       const struct case_source_class *class) 
668 {
669   return source != NULL && source->class == class;
670 }
671
672 /* Creates a case sink to accept cases from the given DICT with
673    class CLASS and auxiliary data AUX. */
674 struct case_sink *
675 create_case_sink (const struct case_sink_class *class,
676                   const struct dictionary *dict,
677                   void *aux) 
678 {
679   struct case_sink *sink = xmalloc (sizeof *sink);
680   sink->class = class;
681   sink->value_cnt = dict_get_compacted_value_cnt (dict);
682   sink->aux = aux;
683   return sink;
684 }
685
686 /* Destroys case sink SINK.  */
687 void
688 free_case_sink (struct case_sink *sink) 
689 {
690   if (sink != NULL) 
691     {
692       if (sink->class->destroy != NULL)
693         sink->class->destroy (sink);
694       free (sink); 
695     }
696 }
697 \f
698 /* Represents auxiliary data for handling SPLIT FILE. */
699 struct split_aux_data 
700   {
701     size_t case_count;          /* Number of cases so far. */
702     struct ccase prev_case;     /* Data in previous case. */
703
704     /* Functions to call... */
705     void (*begin_func) (void *);               /* ...before data. */
706     int (*proc_func) (struct ccase *, void *); /* ...with data. */
707     void (*end_func) (void *);                 /* ...after data. */
708     void *func_aux;                            /* Auxiliary data. */ 
709   };
710
711 static int equal_splits (const struct ccase *, const struct ccase *);
712 static int procedure_with_splits_callback (struct ccase *, void *);
713 static void dump_splits (struct ccase *);
714
715 /* Like procedure(), but it automatically breaks the case stream
716    into SPLIT FILE break groups.  Before each group of cases with
717    identical SPLIT FILE variable values, BEGIN_FUNC is called.
718    Then PROC_FUNC is called with each case in the group.  
719    END_FUNC is called when the group is finished.  FUNC_AUX is
720    passed to each of the functions as auxiliary data.
721
722    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
723    and END_FUNC will be called at all. 
724
725    If SPLIT FILE is not in effect, then there is one break group
726    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
727    will be called once. */
728 void
729 procedure_with_splits (void (*begin_func) (void *aux),
730                        int (*proc_func) (struct ccase *, void *aux),
731                        void (*end_func) (void *aux),
732                        void *func_aux) 
733 {
734   struct split_aux_data split_aux;
735
736   split_aux.case_count = 0;
737   case_nullify (&split_aux.prev_case);
738   split_aux.begin_func = begin_func;
739   split_aux.proc_func = proc_func;
740   split_aux.end_func = end_func;
741   split_aux.func_aux = func_aux;
742
743   open_active_file ();
744   internal_procedure (procedure_with_splits_callback, &split_aux);
745   if (split_aux.case_count > 0 && end_func != NULL)
746     end_func (func_aux);
747   close_active_file ();
748
749   case_destroy (&split_aux.prev_case);
750 }
751
752 /* procedure() callback used by procedure_with_splits(). */
753 static int
754 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
755 {
756   struct split_aux_data *split_aux = split_aux_;
757
758   /* Start a new series if needed. */
759   if (split_aux->case_count == 0
760       || !equal_splits (c, &split_aux->prev_case))
761     {
762       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
763         split_aux->end_func (split_aux->func_aux);
764
765       dump_splits (c);
766       case_destroy (&split_aux->prev_case);
767       case_clone (&split_aux->prev_case, c);
768
769       if (split_aux->begin_func != NULL)
770         split_aux->begin_func (split_aux->func_aux);
771     }
772
773   split_aux->case_count++;
774   if (split_aux->proc_func != NULL)
775     return split_aux->proc_func (c, split_aux->func_aux);
776   else
777     return 1;
778 }
779
780 /* Compares the SPLIT FILE variables in cases A and B and returns
781    nonzero only if they differ. */
782 static int
783 equal_splits (const struct ccase *a, const struct ccase *b) 
784 {
785   return case_compare (a, b,
786                        dict_get_split_vars (default_dict),
787                        dict_get_split_cnt (default_dict)) == 0;
788 }
789
790 /* Dumps out the values of all the split variables for the case C. */
791 static void
792 dump_splits (struct ccase *c)
793 {
794   struct variable *const *split;
795   struct tab_table *t;
796   size_t split_cnt;
797   int i;
798
799   split_cnt = dict_get_split_cnt (default_dict);
800   if (split_cnt == 0)
801     return;
802
803   t = tab_create (3, split_cnt + 1, 0);
804   tab_dim (t, tab_natural_dimensions);
805   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
806   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
807   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
808   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
809   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
810   split = dict_get_split_vars (default_dict);
811   for (i = 0; i < split_cnt; i++)
812     {
813       struct variable *v = split[i];
814       char temp_buf[80];
815       const char *val_lab;
816
817       assert (v->type == NUMERIC || v->type == ALPHA);
818       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
819       
820       data_out (temp_buf, &v->print, case_data (c, v->fv));
821       
822       temp_buf[v->print.w] = 0;
823       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
824
825       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
826       if (val_lab)
827         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
828     }
829   tab_flags (t, SOMF_NO_TITLE);
830   tab_submit (t);
831 }
832 \f
833 /* Represents auxiliary data for handling SPLIT FILE in a
834    multipass procedure. */
835 struct multipass_split_aux_data 
836   {
837     struct ccase prev_case;     /* Data in previous case. */
838     struct casefile *casefile;  /* Accumulates data for a split. */
839
840     /* Function to call with the accumulated data. */
841     void (*split_func) (const struct casefile *, void *);
842     void *func_aux;                            /* Auxiliary data. */ 
843   };
844
845 static int multipass_split_callback (struct ccase *c, void *aux_);
846 static void multipass_split_output (struct multipass_split_aux_data *);
847
848 void
849 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
850                                                      void *),
851                                  void *func_aux) 
852 {
853   struct multipass_split_aux_data aux;
854
855   assert (split_func != NULL);
856
857   open_active_file ();
858
859   case_nullify (&aux.prev_case);
860   aux.casefile = NULL;
861   aux.split_func = split_func;
862   aux.func_aux = func_aux;
863
864   internal_procedure (multipass_split_callback, &aux);
865   if (aux.casefile != NULL)
866     multipass_split_output (&aux);
867   case_destroy (&aux.prev_case);
868
869   close_active_file ();
870 }
871
872 /* procedure() callback used by multipass_procedure_with_splits(). */
873 static int
874 multipass_split_callback (struct ccase *c, void *aux_)
875 {
876   struct multipass_split_aux_data *aux = aux_;
877
878   /* Start a new series if needed. */
879   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
880     {
881       /* Pass any cases to split_func. */
882       if (aux->casefile != NULL)
883         multipass_split_output (aux);
884
885       /* Start a new casefile. */
886       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
887
888       /* Record split values. */
889       dump_splits (c);
890       case_destroy (&aux->prev_case);
891       case_clone (&aux->prev_case, c);
892     }
893
894   casefile_append (aux->casefile, c);
895
896   return 1;
897 }
898
899 static void
900 multipass_split_output (struct multipass_split_aux_data *aux)
901 {
902   assert (aux->casefile != NULL);
903   aux->split_func (aux->casefile, aux->func_aux);
904   casefile_destroy (aux->casefile);
905   aux->casefile = NULL;
906 }