Revise.
[pspp] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "dictionary.h"
34 #include "do-ifP.h"
35 #include "error.h"
36 #include "expressions/public.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "som.h"
40 #include "str.h"
41 #include "tab.h"
42 #include "var.h"
43 #include "value-labels.h"
44
45 #include "gettext.h"
46 #define _(msgid) gettext (msgid)
47
48 /*
49    Virtual File Manager (vfm):
50
51    vfm is used to process data files.  It uses the model that
52    data is read from one stream (the data source), processed,
53    then written to another (the data sink).  The data source is
54    then deleted and the data sink becomes the data source for the
55    next procedure. */
56
57 /* Procedure execution data. */
58 struct write_case_data
59   {
60     /* Function to call for each case. */
61     int (*proc_func) (struct ccase *, void *); /* Function. */
62     void *aux;                                 /* Auxiliary data. */ 
63
64     struct ccase trns_case;     /* Case used for transformations. */
65     struct ccase sink_case;     /* Case written to sink, if
66                                    compaction is necessary. */
67     size_t cases_written;       /* Cases output so far. */
68     size_t cases_analyzed;      /* Cases passed to procedure so far. */
69   };
70
71 /* The current active file, from which cases are read. */
72 struct case_source *vfm_source;
73
74 /* The replacement active file, to which cases are written. */
75 struct case_sink *vfm_sink;
76
77 /* Nonzero if the case needs to have values deleted before being
78    stored, zero otherwise. */
79 static int compaction_necessary;
80
81 /* Time at which vfm was last invoked. */
82 time_t last_vfm_invocation;
83
84 /* Lag queue. */
85 int n_lag;                      /* Number of cases to lag. */
86 static int lag_count;           /* Number of cases in lag_queue so far. */
87 static int lag_head;            /* Index where next case will be added. */
88 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
89
90 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
91                                 void *aux);
92 static void create_trns_case (struct ccase *, struct dictionary *);
93 static void open_active_file (void);
94 static int write_case (struct write_case_data *wc_data);
95 static int execute_transformations (struct ccase *c,
96                                     struct trns_header **trns,
97                                     int first_idx, int last_idx,
98                                     int case_num);
99 static int filter_case (const struct ccase *c, int case_num);
100 static void lag_case (const struct ccase *c);
101 static void clear_case (struct ccase *c);
102 static void close_active_file (void);
103 \f
104 /* Public functions. */
105
106 /* Reads the data from the input program and writes it to a new
107    active file.  For each case we read from the input program, we
108    do the following
109
110    1. Execute permanent transformations.  If these drop the case,
111       start the next case from step 1.
112
113    2. N OF CASES.  If we have already written N cases, start the
114       next case from step 1.
115    
116    3. Write case to replacement active file.
117    
118    4. Execute temporary transformations.  If these drop the case,
119       start the next case from step 1.
120       
121    5. FILTER, PROCESS IF.  If these drop the case, start the next
122       case from step 1.
123    
124    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
125       cases, start the next case from step 1.
126       
127    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
128 void
129 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
130 {
131   if (proc_func == NULL
132       && case_source_is_class (vfm_source, &storage_source_class)
133       && vfm_sink == NULL
134       && !temporary
135       && n_trns == 0)
136     {
137       /* Nothing to do. */
138       return;
139     }
140
141   open_active_file ();
142   internal_procedure (proc_func, aux);
143   close_active_file ();
144 }
145
146 /* Executes a procedure, as procedure(), except that the caller
147    is responsible for calling open_active_file() and
148    close_active_file(). */
149 static void
150 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
151 {
152   static int recursive_call;
153
154   struct write_case_data wc_data;
155
156   assert (++recursive_call == 1);
157
158   wc_data.proc_func = proc_func;
159   wc_data.aux = aux;
160   create_trns_case (&wc_data.trns_case, default_dict);
161   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
162   wc_data.cases_written = 0;
163
164   last_vfm_invocation = time (NULL);
165
166   if (vfm_source != NULL) 
167     vfm_source->class->read (vfm_source,
168                              &wc_data.trns_case,
169                              write_case, &wc_data);
170
171   case_destroy (&wc_data.sink_case);
172   case_destroy (&wc_data.trns_case);
173
174   assert (--recursive_call == 0);
175 }
176
177 /* Creates and returns a case, initializing it from the vectors
178    that say which `value's need to be initialized just once, and
179    which ones need to be re-initialized before every case. */
180 static void
181 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
182 {
183   size_t var_cnt = dict_get_var_cnt (dict);
184   size_t i;
185
186   case_create (trns_case, dict_get_next_value_idx (dict));
187   for (i = 0; i < var_cnt; i++) 
188     {
189       struct variable *v = dict_get_var (dict, i);
190       union value *value = case_data_rw (trns_case, v->fv);
191
192       if (v->type == NUMERIC)
193         value->f = v->reinit ? 0.0 : SYSMIS;
194       else
195         memset (value->s, ' ', v->width);
196     }
197 }
198
199 /* Makes all preparations for reading from the data source and writing
200    to the data sink. */
201 static void
202 open_active_file (void)
203 {
204   /* Make temp_dict refer to the dictionary right before data
205      reaches the sink */
206   if (!temporary)
207     {
208       temp_trns = n_trns;
209       temp_dict = default_dict;
210     }
211
212   /* Figure out compaction. */
213   compaction_necessary = (dict_get_next_value_idx (temp_dict)
214                           != dict_get_compacted_value_cnt (temp_dict));
215
216   /* Prepare sink. */
217   if (vfm_sink == NULL)
218     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
219   if (vfm_sink->class->open != NULL)
220     vfm_sink->class->open (vfm_sink);
221
222   /* Allocate memory for lag queue. */
223   if (n_lag > 0)
224     {
225       int i;
226   
227       lag_count = 0;
228       lag_head = 0;
229       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
230       for (i = 0; i < n_lag; i++)
231         case_nullify (&lag_queue[i]);
232     }
233
234   /* Close any unclosed DO IF or LOOP constructs. */
235   discard_ctl_stack ();
236 }
237
238 /* Transforms trns_case and writes it to the replacement active
239    file if advisable.  Returns nonzero if more cases can be
240    accepted, zero otherwise.  Do not call this function again
241    after it has returned zero once.  */
242 static int
243 write_case (struct write_case_data *wc_data)
244 {
245   /* Execute permanent transformations.  */
246   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
247                                 wc_data->cases_written + 1))
248     goto done;
249
250   /* N OF CASES. */
251   if (dict_get_case_limit (default_dict)
252       && wc_data->cases_written >= dict_get_case_limit (default_dict))
253     goto done;
254   wc_data->cases_written++;
255
256   /* Write case to LAG queue. */
257   if (n_lag)
258     lag_case (&wc_data->trns_case);
259
260   /* Write case to replacement active file. */
261   if (vfm_sink->class->write != NULL) 
262     {
263       if (compaction_necessary) 
264         {
265           dict_compact_case (temp_dict, &wc_data->sink_case,
266                              &wc_data->trns_case);
267           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
268         }
269       else
270         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
271     }
272   
273   /* Execute temporary transformations. */
274   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
275                                 wc_data->cases_written))
276     goto done;
277   
278   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
279   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
280       || (dict_get_case_limit (temp_dict)
281           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
282     goto done;
283   wc_data->cases_analyzed++;
284
285   /* Pass case to procedure. */
286   if (wc_data->proc_func != NULL)
287     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
288
289  done:
290   clear_case (&wc_data->trns_case);
291   return 1;
292 }
293
294 /* Transforms case C using the transformations in TRNS[] with
295    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
296    become case CASE_NUM (1-based) in the output file.  Returns
297    zero if the case was filtered out by one of the
298    transformations, nonzero otherwise. */
299 static int
300 execute_transformations (struct ccase *c,
301                          struct trns_header **trns,
302                          int first_idx, int last_idx,
303                          int case_num) 
304 {
305   int idx;
306
307   for (idx = first_idx; idx != last_idx; )
308     {
309       int retval = trns[idx]->proc (trns[idx], c, case_num);
310       switch (retval)
311         {
312         case -1:
313           idx++;
314           break;
315           
316         case -2:
317           return 0;
318           
319         default:
320           idx = retval;
321           break;
322         }
323     }
324
325   return 1;
326 }
327
328 /* Returns nonzero if case C with case number CASE_NUM should be
329    exclude as specified on FILTER or PROCESS IF, otherwise
330    zero. */
331 static int
332 filter_case (const struct ccase *c, int case_idx)
333 {
334   /* FILTER. */
335   struct variable *filter_var = dict_get_filter (default_dict);
336   if (filter_var != NULL) 
337     {
338       double f = case_num (c, filter_var->fv);
339       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
340         return 1;
341     }
342
343   /* PROCESS IF. */
344   if (process_if_expr != NULL
345       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
346     return 1;
347
348   return 0;
349 }
350
351 /* Add C to the lag queue. */
352 static void
353 lag_case (const struct ccase *c)
354 {
355   if (lag_count < n_lag)
356     lag_count++;
357   case_destroy (&lag_queue[lag_head]);
358   case_clone (&lag_queue[lag_head], c);
359   if (++lag_head >= n_lag)
360     lag_head = 0;
361 }
362
363 /* Clears the variables in C that need to be cleared between
364    processing cases.  */
365 static void
366 clear_case (struct ccase *c)
367 {
368   size_t var_cnt = dict_get_var_cnt (default_dict);
369   size_t i;
370   
371   for (i = 0; i < var_cnt; i++) 
372     {
373       struct variable *v = dict_get_var (default_dict, i);
374       if (v->init && v->reinit) 
375         {
376           if (v->type == NUMERIC)
377             case_data_rw (c, v->fv)->f = SYSMIS;
378           else
379             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
380         } 
381     }
382 }
383
384 /* Closes the active file. */
385 static void
386 close_active_file (void)
387 {
388   /* Free memory for lag queue, and turn off lagging. */
389   if (n_lag > 0)
390     {
391       int i;
392       
393       for (i = 0; i < n_lag; i++)
394         case_destroy (&lag_queue[i]);
395       free (lag_queue);
396       n_lag = 0;
397     }
398   
399   /* Dictionary from before TEMPORARY becomes permanent.. */
400   if (temporary)
401     {
402       dict_destroy (default_dict);
403       default_dict = temp_dict;
404       temp_dict = NULL;
405     }
406
407   /* Finish compaction. */
408   if (compaction_necessary)
409     dict_compact_values (default_dict);
410     
411   /* Free data source. */
412   free_case_source (vfm_source);
413   vfm_source = NULL;
414
415   /* Old data sink becomes new data source. */
416   if (vfm_sink->class->make_source != NULL)
417     vfm_source = vfm_sink->class->make_source (vfm_sink);
418   free_case_sink (vfm_sink);
419   vfm_sink = NULL;
420
421   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
422      and get rid of all the transformations. */
423   cancel_temporary ();
424   expr_free (process_if_expr);
425   process_if_expr = NULL;
426   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
427     dict_set_filter (default_dict, NULL);
428   dict_set_case_limit (default_dict, 0);
429   dict_clear_vectors (default_dict);
430   cancel_transformations ();
431 }
432 \f
433 /* Storage case stream. */
434
435 /* Information about storage sink or source. */
436 struct storage_stream_info 
437   {
438     struct casefile *casefile;  /* Storage. */
439   };
440
441 /* Initializes a storage sink. */
442 static void
443 storage_sink_open (struct case_sink *sink)
444 {
445   struct storage_stream_info *info;
446
447   sink->aux = info = xmalloc (sizeof *info);
448   info->casefile = casefile_create (sink->value_cnt);
449 }
450
451 /* Destroys storage stream represented by INFO. */
452 static void
453 destroy_storage_stream_info (struct storage_stream_info *info) 
454 {
455   if (info != NULL) 
456     {
457       casefile_destroy (info->casefile);
458       free (info); 
459     }
460 }
461
462 /* Writes case C to the storage sink SINK. */
463 static void
464 storage_sink_write (struct case_sink *sink, const struct ccase *c)
465 {
466   struct storage_stream_info *info = sink->aux;
467
468   casefile_append (info->casefile, c);
469 }
470
471 /* Destroys internal data in SINK. */
472 static void
473 storage_sink_destroy (struct case_sink *sink)
474 {
475   destroy_storage_stream_info (sink->aux);
476 }
477
478 /* Closes the sink and returns a storage source to read back the
479    written data. */
480 static struct case_source *
481 storage_sink_make_source (struct case_sink *sink) 
482 {
483   struct case_source *source
484     = create_case_source (&storage_source_class, sink->aux);
485   sink->aux = NULL;
486   return source;
487 }
488
489 /* Storage sink. */
490 const struct case_sink_class storage_sink_class = 
491   {
492     "storage",
493     storage_sink_open,
494     storage_sink_write,
495     storage_sink_destroy,
496     storage_sink_make_source,
497   };
498 \f
499 /* Storage source. */
500
501 /* Returns the number of cases that will be read by
502    storage_source_read(). */
503 static int
504 storage_source_count (const struct case_source *source) 
505 {
506   struct storage_stream_info *info = source->aux;
507
508   return casefile_get_case_cnt (info->casefile);
509 }
510
511 /* Reads all cases from the storage source and passes them one by one to
512    write_case(). */
513 static void
514 storage_source_read (struct case_source *source,
515                      struct ccase *output_case,
516                      write_case_func *write_case, write_case_data wc_data)
517 {
518   struct storage_stream_info *info = source->aux;
519   struct ccase casefile_case;
520   struct casereader *reader;
521
522   for (reader = casefile_get_reader (info->casefile);
523        casereader_read (reader, &casefile_case);
524        case_destroy (&casefile_case))
525     {
526       case_copy (output_case, 0,
527                  &casefile_case, 0,
528                  casefile_get_value_cnt (info->casefile));
529       write_case (wc_data);
530     }
531   casereader_destroy (reader);
532 }
533
534 /* Destroys the source's internal data. */
535 static void
536 storage_source_destroy (struct case_source *source)
537 {
538   destroy_storage_stream_info (source->aux);
539 }
540
541 /* Storage source. */
542 const struct case_source_class storage_source_class = 
543   {
544     "storage",
545     storage_source_count,
546     storage_source_read,
547     storage_source_destroy,
548   };
549
550 struct casefile *
551 storage_source_get_casefile (struct case_source *source) 
552 {
553   struct storage_stream_info *info = source->aux;
554
555   assert (source->class == &storage_source_class);
556   return info->casefile;
557 }
558
559 struct case_source *
560 storage_source_create (struct casefile *cf)
561 {
562   struct storage_stream_info *info;
563
564   info = xmalloc (sizeof *info);
565   info->casefile = cf;
566
567   return create_case_source (&storage_source_class, info);
568 }
569 \f
570 /* Null sink.  Used by a few procedures that keep track of output
571    themselves and would throw away anything that the sink
572    contained anyway. */
573
574 const struct case_sink_class null_sink_class = 
575   {
576     "null",
577     NULL,
578     NULL,
579     NULL,
580     NULL,
581   };
582 \f
583 /* Returns a pointer to the lagged case from N_BEFORE cases before the
584    current one, or NULL if there haven't been that many cases yet. */
585 struct ccase *
586 lagged_case (int n_before)
587 {
588   assert (n_before >= 1 );
589   assert (n_before <= n_lag);
590
591   if (n_before <= lag_count)
592     {
593       int index = lag_head - n_before;
594       if (index < 0)
595         index += n_lag;
596       return &lag_queue[index];
597     }
598   else
599     return NULL;
600 }
601    
602 /* Appends TRNS to t_trns[], the list of all transformations to be
603    performed on data as it is read from the active file. */
604 void
605 add_transformation (struct trns_header * trns)
606 {
607   if (n_trns >= m_trns)
608     {
609       m_trns += 16;
610       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
611     }
612   t_trns[n_trns] = trns;
613   trns->index = n_trns++;
614 }
615
616 /* Cancels all active transformations, including any transformations
617    created by the input program. */
618 void
619 cancel_transformations (void)
620 {
621   int i;
622   for (i = 0; i < n_trns; i++)
623     {
624       if (t_trns[i]->free)
625         t_trns[i]->free (t_trns[i]);
626       free (t_trns[i]);
627     }
628   n_trns = f_trns = 0;
629   free (t_trns);
630   t_trns = NULL;
631   m_trns = 0;
632 }
633 \f
634 /* Creates a case source with class CLASS and auxiliary data AUX
635    and based on dictionary DICT. */
636 struct case_source *
637 create_case_source (const struct case_source_class *class,
638                     void *aux) 
639 {
640   struct case_source *source = xmalloc (sizeof *source);
641   source->class = class;
642   source->aux = aux;
643   return source;
644 }
645
646 /* Destroys case source SOURCE.  It is the caller's responsible to
647    call the source's destroy function, if any. */
648 void
649 free_case_source (struct case_source *source) 
650 {
651   if (source != NULL) 
652     {
653       if (source->class->destroy != NULL)
654         source->class->destroy (source);
655       free (source);
656     }
657 }
658
659 /* Returns nonzero if a case source is "complex". */
660 int
661 case_source_is_complex (const struct case_source *source) 
662 {
663   return source != NULL && (source->class == &input_program_source_class
664                             || source->class == &file_type_source_class);
665 }
666
667 /* Returns nonzero if CLASS is the class of SOURCE. */
668 int
669 case_source_is_class (const struct case_source *source,
670                       const struct case_source_class *class) 
671 {
672   return source != NULL && source->class == class;
673 }
674
675 /* Creates a case sink to accept cases from the given DICT with
676    class CLASS and auxiliary data AUX. */
677 struct case_sink *
678 create_case_sink (const struct case_sink_class *class,
679                   const struct dictionary *dict,
680                   void *aux) 
681 {
682   struct case_sink *sink = xmalloc (sizeof *sink);
683   sink->class = class;
684   sink->value_cnt = dict_get_compacted_value_cnt (dict);
685   sink->aux = aux;
686   return sink;
687 }
688
689 /* Destroys case sink SINK.  */
690 void
691 free_case_sink (struct case_sink *sink) 
692 {
693   if (sink != NULL) 
694     {
695       if (sink->class->destroy != NULL)
696         sink->class->destroy (sink);
697       free (sink); 
698     }
699 }
700 \f
701 /* Represents auxiliary data for handling SPLIT FILE. */
702 struct split_aux_data 
703   {
704     size_t case_count;          /* Number of cases so far. */
705     struct ccase prev_case;     /* Data in previous case. */
706
707     /* Functions to call... */
708     void (*begin_func) (void *);               /* ...before data. */
709     int (*proc_func) (struct ccase *, void *); /* ...with data. */
710     void (*end_func) (void *);                 /* ...after data. */
711     void *func_aux;                            /* Auxiliary data. */ 
712   };
713
714 static int equal_splits (const struct ccase *, const struct ccase *);
715 static int procedure_with_splits_callback (struct ccase *, void *);
716 static void dump_splits (struct ccase *);
717
718 /* Like procedure(), but it automatically breaks the case stream
719    into SPLIT FILE break groups.  Before each group of cases with
720    identical SPLIT FILE variable values, BEGIN_FUNC is called.
721    Then PROC_FUNC is called with each case in the group.  
722    END_FUNC is called when the group is finished.  FUNC_AUX is
723    passed to each of the functions as auxiliary data.
724
725    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
726    and END_FUNC will be called at all. 
727
728    If SPLIT FILE is not in effect, then there is one break group
729    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
730    will be called once. */
731 void
732 procedure_with_splits (void (*begin_func) (void *aux),
733                        int (*proc_func) (struct ccase *, void *aux),
734                        void (*end_func) (void *aux),
735                        void *func_aux) 
736 {
737   struct split_aux_data split_aux;
738
739   split_aux.case_count = 0;
740   case_nullify (&split_aux.prev_case);
741   split_aux.begin_func = begin_func;
742   split_aux.proc_func = proc_func;
743   split_aux.end_func = end_func;
744   split_aux.func_aux = func_aux;
745
746   open_active_file ();
747   internal_procedure (procedure_with_splits_callback, &split_aux);
748   if (split_aux.case_count > 0 && end_func != NULL)
749     end_func (func_aux);
750   close_active_file ();
751
752   case_destroy (&split_aux.prev_case);
753 }
754
755 /* procedure() callback used by procedure_with_splits(). */
756 static int
757 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
758 {
759   struct split_aux_data *split_aux = split_aux_;
760
761   /* Start a new series if needed. */
762   if (split_aux->case_count == 0
763       || !equal_splits (c, &split_aux->prev_case))
764     {
765       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
766         split_aux->end_func (split_aux->func_aux);
767
768       dump_splits (c);
769       case_destroy (&split_aux->prev_case);
770       case_clone (&split_aux->prev_case, c);
771
772       if (split_aux->begin_func != NULL)
773         split_aux->begin_func (split_aux->func_aux);
774     }
775
776   split_aux->case_count++;
777   if (split_aux->proc_func != NULL)
778     return split_aux->proc_func (c, split_aux->func_aux);
779   else
780     return 1;
781 }
782
783 /* Compares the SPLIT FILE variables in cases A and B and returns
784    nonzero only if they differ. */
785 static int
786 equal_splits (const struct ccase *a, const struct ccase *b) 
787 {
788   return case_compare (a, b,
789                        dict_get_split_vars (default_dict),
790                        dict_get_split_cnt (default_dict)) == 0;
791 }
792
793 /* Dumps out the values of all the split variables for the case C. */
794 static void
795 dump_splits (struct ccase *c)
796 {
797   struct variable *const *split;
798   struct tab_table *t;
799   size_t split_cnt;
800   int i;
801
802   split_cnt = dict_get_split_cnt (default_dict);
803   if (split_cnt == 0)
804     return;
805
806   t = tab_create (3, split_cnt + 1, 0);
807   tab_dim (t, tab_natural_dimensions);
808   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
809   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
810   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
811   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
812   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
813   split = dict_get_split_vars (default_dict);
814   for (i = 0; i < split_cnt; i++)
815     {
816       struct variable *v = split[i];
817       char temp_buf[80];
818       const char *val_lab;
819
820       assert (v->type == NUMERIC || v->type == ALPHA);
821       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
822       
823       data_out (temp_buf, &v->print, case_data (c, v->fv));
824       
825       temp_buf[v->print.w] = 0;
826       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
827
828       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
829       if (val_lab)
830         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
831     }
832   tab_flags (t, SOMF_NO_TITLE);
833   tab_submit (t);
834 }
835 \f
836 /* Represents auxiliary data for handling SPLIT FILE in a
837    multipass procedure. */
838 struct multipass_split_aux_data 
839   {
840     struct ccase prev_case;     /* Data in previous case. */
841     struct casefile *casefile;  /* Accumulates data for a split. */
842
843     /* Function to call with the accumulated data. */
844     void (*split_func) (const struct casefile *, void *);
845     void *func_aux;                            /* Auxiliary data. */ 
846   };
847
848 static int multipass_split_callback (struct ccase *c, void *aux_);
849 static void multipass_split_output (struct multipass_split_aux_data *);
850
851 void
852 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
853                                                      void *),
854                                  void *func_aux) 
855 {
856   struct multipass_split_aux_data aux;
857
858   assert (split_func != NULL);
859
860   open_active_file ();
861
862   case_nullify (&aux.prev_case);
863   aux.casefile = NULL;
864   aux.split_func = split_func;
865   aux.func_aux = func_aux;
866
867   internal_procedure (multipass_split_callback, &aux);
868   if (aux.casefile != NULL)
869     multipass_split_output (&aux);
870   case_destroy (&aux.prev_case);
871
872   close_active_file ();
873 }
874
875 /* procedure() callback used by multipass_procedure_with_splits(). */
876 static int
877 multipass_split_callback (struct ccase *c, void *aux_)
878 {
879   struct multipass_split_aux_data *aux = aux_;
880
881   /* Start a new series if needed. */
882   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
883     {
884       /* Pass any cases to split_func. */
885       if (aux->casefile != NULL)
886         multipass_split_output (aux);
887
888       /* Start a new casefile. */
889       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
890
891       /* Record split values. */
892       dump_splits (c);
893       case_destroy (&aux->prev_case);
894       case_clone (&aux->prev_case, c);
895     }
896
897   casefile_append (aux->casefile, c);
898
899   return 1;
900 }
901
902 static void
903 multipass_split_output (struct multipass_split_aux_data *aux)
904 {
905   assert (aux->casefile != NULL);
906   aux->split_func (aux->casefile, aux->func_aux);
907   casefile_destroy (aux->casefile);
908   aux->casefile = NULL;
909 }