Add multipass procedures. Add two-pass moments calculation. Rewrite
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "casefile.h"
32 #include "do-ifP.h"
33 #include "error.h"
34 #include "expr.h"
35 #include "misc.h"
36 #include "random.h"
37 #include "settings.h"
38 #include "som.h"
39 #include "str.h"
40 #include "tab.h"
41 #include "var.h"
42 #include "value-labels.h"
43
44 /*
45    Virtual File Manager (vfm):
46
47    vfm is used to process data files.  It uses the model that
48    data is read from one stream (the data source), processed,
49    then written to another (the data sink).  The data source is
50    then deleted and the data sink becomes the data source for the
51    next procedure. */
52
53 /* Procedure execution data. */
54 struct write_case_data
55   {
56     /* Function to call for each case. */
57     int (*proc_func) (struct ccase *, void *); /* Function. */
58     void *aux;                                 /* Auxiliary data. */ 
59
60     struct ccase *trns_case;    /* Case used for transformations. */
61     struct ccase *sink_case;    /* Case written to sink, if
62                                    compaction is necessary. */
63     size_t cases_written;       /* Cases output so far. */
64     size_t cases_analyzed;      /* Cases passed to procedure so far. */
65   };
66
67 /* The current active file, from which cases are read. */
68 struct case_source *vfm_source;
69
70 /* The replacement active file, to which cases are written. */
71 struct case_sink *vfm_sink;
72
73 /* Nonzero if the case needs to have values deleted before being
74    stored, zero otherwise. */
75 static int compaction_necessary;
76
77 /* Time at which vfm was last invoked. */
78 time_t last_vfm_invocation;
79
80 /* Lag queue. */
81 int n_lag;                      /* Number of cases to lag. */
82 static int lag_count;           /* Number of cases in lag_queue so far. */
83 static int lag_head;            /* Index where next case will be added. */
84 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
85
86 static struct ccase *create_trns_case (struct dictionary *);
87 static void open_active_file (void);
88 static int write_case (struct write_case_data *wc_data);
89 static int execute_transformations (struct ccase *c,
90                                     struct trns_header **trns,
91                                     int first_idx, int last_idx,
92                                     int case_num);
93 static int filter_case (const struct ccase *c, int case_num);
94 static void lag_case (const struct ccase *c);
95 static void compact_case (struct ccase *dest, const struct ccase *src);
96 static void clear_case (struct ccase *c);
97 static void close_active_file (void);
98 \f
99 /* Public functions. */
100
101 /* Reads the data from the input program and writes it to a new
102    active file.  For each case we read from the input program, we
103    do the following
104
105    1. Execute permanent transformations.  If these drop the case,
106       start the next case from step 1.
107
108    2. N OF CASES.  If we have already written N cases, start the
109       next case from step 1.
110    
111    3. Write case to replacement active file.
112    
113    4. Execute temporary transformations.  If these drop the case,
114       start the next case from step 1.
115       
116    5. FILTER, PROCESS IF.  If these drop the case, start the next
117       case from step 1.
118    
119    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
120       cases, start the next case from step 1.
121       
122    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
123 void
124 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
125 {
126   static int recursive_call;
127
128   struct write_case_data wc_data;
129
130   assert (++recursive_call == 1);
131
132   wc_data.proc_func = proc_func;
133   wc_data.aux = aux;
134   wc_data.trns_case = create_trns_case (default_dict);
135   wc_data.sink_case = xmalloc (dict_get_case_size (default_dict));
136   wc_data.cases_written = 0;
137
138   last_vfm_invocation = time (NULL);
139
140   open_active_file ();
141   if (vfm_source != NULL) 
142     vfm_source->class->read (vfm_source,
143                              wc_data.trns_case,
144                              write_case, &wc_data);
145   close_active_file ();
146
147   free (wc_data.sink_case);
148   free (wc_data.trns_case);
149
150   assert (--recursive_call == 0);
151 }
152
153 /* Creates and returns a case, initializing it from the vectors
154    that say which `value's need to be initialized just once, and
155    which ones need to be re-initialized before every case. */
156 static struct ccase *
157 create_trns_case (struct dictionary *dict)
158 {
159   struct ccase *c = xmalloc (dict_get_case_size (dict));
160   size_t var_cnt = dict_get_var_cnt (dict);
161   size_t i;
162
163   for (i = 0; i < var_cnt; i++) 
164     {
165       struct variable *v = dict_get_var (dict, i);
166
167       if (v->type == NUMERIC) 
168         {
169           if (v->reinit)
170             c->data[v->fv].f = 0.0;
171           else
172             c->data[v->fv].f = SYSMIS;
173         }
174       else
175         memset (c->data[v->fv].s, ' ', v->width);
176     }
177   return c;
178 }
179
180 /* Makes all preparations for reading from the data source and writing
181    to the data sink. */
182 static void
183 open_active_file (void)
184 {
185   /* Make temp_dict refer to the dictionary right before data
186      reaches the sink */
187   if (!temporary)
188     {
189       temp_trns = n_trns;
190       temp_dict = default_dict;
191     }
192
193   /* Figure out compaction. */
194   compaction_necessary = (dict_get_next_value_idx (temp_dict)
195                           != dict_get_compacted_value_cnt (temp_dict));
196
197   /* Prepare sink. */
198   if (vfm_sink == NULL)
199     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
200   if (vfm_sink->class->open != NULL)
201     vfm_sink->class->open (vfm_sink);
202
203   /* Allocate memory for lag queue. */
204   if (n_lag > 0)
205     {
206       int i;
207   
208       lag_count = 0;
209       lag_head = 0;
210       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
211       for (i = 0; i < n_lag; i++)
212         lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
213     }
214
215   /* Close any unclosed DO IF or LOOP constructs. */
216   discard_ctl_stack ();
217 }
218
219 /* Transforms trns_case and writes it to the replacement active
220    file if advisable.  Returns nonzero if more cases can be
221    accepted, zero otherwise.  Do not call this function again
222    after it has returned zero once.  */
223 static int
224 write_case (struct write_case_data *wc_data)
225 {
226   /* Execute permanent transformations.  */
227   if (!execute_transformations (wc_data->trns_case, t_trns, f_trns, temp_trns,
228                                 wc_data->cases_written + 1))
229     goto done;
230
231   /* N OF CASES. */
232   if (dict_get_case_limit (default_dict)
233       && wc_data->cases_written >= dict_get_case_limit (default_dict))
234     goto done;
235   wc_data->cases_written++;
236
237   /* Write case to LAG queue. */
238   if (n_lag)
239     lag_case (wc_data->trns_case);
240
241   /* Write case to replacement active file. */
242   if (vfm_sink->class->write != NULL) 
243     {
244       if (compaction_necessary) 
245         {
246           compact_case (wc_data->sink_case, wc_data->trns_case);
247           vfm_sink->class->write (vfm_sink, wc_data->sink_case);
248         }
249       else
250         vfm_sink->class->write (vfm_sink, wc_data->trns_case);
251     }
252   
253   /* Execute temporary transformations. */
254   if (!execute_transformations (wc_data->trns_case, t_trns, temp_trns, n_trns,
255                                 wc_data->cases_written))
256     goto done;
257   
258   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
259   if (filter_case (wc_data->trns_case, wc_data->cases_written)
260       || (dict_get_case_limit (temp_dict)
261           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
262     goto done;
263   wc_data->cases_analyzed++;
264
265   /* Pass case to procedure. */
266   if (wc_data->proc_func != NULL)
267     wc_data->proc_func (wc_data->trns_case, wc_data->aux);
268
269  done:
270   clear_case (wc_data->trns_case);
271   return 1;
272 }
273
274 /* Transforms case C using the transformations in TRNS[] with
275    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
276    become case CASE_NUM (1-based) in the output file.  Returns
277    zero if the case was filtered out by one of the
278    transformations, nonzero otherwise. */
279 static int
280 execute_transformations (struct ccase *c,
281                          struct trns_header **trns,
282                          int first_idx, int last_idx,
283                          int case_num) 
284 {
285   int idx;
286
287   for (idx = first_idx; idx != last_idx; )
288     {
289       int retval = trns[idx]->proc (trns[idx], c, case_num);
290       switch (retval)
291         {
292         case -1:
293           idx++;
294           break;
295           
296         case -2:
297           return 0;
298           
299         default:
300           idx = retval;
301           break;
302         }
303     }
304
305   return 1;
306 }
307
308 /* Returns nonzero if case C with case number CASE_NUM should be
309    exclude as specified on FILTER or PROCESS IF, otherwise
310    zero. */
311 static int
312 filter_case (const struct ccase *c, int case_num)
313 {
314   /* FILTER. */
315   struct variable *filter_var = dict_get_filter (default_dict);
316   if (filter_var != NULL) 
317     {
318       double f = c->data[filter_var->fv].f;
319       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
320         return 1;
321     }
322
323   /* PROCESS IF. */
324   if (process_if_expr != NULL
325       && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
326     return 1;
327
328   return 0;
329 }
330
331 /* Add C to the lag queue. */
332 static void
333 lag_case (const struct ccase *c)
334 {
335   if (lag_count < n_lag)
336     lag_count++;
337   memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
338   if (++lag_head >= n_lag)
339     lag_head = 0;
340 }
341
342 /* Copies case SRC to case DEST, compacting it in the process. */
343 static void
344 compact_case (struct ccase *dest, const struct ccase *src)
345 {
346   int i;
347   int nval = 0;
348   size_t var_cnt;
349   
350   assert (compaction_necessary);
351
352   /* Copy all the variables except scratch variables from SRC to
353      DEST. */
354   /* FIXME: this should be temp_dict not default_dict I guess. */
355   var_cnt = dict_get_var_cnt (default_dict);
356   for (i = 0; i < var_cnt; i++)
357     {
358       struct variable *v = dict_get_var (default_dict, i);
359       
360       if (dict_class_from_id (v->name) == DC_SCRATCH)
361         continue;
362
363       if (v->type == NUMERIC)
364         dest->data[nval++] = src->data[v->fv];
365       else
366         {
367           int w = DIV_RND_UP (v->width, sizeof (union value));
368           
369           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
370           nval += w;
371         }
372     }
373 }
374
375 /* Clears the variables in C that need to be cleared between
376    processing cases.  */
377 static void
378 clear_case (struct ccase *c)
379 {
380   size_t var_cnt = dict_get_var_cnt (default_dict);
381   size_t i;
382   
383   for (i = 0; i < var_cnt; i++) 
384     {
385       struct variable *v = dict_get_var (default_dict, i);
386       if (v->init && v->reinit) 
387         {
388           if (v->type == NUMERIC) 
389             c->data[v->fv].f = SYSMIS;
390           else
391             memset (c->data[v->fv].s, ' ', v->width);
392         } 
393     }
394 }
395
396 /* Closes the active file. */
397 static void
398 close_active_file (void)
399 {
400   /* Free memory for lag queue, and turn off lagging. */
401   if (n_lag > 0)
402     {
403       int i;
404       
405       for (i = 0; i < n_lag; i++)
406         free (lag_queue[i]);
407       free (lag_queue);
408       n_lag = 0;
409     }
410   
411   /* Dictionary from before TEMPORARY becomes permanent.. */
412   if (temporary)
413     {
414       dict_destroy (default_dict);
415       default_dict = temp_dict;
416       temp_dict = NULL;
417     }
418
419   /* Finish compaction. */
420   if (compaction_necessary)
421     dict_compact_values (default_dict);
422     
423   /* Free data source. */
424   if (vfm_source != NULL) 
425     {
426       if (vfm_source->class->destroy != NULL)
427         vfm_source->class->destroy (vfm_source);
428       free (vfm_source);
429     }
430
431   /* Old data sink becomes new data source. */
432   if (vfm_sink->class->make_source != NULL)
433     vfm_source = vfm_sink->class->make_source (vfm_sink);
434   else 
435     {
436       if (vfm_sink->class->destroy != NULL)
437         vfm_sink->class->destroy (vfm_sink);
438       vfm_source = NULL; 
439     }
440   free_case_sink (vfm_sink);
441   vfm_sink = NULL;
442
443   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
444      and get rid of all the transformations. */
445   cancel_temporary ();
446   expr_free (process_if_expr);
447   process_if_expr = NULL;
448   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
449     dict_set_filter (default_dict, NULL);
450   dict_set_case_limit (default_dict, 0);
451   dict_clear_vectors (default_dict);
452   cancel_transformations ();
453 }
454 \f
455 /* Storage case stream. */
456
457 /* Information about storage sink or source. */
458 struct storage_stream_info 
459   {
460     struct casefile *casefile;  /* Storage. */
461   };
462
463 /* Initializes a storage sink. */
464 static void
465 storage_sink_open (struct case_sink *sink)
466 {
467   struct storage_stream_info *info;
468
469   sink->aux = info = xmalloc (sizeof *info);
470   info->casefile = casefile_create (sink->value_cnt * sizeof (union value));
471 }
472
473 /* Destroys storage stream represented by INFO. */
474 static void
475 destroy_storage_stream_info (struct storage_stream_info *info) 
476 {
477   casefile_destroy (info->casefile);
478   free (info); 
479 }
480
481 /* Writes case C to the storage sink SINK. */
482 static void
483 storage_sink_write (struct case_sink *sink, const struct ccase *c)
484 {
485   struct storage_stream_info *info = sink->aux;
486
487   casefile_append (info->casefile, c);
488 }
489
490 /* Destroys internal data in SINK. */
491 static void
492 storage_sink_destroy (struct case_sink *sink)
493 {
494   destroy_storage_stream_info (sink->aux);
495 }
496
497 /* Closes and destroys the sink and returns a storage source to
498    read back the written data. */
499 static struct case_source *
500 storage_sink_make_source (struct case_sink *sink) 
501 {
502   return create_case_source (&storage_source_class, sink->dict, sink->aux);
503 }
504
505 /* Storage sink. */
506 const struct case_sink_class storage_sink_class = 
507   {
508     "storage",
509     storage_sink_open,
510     storage_sink_write,
511     storage_sink_destroy,
512     storage_sink_make_source,
513   };
514 \f
515 /* Storage source. */
516
517 /* Returns the number of cases that will be read by
518    storage_source_read(). */
519 static int
520 storage_source_count (const struct case_source *source) 
521 {
522   struct storage_stream_info *info = source->aux;
523
524   return casefile_get_case_cnt (info->casefile);
525 }
526
527 /* Reads all cases from the storage source and passes them one by one to
528    write_case(). */
529 static void
530 storage_source_read (struct case_source *source,
531                      struct ccase *output_case,
532                      write_case_func *write_case, write_case_data wc_data)
533 {
534   struct storage_stream_info *info = source->aux;
535   const struct ccase *casefile_case;
536   struct casereader *reader;
537
538   reader = casefile_get_reader (info->casefile);
539   while (casereader_read (reader, &casefile_case))
540     {
541       memcpy (output_case, casefile_case,
542               casefile_get_case_size (info->casefile));
543       write_case (wc_data);
544     }
545   casereader_destroy (reader);
546 }
547
548 /* Destroys the source's internal data. */
549 static void
550 storage_source_destroy (struct case_source *source)
551 {
552   destroy_storage_stream_info (source->aux);
553 }
554
555 /* Storage source. */
556 const struct case_source_class storage_source_class = 
557   {
558     "storage",
559     storage_source_count,
560     storage_source_read,
561     storage_source_destroy,
562   };
563
564 struct casefile *
565 storage_source_get_casefile (struct case_source *source) 
566 {
567   struct storage_stream_info *info = source->aux;
568
569   assert (source->class == &storage_source_class);
570   return info->casefile;
571 }
572 \f
573 /* Null sink.  Used by a few procedures that keep track of output
574    themselves and would throw away anything that the sink
575    contained anyway. */
576
577 const struct case_sink_class null_sink_class = 
578   {
579     "null",
580     NULL,
581     NULL,
582     NULL,
583     NULL,
584   };
585 \f
586 /* Returns a pointer to the lagged case from N_BEFORE cases before the
587    current one, or NULL if there haven't been that many cases yet. */
588 struct ccase *
589 lagged_case (int n_before)
590 {
591   assert (n_before <= n_lag);
592   if (n_before > lag_count)
593     return NULL;
594   
595   {
596     int index = lag_head - n_before;
597     if (index < 0)
598       index += n_lag;
599     return lag_queue[index];
600   }
601 }
602    
603 /* Appends TRNS to t_trns[], the list of all transformations to be
604    performed on data as it is read from the active file. */
605 void
606 add_transformation (struct trns_header * trns)
607 {
608   if (n_trns >= m_trns)
609     {
610       m_trns += 16;
611       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
612     }
613   t_trns[n_trns] = trns;
614   trns->index = n_trns++;
615 }
616
617 /* Cancels all active transformations, including any transformations
618    created by the input program. */
619 void
620 cancel_transformations (void)
621 {
622   int i;
623   for (i = 0; i < n_trns; i++)
624     {
625       if (t_trns[i]->free)
626         t_trns[i]->free (t_trns[i]);
627       free (t_trns[i]);
628     }
629   n_trns = f_trns = 0;
630   if (m_trns > 32)
631     {
632       free (t_trns);
633       m_trns = 0;
634     }
635 }
636 \f
637 /* Creates a case source with class CLASS and auxiliary data AUX
638    and based on dictionary DICT. */
639 struct case_source *
640 create_case_source (const struct case_source_class *class,
641                     const struct dictionary *dict,
642                     void *aux) 
643 {
644   struct case_source *source = xmalloc (sizeof *source);
645   source->class = class;
646   source->value_cnt = dict_get_next_value_idx (dict);
647   source->aux = aux;
648   return source;
649 }
650
651 /* Returns nonzero if a case source is "complex". */
652 int
653 case_source_is_complex (const struct case_source *source) 
654 {
655   return source != NULL && (source->class == &input_program_source_class
656                             || source->class == &file_type_source_class);
657 }
658
659 /* Returns nonzero if CLASS is the class of SOURCE. */
660 int
661 case_source_is_class (const struct case_source *source,
662                       const struct case_source_class *class) 
663 {
664   return source != NULL && source->class == class;
665 }
666
667 /* Creates a case sink with class CLASS and auxiliary data
668    AUX. */
669 struct case_sink *
670 create_case_sink (const struct case_sink_class *class,
671                   const struct dictionary *dict,
672                   void *aux) 
673 {
674   struct case_sink *sink = xmalloc (sizeof *sink);
675   sink->class = class;
676   sink->dict = dict;
677   sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
678   sink->value_cnt = dict_get_compacted_value_cnt (dict);
679   sink->aux = aux;
680   return sink;
681 }
682
683 /* Destroys case sink SINK.  It is the caller's responsible to
684    call the sink's destroy function, if any. */
685 void
686 free_case_sink (struct case_sink *sink) 
687 {
688   free (sink->idx_to_fv);
689   free (sink);
690 }
691 \f
692 /* Represents auxiliary data for handling SPLIT FILE. */
693 struct split_aux_data 
694   {
695     size_t case_count;          /* Number of cases so far. */
696     struct ccase *prev_case;    /* Data in previous case. */
697
698     /* Functions to call... */
699     void (*begin_func) (void *);               /* ...before data. */
700     int (*proc_func) (struct ccase *, void *); /* ...with data. */
701     void (*end_func) (void *);                 /* ...after data. */
702     void *func_aux;                            /* Auxiliary data. */ 
703   };
704
705 static int equal_splits (const struct ccase *, const struct ccase *);
706 static int procedure_with_splits_callback (struct ccase *, void *);
707 static void dump_splits (struct ccase *);
708
709 /* Like procedure(), but it automatically breaks the case stream
710    into SPLIT FILE break groups.  Before each group of cases with
711    identical SPLIT FILE variable values, BEGIN_FUNC is called.
712    Then PROC_FUNC is called with each case in the group.  
713    END_FUNC is called when the group is finished.  FUNC_AUX is
714    passed to each of the functions as auxiliary data.
715
716    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
717    and END_FUNC will be called at all. 
718
719    If SPLIT FILE is not in effect, then there is one break group
720    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
721    will be called once. */
722 void
723 procedure_with_splits (void (*begin_func) (void *aux),
724                        int (*proc_func) (struct ccase *, void *aux),
725                        void (*end_func) (void *aux),
726                        void *func_aux) 
727 {
728   struct split_aux_data split_aux;
729
730   split_aux.case_count = 0;
731   split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
732   split_aux.begin_func = begin_func;
733   split_aux.proc_func = proc_func;
734   split_aux.end_func = end_func;
735   split_aux.func_aux = func_aux;
736
737   procedure (procedure_with_splits_callback, &split_aux);
738
739   if (split_aux.case_count > 0 && end_func != NULL)
740     end_func (func_aux);
741   free (split_aux.prev_case);
742 }
743
744 /* procedure() callback used by procedure_with_splits(). */
745 static int
746 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
747 {
748   struct split_aux_data *split_aux = split_aux_;
749
750   /* Start a new series if needed. */
751   if (split_aux->case_count == 0
752       || !equal_splits (c, split_aux->prev_case))
753     {
754       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
755         split_aux->end_func (split_aux->func_aux);
756
757       dump_splits (c);
758       memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
759
760       if (split_aux->begin_func != NULL)
761         split_aux->begin_func (split_aux->func_aux);
762     }
763
764   split_aux->case_count++;
765   if (split_aux->proc_func != NULL)
766     return split_aux->proc_func (c, split_aux->func_aux);
767   else
768     return 1;
769 }
770
771 /* Compares the SPLIT FILE variables in cases A and B and returns
772    nonzero only if they differ. */
773 static int
774 equal_splits (const struct ccase *a, const struct ccase *b) 
775 {
776   struct variable *const *split;
777   size_t split_cnt;
778   size_t i;
779     
780   split = dict_get_split_vars (default_dict);
781   split_cnt = dict_get_split_cnt (default_dict);
782   for (i = 0; i < split_cnt; i++)
783     {
784       struct variable *v = split[i];
785       
786       switch (v->type)
787         {
788         case NUMERIC:
789           if (a->data[v->fv].f != b->data[v->fv].f)
790             return 0;
791           break;
792         case ALPHA:
793           if (memcmp (a->data[v->fv].s, b->data[v->fv].s, v->width))
794             return 0;
795           break;
796         default:
797           assert (0);
798         }
799     }
800
801   return 1;
802 }
803
804 /* Dumps out the values of all the split variables for the case C. */
805 static void
806 dump_splits (struct ccase *c)
807 {
808   struct variable *const *split;
809   struct tab_table *t;
810   size_t split_cnt;
811   int i;
812
813   split_cnt = dict_get_split_cnt (default_dict);
814   if (split_cnt == 0)
815     return;
816
817   t = tab_create (3, split_cnt + 1, 0);
818   tab_dim (t, tab_natural_dimensions);
819   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
820   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
821   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
822   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
823   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
824   split = dict_get_split_vars (default_dict);
825   for (i = 0; i < split_cnt; i++)
826     {
827       struct variable *v = split[i];
828       char temp_buf[80];
829       const char *val_lab;
830
831       assert (v->type == NUMERIC || v->type == ALPHA);
832       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
833       
834       data_out (temp_buf, &v->print, &c->data[v->fv]);
835       
836       temp_buf[v->print.w] = 0;
837       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
838
839       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
840       if (val_lab)
841         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
842     }
843   tab_flags (t, SOMF_NO_TITLE);
844   tab_submit (t);
845 }
846 \f
847 /* Represents auxiliary data for handling SPLIT FILE in a
848    multipass procedure. */
849 struct multipass_split_aux_data 
850   {
851     struct ccase *prev_case;    /* Data in previous case. */
852     struct casefile *casefile;  /* Accumulates data for a split. */
853
854     /* Function to call with the accumulated data. */
855     void (*split_func) (const struct casefile *, void *);
856     void *func_aux;                            /* Auxiliary data. */ 
857   };
858
859 static int multipass_split_callback (struct ccase *c, void *aux_);
860 static void multipass_split_output (struct multipass_split_aux_data *);
861
862 void
863 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
864                                                      void *),
865                                  void *func_aux) 
866 {
867   struct multipass_split_aux_data aux;
868
869   assert (split_func != NULL);
870
871   aux.prev_case = xmalloc (dict_get_case_size (default_dict));
872   aux.casefile = NULL;
873   aux.split_func = split_func;
874   aux.func_aux = func_aux;
875   
876   procedure (multipass_split_callback, &aux);
877
878   if (aux.casefile != NULL)
879     multipass_split_output (&aux);
880   free (aux.prev_case);
881 }
882
883 /* procedure() callback used by multipass_procedure_with_splits(). */
884 static int
885 multipass_split_callback (struct ccase *c, void *aux_)
886 {
887   struct multipass_split_aux_data *aux = aux_;
888
889   /* Start a new series if needed. */
890   if (aux->casefile == NULL || !equal_splits (c, aux->prev_case))
891     {
892       /* Pass any cases to split_func. */
893       if (aux->casefile != NULL)
894         multipass_split_output (aux);
895
896       /* Start a new casefile. */
897       aux->casefile = casefile_create (dict_get_case_size (default_dict));
898
899       /* Record split values. */
900       dump_splits (c);
901       memcpy (aux->prev_case, c, dict_get_case_size (default_dict));
902     }
903
904   casefile_append (aux->casefile, c);
905
906   return 1;
907 }
908
909 static void
910 multipass_split_output (struct multipass_split_aux_data *aux)
911 {
912   assert (aux->casefile != NULL);
913   aux->split_func (aux->casefile, aux->func_aux);
914   casefile_destroy (aux->casefile);
915   aux->casefile = NULL;
916 }