Had to get last call to multipass_split_output() inside
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "casefile.h"
32 #include "do-ifP.h"
33 #include "error.h"
34 #include "expr.h"
35 #include "misc.h"
36 #include "random.h"
37 #include "settings.h"
38 #include "som.h"
39 #include "str.h"
40 #include "tab.h"
41 #include "var.h"
42 #include "value-labels.h"
43
44 /*
45    Virtual File Manager (vfm):
46
47    vfm is used to process data files.  It uses the model that
48    data is read from one stream (the data source), processed,
49    then written to another (the data sink).  The data source is
50    then deleted and the data sink becomes the data source for the
51    next procedure. */
52
53 /* Procedure execution data. */
54 struct write_case_data
55   {
56     /* Function to call for each case. */
57     int (*proc_func) (struct ccase *, void *); /* Function. */
58     void *aux;                                 /* Auxiliary data. */ 
59
60     struct ccase *trns_case;    /* Case used for transformations. */
61     struct ccase *sink_case;    /* Case written to sink, if
62                                    compaction is necessary. */
63     size_t cases_written;       /* Cases output so far. */
64     size_t cases_analyzed;      /* Cases passed to procedure so far. */
65   };
66
67 /* The current active file, from which cases are read. */
68 struct case_source *vfm_source;
69
70 /* The replacement active file, to which cases are written. */
71 struct case_sink *vfm_sink;
72
73 /* Nonzero if the case needs to have values deleted before being
74    stored, zero otherwise. */
75 static int compaction_necessary;
76
77 /* Time at which vfm was last invoked. */
78 time_t last_vfm_invocation;
79
80 /* Lag queue. */
81 int n_lag;                      /* Number of cases to lag. */
82 static int lag_count;           /* Number of cases in lag_queue so far. */
83 static int lag_head;            /* Index where next case will be added. */
84 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
85
86 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
87                                 void *aux);
88 static struct ccase *create_trns_case (struct dictionary *);
89 static void open_active_file (void);
90 static int write_case (struct write_case_data *wc_data);
91 static int execute_transformations (struct ccase *c,
92                                     struct trns_header **trns,
93                                     int first_idx, int last_idx,
94                                     int case_num);
95 static int filter_case (const struct ccase *c, int case_num);
96 static void lag_case (const struct ccase *c);
97 static void compact_case (struct ccase *dest, const struct ccase *src);
98 static void clear_case (struct ccase *c);
99 static void close_active_file (void);
100 \f
101 /* Public functions. */
102
103 /* Reads the data from the input program and writes it to a new
104    active file.  For each case we read from the input program, we
105    do the following
106
107    1. Execute permanent transformations.  If these drop the case,
108       start the next case from step 1.
109
110    2. N OF CASES.  If we have already written N cases, start the
111       next case from step 1.
112    
113    3. Write case to replacement active file.
114    
115    4. Execute temporary transformations.  If these drop the case,
116       start the next case from step 1.
117       
118    5. FILTER, PROCESS IF.  If these drop the case, start the next
119       case from step 1.
120    
121    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
122       cases, start the next case from step 1.
123       
124    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
125 void
126 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
127 {
128   open_active_file ();
129   internal_procedure (proc_func, aux);
130   close_active_file ();
131 }
132
133 /* Executes a procedure, as procedure(), except that the caller
134    is responsible for calling open_active_file() and
135    close_active_file(). */
136 static void
137 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
138 {
139   static int recursive_call;
140
141   struct write_case_data wc_data;
142
143   assert (++recursive_call == 1);
144
145   wc_data.proc_func = proc_func;
146   wc_data.aux = aux;
147   wc_data.trns_case = create_trns_case (default_dict);
148   wc_data.sink_case = xmalloc (dict_get_case_size (default_dict));
149   wc_data.cases_written = 0;
150
151   last_vfm_invocation = time (NULL);
152
153   if (vfm_source != NULL) 
154     vfm_source->class->read (vfm_source,
155                              wc_data.trns_case,
156                              write_case, &wc_data);
157
158   free (wc_data.sink_case);
159   free (wc_data.trns_case);
160
161   assert (--recursive_call == 0);
162 }
163
164 /* Creates and returns a case, initializing it from the vectors
165    that say which `value's need to be initialized just once, and
166    which ones need to be re-initialized before every case. */
167 static struct ccase *
168 create_trns_case (struct dictionary *dict)
169 {
170   struct ccase *c = xmalloc (dict_get_case_size (dict));
171   size_t var_cnt = dict_get_var_cnt (dict);
172   size_t i;
173
174   for (i = 0; i < var_cnt; i++) 
175     {
176       struct variable *v = dict_get_var (dict, i);
177
178       if (v->type == NUMERIC) 
179         {
180           if (v->reinit)
181             c->data[v->fv].f = 0.0;
182           else
183             c->data[v->fv].f = SYSMIS;
184         }
185       else
186         memset (c->data[v->fv].s, ' ', v->width);
187     }
188   return c;
189 }
190
191 /* Makes all preparations for reading from the data source and writing
192    to the data sink. */
193 static void
194 open_active_file (void)
195 {
196   /* Make temp_dict refer to the dictionary right before data
197      reaches the sink */
198   if (!temporary)
199     {
200       temp_trns = n_trns;
201       temp_dict = default_dict;
202     }
203
204   /* Figure out compaction. */
205   compaction_necessary = (dict_get_next_value_idx (temp_dict)
206                           != dict_get_compacted_value_cnt (temp_dict));
207
208   /* Prepare sink. */
209   if (vfm_sink == NULL)
210     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
211   if (vfm_sink->class->open != NULL)
212     vfm_sink->class->open (vfm_sink);
213
214   /* Allocate memory for lag queue. */
215   if (n_lag > 0)
216     {
217       int i;
218   
219       lag_count = 0;
220       lag_head = 0;
221       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
222       for (i = 0; i < n_lag; i++)
223         lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
224     }
225
226   /* Close any unclosed DO IF or LOOP constructs. */
227   discard_ctl_stack ();
228 }
229
230 /* Transforms trns_case and writes it to the replacement active
231    file if advisable.  Returns nonzero if more cases can be
232    accepted, zero otherwise.  Do not call this function again
233    after it has returned zero once.  */
234 static int
235 write_case (struct write_case_data *wc_data)
236 {
237   /* Execute permanent transformations.  */
238   if (!execute_transformations (wc_data->trns_case, t_trns, f_trns, temp_trns,
239                                 wc_data->cases_written + 1))
240     goto done;
241
242   /* N OF CASES. */
243   if (dict_get_case_limit (default_dict)
244       && wc_data->cases_written >= dict_get_case_limit (default_dict))
245     goto done;
246   wc_data->cases_written++;
247
248   /* Write case to LAG queue. */
249   if (n_lag)
250     lag_case (wc_data->trns_case);
251
252   /* Write case to replacement active file. */
253   if (vfm_sink->class->write != NULL) 
254     {
255       if (compaction_necessary) 
256         {
257           compact_case (wc_data->sink_case, wc_data->trns_case);
258           vfm_sink->class->write (vfm_sink, wc_data->sink_case);
259         }
260       else
261         vfm_sink->class->write (vfm_sink, wc_data->trns_case);
262     }
263   
264   /* Execute temporary transformations. */
265   if (!execute_transformations (wc_data->trns_case, t_trns, temp_trns, n_trns,
266                                 wc_data->cases_written))
267     goto done;
268   
269   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
270   if (filter_case (wc_data->trns_case, wc_data->cases_written)
271       || (dict_get_case_limit (temp_dict)
272           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
273     goto done;
274   wc_data->cases_analyzed++;
275
276   /* Pass case to procedure. */
277   if (wc_data->proc_func != NULL)
278     wc_data->proc_func (wc_data->trns_case, wc_data->aux);
279
280  done:
281   clear_case (wc_data->trns_case);
282   return 1;
283 }
284
285 /* Transforms case C using the transformations in TRNS[] with
286    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
287    become case CASE_NUM (1-based) in the output file.  Returns
288    zero if the case was filtered out by one of the
289    transformations, nonzero otherwise. */
290 static int
291 execute_transformations (struct ccase *c,
292                          struct trns_header **trns,
293                          int first_idx, int last_idx,
294                          int case_num) 
295 {
296   int idx;
297
298   for (idx = first_idx; idx != last_idx; )
299     {
300       int retval = trns[idx]->proc (trns[idx], c, case_num);
301       switch (retval)
302         {
303         case -1:
304           idx++;
305           break;
306           
307         case -2:
308           return 0;
309           
310         default:
311           idx = retval;
312           break;
313         }
314     }
315
316   return 1;
317 }
318
319 /* Returns nonzero if case C with case number CASE_NUM should be
320    exclude as specified on FILTER or PROCESS IF, otherwise
321    zero. */
322 static int
323 filter_case (const struct ccase *c, int case_num)
324 {
325   /* FILTER. */
326   struct variable *filter_var = dict_get_filter (default_dict);
327   if (filter_var != NULL) 
328     {
329       double f = c->data[filter_var->fv].f;
330       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
331         return 1;
332     }
333
334   /* PROCESS IF. */
335   if (process_if_expr != NULL
336       && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
337     return 1;
338
339   return 0;
340 }
341
342 /* Add C to the lag queue. */
343 static void
344 lag_case (const struct ccase *c)
345 {
346   if (lag_count < n_lag)
347     lag_count++;
348   memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
349   if (++lag_head >= n_lag)
350     lag_head = 0;
351 }
352
353 /* Copies case SRC to case DEST, compacting it in the process. */
354 static void
355 compact_case (struct ccase *dest, const struct ccase *src)
356 {
357   int i;
358   int nval = 0;
359   size_t var_cnt;
360   
361   assert (compaction_necessary);
362
363   /* Copy all the variables except scratch variables from SRC to
364      DEST. */
365   /* FIXME: this should be temp_dict not default_dict I guess. */
366   var_cnt = dict_get_var_cnt (default_dict);
367   for (i = 0; i < var_cnt; i++)
368     {
369       struct variable *v = dict_get_var (default_dict, i);
370       
371       if (dict_class_from_id (v->name) == DC_SCRATCH)
372         continue;
373
374       if (v->type == NUMERIC)
375         dest->data[nval++] = src->data[v->fv];
376       else
377         {
378           int w = DIV_RND_UP (v->width, sizeof (union value));
379           
380           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
381           nval += w;
382         }
383     }
384 }
385
386 /* Clears the variables in C that need to be cleared between
387    processing cases.  */
388 static void
389 clear_case (struct ccase *c)
390 {
391   size_t var_cnt = dict_get_var_cnt (default_dict);
392   size_t i;
393   
394   for (i = 0; i < var_cnt; i++) 
395     {
396       struct variable *v = dict_get_var (default_dict, i);
397       if (v->init && v->reinit) 
398         {
399           if (v->type == NUMERIC) 
400             c->data[v->fv].f = SYSMIS;
401           else
402             memset (c->data[v->fv].s, ' ', v->width);
403         } 
404     }
405 }
406
407 /* Closes the active file. */
408 static void
409 close_active_file (void)
410 {
411   /* Free memory for lag queue, and turn off lagging. */
412   if (n_lag > 0)
413     {
414       int i;
415       
416       for (i = 0; i < n_lag; i++)
417         free (lag_queue[i]);
418       free (lag_queue);
419       n_lag = 0;
420     }
421   
422   /* Dictionary from before TEMPORARY becomes permanent.. */
423   if (temporary)
424     {
425       dict_destroy (default_dict);
426       default_dict = temp_dict;
427       temp_dict = NULL;
428     }
429
430   /* Finish compaction. */
431   if (compaction_necessary)
432     dict_compact_values (default_dict);
433     
434   /* Free data source. */
435   if (vfm_source != NULL) 
436     {
437       if (vfm_source->class->destroy != NULL)
438         vfm_source->class->destroy (vfm_source);
439       free (vfm_source);
440     }
441
442   /* Old data sink becomes new data source. */
443   if (vfm_sink->class->make_source != NULL)
444     vfm_source = vfm_sink->class->make_source (vfm_sink);
445   else 
446     {
447       if (vfm_sink->class->destroy != NULL)
448         vfm_sink->class->destroy (vfm_sink);
449       vfm_source = NULL; 
450     }
451   free_case_sink (vfm_sink);
452   vfm_sink = NULL;
453
454   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
455      and get rid of all the transformations. */
456   cancel_temporary ();
457   expr_free (process_if_expr);
458   process_if_expr = NULL;
459   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
460     dict_set_filter (default_dict, NULL);
461   dict_set_case_limit (default_dict, 0);
462   dict_clear_vectors (default_dict);
463   cancel_transformations ();
464 }
465 \f
466 /* Storage case stream. */
467
468 /* Information about storage sink or source. */
469 struct storage_stream_info 
470   {
471     struct casefile *casefile;  /* Storage. */
472   };
473
474 /* Initializes a storage sink. */
475 static void
476 storage_sink_open (struct case_sink *sink)
477 {
478   struct storage_stream_info *info;
479
480   sink->aux = info = xmalloc (sizeof *info);
481   info->casefile = casefile_create (sink->value_cnt * sizeof (union value));
482 }
483
484 /* Destroys storage stream represented by INFO. */
485 static void
486 destroy_storage_stream_info (struct storage_stream_info *info) 
487 {
488   casefile_destroy (info->casefile);
489   free (info); 
490 }
491
492 /* Writes case C to the storage sink SINK. */
493 static void
494 storage_sink_write (struct case_sink *sink, const struct ccase *c)
495 {
496   struct storage_stream_info *info = sink->aux;
497
498   casefile_append (info->casefile, c);
499 }
500
501 /* Destroys internal data in SINK. */
502 static void
503 storage_sink_destroy (struct case_sink *sink)
504 {
505   destroy_storage_stream_info (sink->aux);
506 }
507
508 /* Closes and destroys the sink and returns a storage source to
509    read back the written data. */
510 static struct case_source *
511 storage_sink_make_source (struct case_sink *sink) 
512 {
513   return create_case_source (&storage_source_class, sink->dict, sink->aux);
514 }
515
516 /* Storage sink. */
517 const struct case_sink_class storage_sink_class = 
518   {
519     "storage",
520     storage_sink_open,
521     storage_sink_write,
522     storage_sink_destroy,
523     storage_sink_make_source,
524   };
525 \f
526 /* Storage source. */
527
528 /* Returns the number of cases that will be read by
529    storage_source_read(). */
530 static int
531 storage_source_count (const struct case_source *source) 
532 {
533   struct storage_stream_info *info = source->aux;
534
535   return casefile_get_case_cnt (info->casefile);
536 }
537
538 /* Reads all cases from the storage source and passes them one by one to
539    write_case(). */
540 static void
541 storage_source_read (struct case_source *source,
542                      struct ccase *output_case,
543                      write_case_func *write_case, write_case_data wc_data)
544 {
545   struct storage_stream_info *info = source->aux;
546   const struct ccase *casefile_case;
547   struct casereader *reader;
548
549   reader = casefile_get_reader (info->casefile);
550   while (casereader_read (reader, &casefile_case))
551     {
552       memcpy (output_case, casefile_case,
553               casefile_get_case_size (info->casefile));
554       write_case (wc_data);
555     }
556   casereader_destroy (reader);
557 }
558
559 /* Destroys the source's internal data. */
560 static void
561 storage_source_destroy (struct case_source *source)
562 {
563   destroy_storage_stream_info (source->aux);
564 }
565
566 /* Storage source. */
567 const struct case_source_class storage_source_class = 
568   {
569     "storage",
570     storage_source_count,
571     storage_source_read,
572     storage_source_destroy,
573   };
574
575 struct casefile *
576 storage_source_get_casefile (struct case_source *source) 
577 {
578   struct storage_stream_info *info = source->aux;
579
580   assert (source->class == &storage_source_class);
581   return info->casefile;
582 }
583 \f
584 /* Null sink.  Used by a few procedures that keep track of output
585    themselves and would throw away anything that the sink
586    contained anyway. */
587
588 const struct case_sink_class null_sink_class = 
589   {
590     "null",
591     NULL,
592     NULL,
593     NULL,
594     NULL,
595   };
596 \f
597 /* Returns a pointer to the lagged case from N_BEFORE cases before the
598    current one, or NULL if there haven't been that many cases yet. */
599 struct ccase *
600 lagged_case (int n_before)
601 {
602   assert (n_before <= n_lag);
603   if (n_before > lag_count)
604     return NULL;
605   
606   {
607     int index = lag_head - n_before;
608     if (index < 0)
609       index += n_lag;
610     return lag_queue[index];
611   }
612 }
613    
614 /* Appends TRNS to t_trns[], the list of all transformations to be
615    performed on data as it is read from the active file. */
616 void
617 add_transformation (struct trns_header * trns)
618 {
619   if (n_trns >= m_trns)
620     {
621       m_trns += 16;
622       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
623     }
624   t_trns[n_trns] = trns;
625   trns->index = n_trns++;
626 }
627
628 /* Cancels all active transformations, including any transformations
629    created by the input program. */
630 void
631 cancel_transformations (void)
632 {
633   int i;
634   for (i = 0; i < n_trns; i++)
635     {
636       if (t_trns[i]->free)
637         t_trns[i]->free (t_trns[i]);
638       free (t_trns[i]);
639     }
640   n_trns = f_trns = 0;
641   if (m_trns > 32)
642     {
643       free (t_trns);
644       m_trns = 0;
645     }
646 }
647 \f
648 /* Creates a case source with class CLASS and auxiliary data AUX
649    and based on dictionary DICT. */
650 struct case_source *
651 create_case_source (const struct case_source_class *class,
652                     const struct dictionary *dict,
653                     void *aux) 
654 {
655   struct case_source *source = xmalloc (sizeof *source);
656   source->class = class;
657   source->value_cnt = dict_get_next_value_idx (dict);
658   source->aux = aux;
659   return source;
660 }
661
662 /* Returns nonzero if a case source is "complex". */
663 int
664 case_source_is_complex (const struct case_source *source) 
665 {
666   return source != NULL && (source->class == &input_program_source_class
667                             || source->class == &file_type_source_class);
668 }
669
670 /* Returns nonzero if CLASS is the class of SOURCE. */
671 int
672 case_source_is_class (const struct case_source *source,
673                       const struct case_source_class *class) 
674 {
675   return source != NULL && source->class == class;
676 }
677
678 /* Creates a case sink with class CLASS and auxiliary data
679    AUX. */
680 struct case_sink *
681 create_case_sink (const struct case_sink_class *class,
682                   const struct dictionary *dict,
683                   void *aux) 
684 {
685   struct case_sink *sink = xmalloc (sizeof *sink);
686   sink->class = class;
687   sink->dict = dict;
688   sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
689   sink->value_cnt = dict_get_compacted_value_cnt (dict);
690   sink->aux = aux;
691   return sink;
692 }
693
694 /* Destroys case sink SINK.  It is the caller's responsible to
695    call the sink's destroy function, if any. */
696 void
697 free_case_sink (struct case_sink *sink) 
698 {
699   free (sink->idx_to_fv);
700   free (sink);
701 }
702 \f
703 /* Represents auxiliary data for handling SPLIT FILE. */
704 struct split_aux_data 
705   {
706     size_t case_count;          /* Number of cases so far. */
707     struct ccase *prev_case;    /* Data in previous case. */
708
709     /* Functions to call... */
710     void (*begin_func) (void *);               /* ...before data. */
711     int (*proc_func) (struct ccase *, void *); /* ...with data. */
712     void (*end_func) (void *);                 /* ...after data. */
713     void *func_aux;                            /* Auxiliary data. */ 
714   };
715
716 static int equal_splits (const struct ccase *, const struct ccase *);
717 static int procedure_with_splits_callback (struct ccase *, void *);
718 static void dump_splits (struct ccase *);
719
720 /* Like procedure(), but it automatically breaks the case stream
721    into SPLIT FILE break groups.  Before each group of cases with
722    identical SPLIT FILE variable values, BEGIN_FUNC is called.
723    Then PROC_FUNC is called with each case in the group.  
724    END_FUNC is called when the group is finished.  FUNC_AUX is
725    passed to each of the functions as auxiliary data.
726
727    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
728    and END_FUNC will be called at all. 
729
730    If SPLIT FILE is not in effect, then there is one break group
731    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
732    will be called once. */
733 void
734 procedure_with_splits (void (*begin_func) (void *aux),
735                        int (*proc_func) (struct ccase *, void *aux),
736                        void (*end_func) (void *aux),
737                        void *func_aux) 
738 {
739   struct split_aux_data split_aux;
740
741   split_aux.case_count = 0;
742   split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
743   split_aux.begin_func = begin_func;
744   split_aux.proc_func = proc_func;
745   split_aux.end_func = end_func;
746   split_aux.func_aux = func_aux;
747
748   procedure (procedure_with_splits_callback, &split_aux);
749
750   if (split_aux.case_count > 0 && end_func != NULL)
751     end_func (func_aux);
752   free (split_aux.prev_case);
753 }
754
755 /* procedure() callback used by procedure_with_splits(). */
756 static int
757 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
758 {
759   struct split_aux_data *split_aux = split_aux_;
760
761   /* Start a new series if needed. */
762   if (split_aux->case_count == 0
763       || !equal_splits (c, split_aux->prev_case))
764     {
765       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
766         split_aux->end_func (split_aux->func_aux);
767
768       dump_splits (c);
769       memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
770
771       if (split_aux->begin_func != NULL)
772         split_aux->begin_func (split_aux->func_aux);
773     }
774
775   split_aux->case_count++;
776   if (split_aux->proc_func != NULL)
777     return split_aux->proc_func (c, split_aux->func_aux);
778   else
779     return 1;
780 }
781
782 /* Compares the SPLIT FILE variables in cases A and B and returns
783    nonzero only if they differ. */
784 static int
785 equal_splits (const struct ccase *a, const struct ccase *b) 
786 {
787   struct variable *const *split;
788   size_t split_cnt;
789   size_t i;
790     
791   split = dict_get_split_vars (default_dict);
792   split_cnt = dict_get_split_cnt (default_dict);
793   for (i = 0; i < split_cnt; i++)
794     {
795       struct variable *v = split[i];
796       
797       switch (v->type)
798         {
799         case NUMERIC:
800           if (a->data[v->fv].f != b->data[v->fv].f)
801             return 0;
802           break;
803         case ALPHA:
804           if (memcmp (a->data[v->fv].s, b->data[v->fv].s, v->width))
805             return 0;
806           break;
807         default:
808           assert (0);
809         }
810     }
811
812   return 1;
813 }
814
815 /* Dumps out the values of all the split variables for the case C. */
816 static void
817 dump_splits (struct ccase *c)
818 {
819   struct variable *const *split;
820   struct tab_table *t;
821   size_t split_cnt;
822   int i;
823
824   split_cnt = dict_get_split_cnt (default_dict);
825   if (split_cnt == 0)
826     return;
827
828   t = tab_create (3, split_cnt + 1, 0);
829   tab_dim (t, tab_natural_dimensions);
830   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
831   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
832   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
833   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
834   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
835   split = dict_get_split_vars (default_dict);
836   for (i = 0; i < split_cnt; i++)
837     {
838       struct variable *v = split[i];
839       char temp_buf[80];
840       const char *val_lab;
841
842       assert (v->type == NUMERIC || v->type == ALPHA);
843       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
844       
845       data_out (temp_buf, &v->print, &c->data[v->fv]);
846       
847       temp_buf[v->print.w] = 0;
848       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
849
850       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
851       if (val_lab)
852         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
853     }
854   tab_flags (t, SOMF_NO_TITLE);
855   tab_submit (t);
856 }
857 \f
858 /* Represents auxiliary data for handling SPLIT FILE in a
859    multipass procedure. */
860 struct multipass_split_aux_data 
861   {
862     struct ccase *prev_case;    /* Data in previous case. */
863     struct casefile *casefile;  /* Accumulates data for a split. */
864
865     /* Function to call with the accumulated data. */
866     void (*split_func) (const struct casefile *, void *);
867     void *func_aux;                            /* Auxiliary data. */ 
868   };
869
870 static int multipass_split_callback (struct ccase *c, void *aux_);
871 static void multipass_split_output (struct multipass_split_aux_data *);
872
873 void
874 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
875                                                      void *),
876                                  void *func_aux) 
877 {
878   struct multipass_split_aux_data aux;
879
880   assert (split_func != NULL);
881
882   open_active_file ();
883
884   aux.prev_case = xmalloc (dict_get_case_size (default_dict));
885   aux.casefile = NULL;
886   aux.split_func = split_func;
887   aux.func_aux = func_aux;
888
889   internal_procedure (multipass_split_callback, &aux);
890   if (aux.casefile != NULL)
891     multipass_split_output (&aux);
892   free (aux.prev_case);
893
894   close_active_file ();
895 }
896
897 /* procedure() callback used by multipass_procedure_with_splits(). */
898 static int
899 multipass_split_callback (struct ccase *c, void *aux_)
900 {
901   struct multipass_split_aux_data *aux = aux_;
902
903   /* Start a new series if needed. */
904   if (aux->casefile == NULL || !equal_splits (c, aux->prev_case))
905     {
906       /* Pass any cases to split_func. */
907       if (aux->casefile != NULL)
908         multipass_split_output (aux);
909
910       /* Start a new casefile. */
911       aux->casefile = casefile_create (dict_get_case_size (default_dict));
912
913       /* Record split values. */
914       dump_splits (c);
915       memcpy (aux->prev_case, c, dict_get_case_size (default_dict));
916     }
917
918   casefile_append (aux->casefile, c);
919
920   return 1;
921 }
922
923 static void
924 multipass_split_output (struct multipass_split_aux_data *aux)
925 {
926   assert (aux->casefile != NULL);
927   aux->split_func (aux->casefile, aux->func_aux);
928   casefile_destroy (aux->casefile);
929   aux->casefile = NULL;
930 }