Did some more work on bug 12859 and then realized that a *good*
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "dictionary.h"
34 #include "do-ifP.h"
35 #include "error.h"
36 #include "expressions/public.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "som.h"
40 #include "str.h"
41 #include "tab.h"
42 #include "var.h"
43 #include "value-labels.h"
44
45 /*
46    Virtual File Manager (vfm):
47
48    vfm is used to process data files.  It uses the model that
49    data is read from one stream (the data source), processed,
50    then written to another (the data sink).  The data source is
51    then deleted and the data sink becomes the data source for the
52    next procedure. */
53
54 /* Procedure execution data. */
55 struct write_case_data
56   {
57     /* Function to call for each case. */
58     int (*proc_func) (struct ccase *, void *); /* Function. */
59     void *aux;                                 /* Auxiliary data. */ 
60
61     struct ccase trns_case;     /* Case used for transformations. */
62     struct ccase sink_case;     /* Case written to sink, if
63                                    compaction is necessary. */
64     size_t cases_written;       /* Cases output so far. */
65     size_t cases_analyzed;      /* Cases passed to procedure so far. */
66   };
67
68 /* The current active file, from which cases are read. */
69 struct case_source *vfm_source;
70
71 /* The replacement active file, to which cases are written. */
72 struct case_sink *vfm_sink;
73
74 /* Nonzero if the case needs to have values deleted before being
75    stored, zero otherwise. */
76 static int compaction_necessary;
77
78 /* Time at which vfm was last invoked. */
79 time_t last_vfm_invocation;
80
81 /* Lag queue. */
82 int n_lag;                      /* Number of cases to lag. */
83 static int lag_count;           /* Number of cases in lag_queue so far. */
84 static int lag_head;            /* Index where next case will be added. */
85 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
86
87 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
88                                 void *aux);
89 static void create_trns_case (struct ccase *, struct dictionary *);
90 static void open_active_file (void);
91 static int write_case (struct write_case_data *wc_data);
92 static int execute_transformations (struct ccase *c,
93                                     struct trns_header **trns,
94                                     int first_idx, int last_idx,
95                                     int case_num);
96 static int filter_case (const struct ccase *c, int case_num);
97 static void lag_case (const struct ccase *c);
98 static void clear_case (struct ccase *c);
99 static void close_active_file (void);
100 \f
101 /* Public functions. */
102
103 /* Reads the data from the input program and writes it to a new
104    active file.  For each case we read from the input program, we
105    do the following
106
107    1. Execute permanent transformations.  If these drop the case,
108       start the next case from step 1.
109
110    2. N OF CASES.  If we have already written N cases, start the
111       next case from step 1.
112    
113    3. Write case to replacement active file.
114    
115    4. Execute temporary transformations.  If these drop the case,
116       start the next case from step 1.
117       
118    5. FILTER, PROCESS IF.  If these drop the case, start the next
119       case from step 1.
120    
121    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
122       cases, start the next case from step 1.
123       
124    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
125 void
126 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
127 {
128   if (proc_func == NULL
129       && case_source_is_class (vfm_source, &storage_source_class)
130       && vfm_sink == NULL
131       && !temporary
132       && n_trns == 0)
133     {
134       /* Nothing to do. */
135       return;
136     }
137
138   open_active_file ();
139   internal_procedure (proc_func, aux);
140   close_active_file ();
141 }
142
143 /* Executes a procedure, as procedure(), except that the caller
144    is responsible for calling open_active_file() and
145    close_active_file(). */
146 static void
147 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
148 {
149   static int recursive_call;
150
151   struct write_case_data wc_data;
152
153   assert (++recursive_call == 1);
154
155   wc_data.proc_func = proc_func;
156   wc_data.aux = aux;
157   create_trns_case (&wc_data.trns_case, default_dict);
158   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
159   wc_data.cases_written = 0;
160
161   last_vfm_invocation = time (NULL);
162
163   if (vfm_source != NULL) 
164     vfm_source->class->read (vfm_source,
165                              &wc_data.trns_case,
166                              write_case, &wc_data);
167
168   case_destroy (&wc_data.sink_case);
169   case_destroy (&wc_data.trns_case);
170
171   assert (--recursive_call == 0);
172 }
173
174 /* Creates and returns a case, initializing it from the vectors
175    that say which `value's need to be initialized just once, and
176    which ones need to be re-initialized before every case. */
177 static void
178 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
179 {
180   size_t var_cnt = dict_get_var_cnt (dict);
181   size_t i;
182
183   case_create (trns_case, dict_get_next_value_idx (dict));
184   for (i = 0; i < var_cnt; i++) 
185     {
186       struct variable *v = dict_get_var (dict, i);
187       union value *value = case_data_rw (trns_case, v->fv);
188
189       if (v->type == NUMERIC)
190         value->f = v->reinit ? 0.0 : SYSMIS;
191       else
192         memset (value->s, ' ', v->width);
193     }
194 }
195
196 /* Makes all preparations for reading from the data source and writing
197    to the data sink. */
198 static void
199 open_active_file (void)
200 {
201   /* Make temp_dict refer to the dictionary right before data
202      reaches the sink */
203   if (!temporary)
204     {
205       temp_trns = n_trns;
206       temp_dict = default_dict;
207     }
208
209   /* Figure out compaction. */
210   compaction_necessary = (dict_get_next_value_idx (temp_dict)
211                           != dict_get_compacted_value_cnt (temp_dict));
212
213   /* Prepare sink. */
214   if (vfm_sink == NULL)
215     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
216   if (vfm_sink->class->open != NULL)
217     vfm_sink->class->open (vfm_sink);
218
219   /* Allocate memory for lag queue. */
220   if (n_lag > 0)
221     {
222       int i;
223   
224       lag_count = 0;
225       lag_head = 0;
226       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
227       for (i = 0; i < n_lag; i++)
228         case_nullify (&lag_queue[i]);
229     }
230
231   /* Close any unclosed DO IF or LOOP constructs. */
232   discard_ctl_stack ();
233 }
234
235 /* Transforms trns_case and writes it to the replacement active
236    file if advisable.  Returns nonzero if more cases can be
237    accepted, zero otherwise.  Do not call this function again
238    after it has returned zero once.  */
239 static int
240 write_case (struct write_case_data *wc_data)
241 {
242   /* Execute permanent transformations.  */
243   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
244                                 wc_data->cases_written + 1))
245     goto done;
246
247   /* N OF CASES. */
248   if (dict_get_case_limit (default_dict)
249       && wc_data->cases_written >= dict_get_case_limit (default_dict))
250     goto done;
251   wc_data->cases_written++;
252
253   /* Write case to LAG queue. */
254   if (n_lag)
255     lag_case (&wc_data->trns_case);
256
257   /* Write case to replacement active file. */
258   if (vfm_sink->class->write != NULL) 
259     {
260       if (compaction_necessary) 
261         {
262           dict_compact_case (temp_dict, &wc_data->sink_case,
263                              &wc_data->trns_case);
264           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
265         }
266       else
267         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
268     }
269   
270   /* Execute temporary transformations. */
271   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
272                                 wc_data->cases_written))
273     goto done;
274   
275   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
276   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
277       || (dict_get_case_limit (temp_dict)
278           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
279     goto done;
280   wc_data->cases_analyzed++;
281
282   /* Pass case to procedure. */
283   if (wc_data->proc_func != NULL)
284     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
285
286  done:
287   clear_case (&wc_data->trns_case);
288   return 1;
289 }
290
291 /* Transforms case C using the transformations in TRNS[] with
292    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
293    become case CASE_NUM (1-based) in the output file.  Returns
294    zero if the case was filtered out by one of the
295    transformations, nonzero otherwise. */
296 static int
297 execute_transformations (struct ccase *c,
298                          struct trns_header **trns,
299                          int first_idx, int last_idx,
300                          int case_num) 
301 {
302   int idx;
303
304   for (idx = first_idx; idx != last_idx; )
305     {
306       int retval = trns[idx]->proc (trns[idx], c, case_num);
307       switch (retval)
308         {
309         case -1:
310           idx++;
311           break;
312           
313         case -2:
314           return 0;
315           
316         default:
317           idx = retval;
318           break;
319         }
320     }
321
322   return 1;
323 }
324
325 /* Returns nonzero if case C with case number CASE_NUM should be
326    exclude as specified on FILTER or PROCESS IF, otherwise
327    zero. */
328 static int
329 filter_case (const struct ccase *c, int case_idx)
330 {
331   /* FILTER. */
332   struct variable *filter_var = dict_get_filter (default_dict);
333   if (filter_var != NULL) 
334     {
335       double f = case_num (c, filter_var->fv);
336       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
337         return 1;
338     }
339
340   /* PROCESS IF. */
341   if (process_if_expr != NULL
342       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
343     return 1;
344
345   return 0;
346 }
347
348 /* Add C to the lag queue. */
349 static void
350 lag_case (const struct ccase *c)
351 {
352   if (lag_count < n_lag)
353     lag_count++;
354   case_destroy (&lag_queue[lag_head]);
355   case_clone (&lag_queue[lag_head], c);
356   if (++lag_head >= n_lag)
357     lag_head = 0;
358 }
359
360 /* Clears the variables in C that need to be cleared between
361    processing cases.  */
362 static void
363 clear_case (struct ccase *c)
364 {
365   size_t var_cnt = dict_get_var_cnt (default_dict);
366   size_t i;
367   
368   for (i = 0; i < var_cnt; i++) 
369     {
370       struct variable *v = dict_get_var (default_dict, i);
371       if (v->init && v->reinit) 
372         {
373           if (v->type == NUMERIC)
374             case_data_rw (c, v->fv)->f = SYSMIS;
375           else
376             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
377         } 
378     }
379 }
380
381 /* Closes the active file. */
382 static void
383 close_active_file (void)
384 {
385   /* Free memory for lag queue, and turn off lagging. */
386   if (n_lag > 0)
387     {
388       int i;
389       
390       for (i = 0; i < n_lag; i++)
391         case_destroy (&lag_queue[i]);
392       free (lag_queue);
393       n_lag = 0;
394     }
395   
396   /* Dictionary from before TEMPORARY becomes permanent.. */
397   if (temporary)
398     {
399       dict_destroy (default_dict);
400       default_dict = temp_dict;
401       temp_dict = NULL;
402     }
403
404   /* Finish compaction. */
405   if (compaction_necessary)
406     dict_compact_values (default_dict);
407     
408   /* Free data source. */
409   if (vfm_source != NULL) 
410     {
411       free_case_source (vfm_source);
412       vfm_source = NULL;
413     }
414
415   /* Old data sink becomes new data source. */
416   if (vfm_sink->class->make_source != NULL)
417     vfm_source = vfm_sink->class->make_source (vfm_sink);
418   free_case_sink (vfm_sink);
419   vfm_sink = NULL;
420
421   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
422      and get rid of all the transformations. */
423   cancel_temporary ();
424   expr_free (process_if_expr);
425   process_if_expr = NULL;
426   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
427     dict_set_filter (default_dict, NULL);
428   dict_set_case_limit (default_dict, 0);
429   dict_clear_vectors (default_dict);
430   cancel_transformations ();
431 }
432 \f
433 /* Storage case stream. */
434
435 /* Information about storage sink or source. */
436 struct storage_stream_info 
437   {
438     struct casefile *casefile;  /* Storage. */
439   };
440
441 /* Initializes a storage sink. */
442 static void
443 storage_sink_open (struct case_sink *sink)
444 {
445   struct storage_stream_info *info;
446
447   sink->aux = info = xmalloc (sizeof *info);
448   info->casefile = casefile_create (sink->value_cnt);
449 }
450
451 /* Destroys storage stream represented by INFO. */
452 static void
453 destroy_storage_stream_info (struct storage_stream_info *info) 
454 {
455   if (info != NULL) 
456     {
457       casefile_destroy (info->casefile);
458       free (info); 
459     }
460 }
461
462 /* Writes case C to the storage sink SINK. */
463 static void
464 storage_sink_write (struct case_sink *sink, const struct ccase *c)
465 {
466   struct storage_stream_info *info = sink->aux;
467
468   casefile_append (info->casefile, c);
469 }
470
471 /* Destroys internal data in SINK. */
472 static void
473 storage_sink_destroy (struct case_sink *sink)
474 {
475   destroy_storage_stream_info (sink->aux);
476 }
477
478 /* Closes the sink and returns a storage source to read back the
479    written data. */
480 static struct case_source *
481 storage_sink_make_source (struct case_sink *sink) 
482 {
483   struct case_source *source
484     = create_case_source (&storage_source_class, sink->aux);
485   sink->aux = NULL;
486   return source;
487 }
488
489 /* Storage sink. */
490 const struct case_sink_class storage_sink_class = 
491   {
492     "storage",
493     storage_sink_open,
494     storage_sink_write,
495     storage_sink_destroy,
496     storage_sink_make_source,
497   };
498 \f
499 /* Storage source. */
500
501 /* Returns the number of cases that will be read by
502    storage_source_read(). */
503 static int
504 storage_source_count (const struct case_source *source) 
505 {
506   struct storage_stream_info *info = source->aux;
507
508   return casefile_get_case_cnt (info->casefile);
509 }
510
511 /* Reads all cases from the storage source and passes them one by one to
512    write_case(). */
513 static void
514 storage_source_read (struct case_source *source,
515                      struct ccase *output_case,
516                      write_case_func *write_case, write_case_data wc_data)
517 {
518   struct storage_stream_info *info = source->aux;
519   struct ccase casefile_case;
520   struct casereader *reader;
521
522   for (reader = casefile_get_reader (info->casefile);
523        casereader_read (reader, &casefile_case);
524        case_destroy (&casefile_case))
525     {
526       case_copy (output_case, 0,
527                  &casefile_case, 0,
528                  casefile_get_value_cnt (info->casefile));
529       write_case (wc_data);
530     }
531   casereader_destroy (reader);
532 }
533
534 /* Destroys the source's internal data. */
535 static void
536 storage_source_destroy (struct case_source *source)
537 {
538   destroy_storage_stream_info (source->aux);
539 }
540
541 /* Storage source. */
542 const struct case_source_class storage_source_class = 
543   {
544     "storage",
545     storage_source_count,
546     storage_source_read,
547     storage_source_destroy,
548   };
549
550 struct casefile *
551 storage_source_get_casefile (struct case_source *source) 
552 {
553   struct storage_stream_info *info = source->aux;
554
555   assert (source->class == &storage_source_class);
556   return info->casefile;
557 }
558
559 struct case_source *
560 storage_source_create (struct casefile *cf)
561 {
562   struct storage_stream_info *info;
563
564   info = xmalloc (sizeof *info);
565   info->casefile = cf;
566
567   return create_case_source (&storage_source_class, info);
568 }
569 \f
570 /* Null sink.  Used by a few procedures that keep track of output
571    themselves and would throw away anything that the sink
572    contained anyway. */
573
574 const struct case_sink_class null_sink_class = 
575   {
576     "null",
577     NULL,
578     NULL,
579     NULL,
580     NULL,
581   };
582 \f
583 /* Returns a pointer to the lagged case from N_BEFORE cases before the
584    current one, or NULL if there haven't been that many cases yet. */
585 struct ccase *
586 lagged_case (int n_before)
587 {
588   assert (n_before >= 1 );
589   assert (n_before <= n_lag);
590
591   if (n_before <= lag_count)
592     {
593       int index = lag_head - n_before;
594       if (index < 0)
595         index += n_lag;
596       return &lag_queue[index];
597     }
598   else
599     return NULL;
600 }
601    
602 /* Appends TRNS to t_trns[], the list of all transformations to be
603    performed on data as it is read from the active file. */
604 void
605 add_transformation (struct trns_header * trns)
606 {
607   if (n_trns >= m_trns)
608     {
609       m_trns += 16;
610       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
611     }
612   t_trns[n_trns] = trns;
613   trns->index = n_trns++;
614 }
615
616 /* Cancels all active transformations, including any transformations
617    created by the input program. */
618 void
619 cancel_transformations (void)
620 {
621   int i;
622   for (i = 0; i < n_trns; i++)
623     {
624       if (t_trns[i]->free)
625         t_trns[i]->free (t_trns[i]);
626       free (t_trns[i]);
627     }
628   n_trns = f_trns = 0;
629   free (t_trns);
630   t_trns = NULL;
631   m_trns = 0;
632 }
633 \f
634 /* Creates a case source with class CLASS and auxiliary data AUX
635    and based on dictionary DICT. */
636 struct case_source *
637 create_case_source (const struct case_source_class *class,
638                     void *aux) 
639 {
640   struct case_source *source = xmalloc (sizeof *source);
641   source->class = class;
642   source->aux = aux;
643   return source;
644 }
645
646 /* Destroys case source SOURCE.  It is the caller's responsible to
647    call the source's destroy function, if any. */
648 void
649 free_case_source (struct case_source *source) 
650 {
651   if (source != NULL) 
652     {
653       if (source->class->destroy != NULL)
654         source->class->destroy (source);
655       free (source);
656     }
657 }
658
659 /* Returns nonzero if a case source is "complex". */
660 int
661 case_source_is_complex (const struct case_source *source) 
662 {
663   return source != NULL && (source->class == &input_program_source_class
664                             || source->class == &file_type_source_class);
665 }
666
667 /* Returns nonzero if CLASS is the class of SOURCE. */
668 int
669 case_source_is_class (const struct case_source *source,
670                       const struct case_source_class *class) 
671 {
672   return source != NULL && source->class == class;
673 }
674
675 /* Creates a case sink to accept cases from the given DICT with
676    class CLASS and auxiliary data AUX. */
677 struct case_sink *
678 create_case_sink (const struct case_sink_class *class,
679                   const struct dictionary *dict,
680                   void *aux) 
681 {
682   struct case_sink *sink = xmalloc (sizeof *sink);
683   sink->class = class;
684   sink->value_cnt = dict_get_compacted_value_cnt (dict);
685   sink->aux = aux;
686   return sink;
687 }
688
689 /* Destroys case sink SINK.  */
690 void
691 free_case_sink (struct case_sink *sink) 
692 {
693   if (sink != NULL) 
694     {
695       if (sink->class->destroy != NULL)
696         sink->class->destroy (sink);
697       free (sink); 
698     }
699 }
700 \f
701 /* Represents auxiliary data for handling SPLIT FILE. */
702 struct split_aux_data 
703   {
704     size_t case_count;          /* Number of cases so far. */
705     struct ccase prev_case;     /* Data in previous case. */
706
707     /* Functions to call... */
708     void (*begin_func) (void *);               /* ...before data. */
709     int (*proc_func) (struct ccase *, void *); /* ...with data. */
710     void (*end_func) (void *);                 /* ...after data. */
711     void *func_aux;                            /* Auxiliary data. */ 
712   };
713
714 static int equal_splits (const struct ccase *, const struct ccase *);
715 static int procedure_with_splits_callback (struct ccase *, void *);
716 static void dump_splits (struct ccase *);
717
718 /* Like procedure(), but it automatically breaks the case stream
719    into SPLIT FILE break groups.  Before each group of cases with
720    identical SPLIT FILE variable values, BEGIN_FUNC is called.
721    Then PROC_FUNC is called with each case in the group.  
722    END_FUNC is called when the group is finished.  FUNC_AUX is
723    passed to each of the functions as auxiliary data.
724
725    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
726    and END_FUNC will be called at all. 
727
728    If SPLIT FILE is not in effect, then there is one break group
729    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
730    will be called once. */
731 void
732 procedure_with_splits (void (*begin_func) (void *aux),
733                        int (*proc_func) (struct ccase *, void *aux),
734                        void (*end_func) (void *aux),
735                        void *func_aux) 
736 {
737   struct split_aux_data split_aux;
738
739   split_aux.case_count = 0;
740   case_nullify (&split_aux.prev_case);
741   split_aux.begin_func = begin_func;
742   split_aux.proc_func = proc_func;
743   split_aux.end_func = end_func;
744   split_aux.func_aux = func_aux;
745
746   open_active_file ();
747   internal_procedure (procedure_with_splits_callback, &split_aux);
748   if (split_aux.case_count > 0 && end_func != NULL)
749     end_func (func_aux);
750   close_active_file ();
751
752   case_destroy (&split_aux.prev_case);
753 }
754
755 /* procedure() callback used by procedure_with_splits(). */
756 static int
757 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
758 {
759   struct split_aux_data *split_aux = split_aux_;
760
761   /* Start a new series if needed. */
762   if (split_aux->case_count == 0
763       || !equal_splits (c, &split_aux->prev_case))
764     {
765       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
766         split_aux->end_func (split_aux->func_aux);
767
768       dump_splits (c);
769       case_destroy (&split_aux->prev_case);
770       case_clone (&split_aux->prev_case, c);
771
772       if (split_aux->begin_func != NULL)
773         split_aux->begin_func (split_aux->func_aux);
774     }
775
776   split_aux->case_count++;
777   if (split_aux->proc_func != NULL)
778     return split_aux->proc_func (c, split_aux->func_aux);
779   else
780     return 1;
781 }
782
783 /* Compares the SPLIT FILE variables in cases A and B and returns
784    nonzero only if they differ. */
785 static int
786 equal_splits (const struct ccase *a, const struct ccase *b) 
787 {
788   return case_compare (a, b,
789                        dict_get_split_vars (default_dict),
790                        dict_get_split_cnt (default_dict)) == 0;
791 }
792
793 /* Dumps out the values of all the split variables for the case C. */
794 static void
795 dump_splits (struct ccase *c)
796 {
797   struct variable *const *split;
798   struct tab_table *t;
799   size_t split_cnt;
800   int i;
801
802   split_cnt = dict_get_split_cnt (default_dict);
803   if (split_cnt == 0)
804     return;
805
806   t = tab_create (3, split_cnt + 1, 0);
807   tab_dim (t, tab_natural_dimensions);
808   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
809   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
810   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
811   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
812   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
813   split = dict_get_split_vars (default_dict);
814   for (i = 0; i < split_cnt; i++)
815     {
816       struct variable *v = split[i];
817       char temp_buf[80];
818       const char *val_lab;
819
820       assert (v->type == NUMERIC || v->type == ALPHA);
821       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
822       
823       data_out (temp_buf, &v->print, case_data (c, v->fv));
824       
825       temp_buf[v->print.w] = 0;
826       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
827
828       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
829       if (val_lab)
830         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
831     }
832   tab_flags (t, SOMF_NO_TITLE);
833   tab_submit (t);
834 }
835 \f
836 /* Represents auxiliary data for handling SPLIT FILE in a
837    multipass procedure. */
838 struct multipass_split_aux_data 
839   {
840     struct ccase prev_case;     /* Data in previous case. */
841     struct casefile *casefile;  /* Accumulates data for a split. */
842
843     /* Function to call with the accumulated data. */
844     void (*split_func) (const struct casefile *, void *);
845     void *func_aux;                            /* Auxiliary data. */ 
846   };
847
848 static int multipass_split_callback (struct ccase *c, void *aux_);
849 static void multipass_split_output (struct multipass_split_aux_data *);
850
851 void
852 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
853                                                      void *),
854                                  void *func_aux) 
855 {
856   struct multipass_split_aux_data aux;
857
858   assert (split_func != NULL);
859
860   open_active_file ();
861
862   case_nullify (&aux.prev_case);
863   aux.casefile = NULL;
864   aux.split_func = split_func;
865   aux.func_aux = func_aux;
866
867   internal_procedure (multipass_split_callback, &aux);
868   if (aux.casefile != NULL)
869     multipass_split_output (&aux);
870   case_destroy (&aux.prev_case);
871
872   close_active_file ();
873 }
874
875 /* procedure() callback used by multipass_procedure_with_splits(). */
876 static int
877 multipass_split_callback (struct ccase *c, void *aux_)
878 {
879   struct multipass_split_aux_data *aux = aux_;
880
881   /* Start a new series if needed. */
882   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
883     {
884       /* Pass any cases to split_func. */
885       if (aux->casefile != NULL)
886         multipass_split_output (aux);
887
888       /* Start a new casefile. */
889       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
890
891       /* Record split values. */
892       dump_splits (c);
893       case_destroy (&aux->prev_case);
894       case_clone (&aux->prev_case, c);
895     }
896
897   casefile_append (aux->casefile, c);
898
899   return 1;
900 }
901
902 static void
903 multipass_split_output (struct multipass_split_aux_data *aux)
904 {
905   assert (aux->casefile != NULL);
906   aux->split_func (aux->casefile, aux->func_aux);
907   casefile_destroy (aux->casefile);
908   aux->casefile = NULL;
909 }