0d42ff5c7da143a6da663b957936ef2bb3c932aa
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "casefile.h"
32 #include "do-ifP.h"
33 #include "error.h"
34 #include "expr.h"
35 #include "misc.h"
36 #include "random.h"
37 #include "settings.h"
38 #include "som.h"
39 #include "str.h"
40 #include "tab.h"
41 #include "var.h"
42 #include "value-labels.h"
43
44 /*
45    Virtual File Manager (vfm):
46
47    vfm is used to process data files.  It uses the model that
48    data is read from one stream (the data source), processed,
49    then written to another (the data sink).  The data source is
50    then deleted and the data sink becomes the data source for the
51    next procedure. */
52
53 /* Procedure execution data. */
54 struct write_case_data
55   {
56     /* Function to call for each case. */
57     int (*proc_func) (struct ccase *, void *); /* Function. */
58     void *aux;                                 /* Auxiliary data. */ 
59
60     struct ccase *trns_case;    /* Case used for transformations. */
61     struct ccase *sink_case;    /* Case written to sink, if
62                                    compaction is necessary. */
63     size_t cases_written;       /* Cases output so far. */
64     size_t cases_analyzed;      /* Cases passed to procedure so far. */
65   };
66
67 /* The current active file, from which cases are read. */
68 struct case_source *vfm_source;
69
70 /* The replacement active file, to which cases are written. */
71 struct case_sink *vfm_sink;
72
73 /* Nonzero if the case needs to have values deleted before being
74    stored, zero otherwise. */
75 static int compaction_necessary;
76
77 /* Time at which vfm was last invoked. */
78 time_t last_vfm_invocation;
79
80 /* Lag queue. */
81 int n_lag;                      /* Number of cases to lag. */
82 static int lag_count;           /* Number of cases in lag_queue so far. */
83 static int lag_head;            /* Index where next case will be added. */
84 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
85
86 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
87                                 void *aux);
88 static struct ccase *create_trns_case (struct dictionary *);
89 static void open_active_file (void);
90 static int write_case (struct write_case_data *wc_data);
91 static int execute_transformations (struct ccase *c,
92                                     struct trns_header **trns,
93                                     int first_idx, int last_idx,
94                                     int case_num);
95 static int filter_case (const struct ccase *c, int case_num);
96 static void lag_case (const struct ccase *c);
97 static void compact_case (struct ccase *dest, const struct ccase *src);
98 static void clear_case (struct ccase *c);
99 static void close_active_file (void);
100 \f
101 /* Public functions. */
102
103 /* Reads the data from the input program and writes it to a new
104    active file.  For each case we read from the input program, we
105    do the following
106
107    1. Execute permanent transformations.  If these drop the case,
108       start the next case from step 1.
109
110    2. N OF CASES.  If we have already written N cases, start the
111       next case from step 1.
112    
113    3. Write case to replacement active file.
114    
115    4. Execute temporary transformations.  If these drop the case,
116       start the next case from step 1.
117       
118    5. FILTER, PROCESS IF.  If these drop the case, start the next
119       case from step 1.
120    
121    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
122       cases, start the next case from step 1.
123       
124    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
125 void
126 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
127 {
128   open_active_file ();
129   internal_procedure (proc_func, aux);
130   close_active_file ();
131 }
132
133 /* Executes a procedure, as procedure(), except that the caller
134    is responsible for calling open_active_file() and
135    close_active_file(). */
136 static void
137 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
138 {
139   static int recursive_call;
140
141   struct write_case_data wc_data;
142
143   assert (++recursive_call == 1);
144
145   wc_data.proc_func = proc_func;
146   wc_data.aux = aux;
147   wc_data.trns_case = create_trns_case (default_dict);
148   wc_data.sink_case = xmalloc (dict_get_case_size (default_dict));
149   wc_data.cases_written = 0;
150
151   last_vfm_invocation = time (NULL);
152
153   if (vfm_source != NULL) 
154     vfm_source->class->read (vfm_source,
155                              wc_data.trns_case,
156                              write_case, &wc_data);
157
158   free (wc_data.sink_case);
159   free (wc_data.trns_case);
160
161   assert (--recursive_call == 0);
162 }
163
164 /* Creates and returns a case, initializing it from the vectors
165    that say which `value's need to be initialized just once, and
166    which ones need to be re-initialized before every case. */
167 static struct ccase *
168 create_trns_case (struct dictionary *dict)
169 {
170   struct ccase *c = xmalloc (dict_get_case_size (dict));
171   size_t var_cnt = dict_get_var_cnt (dict);
172   size_t i;
173
174   for (i = 0; i < var_cnt; i++) 
175     {
176       struct variable *v = dict_get_var (dict, i);
177
178       if (v->type == NUMERIC) 
179         {
180           if (v->reinit)
181             c->data[v->fv].f = 0.0;
182           else
183             c->data[v->fv].f = SYSMIS;
184         }
185       else
186         memset (c->data[v->fv].s, ' ', v->width);
187     }
188   return c;
189 }
190
191 /* Makes all preparations for reading from the data source and writing
192    to the data sink. */
193 static void
194 open_active_file (void)
195 {
196   /* Make temp_dict refer to the dictionary right before data
197      reaches the sink */
198   if (!temporary)
199     {
200       temp_trns = n_trns;
201       temp_dict = default_dict;
202     }
203
204   /* Figure out compaction. */
205   compaction_necessary = (dict_get_next_value_idx (temp_dict)
206                           != dict_get_compacted_value_cnt (temp_dict));
207
208   /* Prepare sink. */
209   if (vfm_sink == NULL)
210     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
211   if (vfm_sink->class->open != NULL)
212     vfm_sink->class->open (vfm_sink);
213
214   /* Allocate memory for lag queue. */
215   if (n_lag > 0)
216     {
217       int i;
218   
219       lag_count = 0;
220       lag_head = 0;
221       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
222       for (i = 0; i < n_lag; i++)
223         lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
224     }
225
226   /* Close any unclosed DO IF or LOOP constructs. */
227   discard_ctl_stack ();
228 }
229
230 /* Transforms trns_case and writes it to the replacement active
231    file if advisable.  Returns nonzero if more cases can be
232    accepted, zero otherwise.  Do not call this function again
233    after it has returned zero once.  */
234 static int
235 write_case (struct write_case_data *wc_data)
236 {
237   /* Execute permanent transformations.  */
238   if (!execute_transformations (wc_data->trns_case, t_trns, f_trns, temp_trns,
239                                 wc_data->cases_written + 1))
240     goto done;
241
242   /* N OF CASES. */
243   if (dict_get_case_limit (default_dict)
244       && wc_data->cases_written >= dict_get_case_limit (default_dict))
245     goto done;
246   wc_data->cases_written++;
247
248   /* Write case to LAG queue. */
249   if (n_lag)
250     lag_case (wc_data->trns_case);
251
252   /* Write case to replacement active file. */
253   if (vfm_sink->class->write != NULL) 
254     {
255       if (compaction_necessary) 
256         {
257           compact_case (wc_data->sink_case, wc_data->trns_case);
258           vfm_sink->class->write (vfm_sink, wc_data->sink_case);
259         }
260       else
261         vfm_sink->class->write (vfm_sink, wc_data->trns_case);
262     }
263   
264   /* Execute temporary transformations. */
265   if (!execute_transformations (wc_data->trns_case, t_trns, temp_trns, n_trns,
266                                 wc_data->cases_written))
267     goto done;
268   
269   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
270   if (filter_case (wc_data->trns_case, wc_data->cases_written)
271       || (dict_get_case_limit (temp_dict)
272           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
273     goto done;
274   wc_data->cases_analyzed++;
275
276   /* Pass case to procedure. */
277   if (wc_data->proc_func != NULL)
278     wc_data->proc_func (wc_data->trns_case, wc_data->aux);
279
280  done:
281   clear_case (wc_data->trns_case);
282   return 1;
283 }
284
285 /* Transforms case C using the transformations in TRNS[] with
286    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
287    become case CASE_NUM (1-based) in the output file.  Returns
288    zero if the case was filtered out by one of the
289    transformations, nonzero otherwise. */
290 static int
291 execute_transformations (struct ccase *c,
292                          struct trns_header **trns,
293                          int first_idx, int last_idx,
294                          int case_num) 
295 {
296   int idx;
297
298   for (idx = first_idx; idx != last_idx; )
299     {
300       int retval = trns[idx]->proc (trns[idx], c, case_num);
301       switch (retval)
302         {
303         case -1:
304           idx++;
305           break;
306           
307         case -2:
308           return 0;
309           
310         default:
311           idx = retval;
312           break;
313         }
314     }
315
316   return 1;
317 }
318
319 /* Returns nonzero if case C with case number CASE_NUM should be
320    exclude as specified on FILTER or PROCESS IF, otherwise
321    zero. */
322 static int
323 filter_case (const struct ccase *c, int case_num)
324 {
325   /* FILTER. */
326   struct variable *filter_var = dict_get_filter (default_dict);
327   if (filter_var != NULL) 
328     {
329       double f = c->data[filter_var->fv].f;
330       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
331         return 1;
332     }
333
334   /* PROCESS IF. */
335   if (process_if_expr != NULL
336       && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
337     return 1;
338
339   return 0;
340 }
341
342 /* Add C to the lag queue. */
343 static void
344 lag_case (const struct ccase *c)
345 {
346   if (lag_count < n_lag)
347     lag_count++;
348   memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
349   if (++lag_head >= n_lag)
350     lag_head = 0;
351 }
352
353 /* Copies case SRC to case DEST, compacting it in the process. */
354 static void
355 compact_case (struct ccase *dest, const struct ccase *src)
356 {
357   int i;
358   int nval = 0;
359   size_t var_cnt;
360   
361   assert (compaction_necessary);
362
363   /* Copy all the variables except scratch variables from SRC to
364      DEST. */
365   /* FIXME: this should be temp_dict not default_dict I guess. */
366   var_cnt = dict_get_var_cnt (default_dict);
367   for (i = 0; i < var_cnt; i++)
368     {
369       struct variable *v = dict_get_var (default_dict, i);
370       
371       if (dict_class_from_id (v->name) == DC_SCRATCH)
372         continue;
373
374       if (v->type == NUMERIC)
375         dest->data[nval++] = src->data[v->fv];
376       else
377         {
378           int w = DIV_RND_UP (v->width, sizeof (union value));
379           
380           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
381           nval += w;
382         }
383     }
384 }
385
386 /* Clears the variables in C that need to be cleared between
387    processing cases.  */
388 static void
389 clear_case (struct ccase *c)
390 {
391   size_t var_cnt = dict_get_var_cnt (default_dict);
392   size_t i;
393   
394   for (i = 0; i < var_cnt; i++) 
395     {
396       struct variable *v = dict_get_var (default_dict, i);
397       if (v->init && v->reinit) 
398         {
399           if (v->type == NUMERIC) 
400             c->data[v->fv].f = SYSMIS;
401           else
402             memset (c->data[v->fv].s, ' ', v->width);
403         } 
404     }
405 }
406
407 /* Closes the active file. */
408 static void
409 close_active_file (void)
410 {
411   /* Free memory for lag queue, and turn off lagging. */
412   if (n_lag > 0)
413     {
414       int i;
415       
416       for (i = 0; i < n_lag; i++)
417         free (lag_queue[i]);
418       free (lag_queue);
419       n_lag = 0;
420     }
421   
422   /* Dictionary from before TEMPORARY becomes permanent.. */
423   if (temporary)
424     {
425       dict_destroy (default_dict);
426       default_dict = temp_dict;
427       temp_dict = NULL;
428     }
429
430   /* Finish compaction. */
431   if (compaction_necessary)
432     dict_compact_values (default_dict);
433     
434   /* Free data source. */
435   if (vfm_source != NULL) 
436     {
437       if (vfm_source->class->destroy != NULL)
438         vfm_source->class->destroy (vfm_source);
439       free (vfm_source);
440     }
441
442   /* Old data sink becomes new data source. */
443   if (vfm_sink->class->make_source != NULL)
444     vfm_source = vfm_sink->class->make_source (vfm_sink);
445   else 
446     {
447       if (vfm_sink->class->destroy != NULL)
448         vfm_sink->class->destroy (vfm_sink);
449       vfm_source = NULL; 
450     }
451   free_case_sink (vfm_sink);
452   vfm_sink = NULL;
453
454   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
455      and get rid of all the transformations. */
456   cancel_temporary ();
457   expr_free (process_if_expr);
458   process_if_expr = NULL;
459   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
460     dict_set_filter (default_dict, NULL);
461   dict_set_case_limit (default_dict, 0);
462   dict_clear_vectors (default_dict);
463   cancel_transformations ();
464 }
465 \f
466 /* Storage case stream. */
467
468 /* Information about storage sink or source. */
469 struct storage_stream_info 
470   {
471     struct casefile *casefile;  /* Storage. */
472   };
473
474 /* Initializes a storage sink. */
475 static void
476 storage_sink_open (struct case_sink *sink)
477 {
478   struct storage_stream_info *info;
479
480   sink->aux = info = xmalloc (sizeof *info);
481   info->casefile = casefile_create (sink->value_cnt * sizeof (union value));
482 }
483
484 /* Destroys storage stream represented by INFO. */
485 static void
486 destroy_storage_stream_info (struct storage_stream_info *info) 
487 {
488   casefile_destroy (info->casefile);
489   free (info); 
490 }
491
492 /* Writes case C to the storage sink SINK. */
493 static void
494 storage_sink_write (struct case_sink *sink, const struct ccase *c)
495 {
496   struct storage_stream_info *info = sink->aux;
497
498   casefile_append (info->casefile, c);
499 }
500
501 /* Destroys internal data in SINK. */
502 static void
503 storage_sink_destroy (struct case_sink *sink)
504 {
505   destroy_storage_stream_info (sink->aux);
506 }
507
508 /* Closes and destroys the sink and returns a storage source to
509    read back the written data. */
510 static struct case_source *
511 storage_sink_make_source (struct case_sink *sink) 
512 {
513   return create_case_source (&storage_source_class, sink->dict, sink->aux);
514 }
515
516 /* Storage sink. */
517 const struct case_sink_class storage_sink_class = 
518   {
519     "storage",
520     storage_sink_open,
521     storage_sink_write,
522     storage_sink_destroy,
523     storage_sink_make_source,
524   };
525 \f
526 /* Storage source. */
527
528 /* Returns the number of cases that will be read by
529    storage_source_read(). */
530 static int
531 storage_source_count (const struct case_source *source) 
532 {
533   struct storage_stream_info *info = source->aux;
534
535   return casefile_get_case_cnt (info->casefile);
536 }
537
538 /* Reads all cases from the storage source and passes them one by one to
539    write_case(). */
540 static void
541 storage_source_read (struct case_source *source,
542                      struct ccase *output_case,
543                      write_case_func *write_case, write_case_data wc_data)
544 {
545   struct storage_stream_info *info = source->aux;
546   const struct ccase *casefile_case;
547   struct casereader *reader;
548
549   reader = casefile_get_reader (info->casefile);
550   while (casereader_read (reader, &casefile_case))
551     {
552       memcpy (output_case, casefile_case,
553               casefile_get_case_size (info->casefile));
554       write_case (wc_data);
555     }
556   casereader_destroy (reader);
557 }
558
559 /* Destroys the source's internal data. */
560 static void
561 storage_source_destroy (struct case_source *source)
562 {
563   destroy_storage_stream_info (source->aux);
564 }
565
566 /* Storage source. */
567 const struct case_source_class storage_source_class = 
568   {
569     "storage",
570     storage_source_count,
571     storage_source_read,
572     storage_source_destroy,
573   };
574
575 struct casefile *
576 storage_source_get_casefile (struct case_source *source) 
577 {
578   struct storage_stream_info *info = source->aux;
579
580   assert (source->class == &storage_source_class);
581   return info->casefile;
582 }
583 \f
584 /* Null sink.  Used by a few procedures that keep track of output
585    themselves and would throw away anything that the sink
586    contained anyway. */
587
588 const struct case_sink_class null_sink_class = 
589   {
590     "null",
591     NULL,
592     NULL,
593     NULL,
594     NULL,
595   };
596 \f
597 /* Returns a pointer to the lagged case from N_BEFORE cases before the
598    current one, or NULL if there haven't been that many cases yet. */
599 struct ccase *
600 lagged_case (int n_before)
601 {
602   assert (n_before <= n_lag);
603   if (n_before > lag_count)
604     return NULL;
605   
606   {
607     int index = lag_head - n_before;
608     if (index < 0)
609       index += n_lag;
610     return lag_queue[index];
611   }
612 }
613    
614 /* Appends TRNS to t_trns[], the list of all transformations to be
615    performed on data as it is read from the active file. */
616 void
617 add_transformation (struct trns_header * trns)
618 {
619   if (n_trns >= m_trns)
620     {
621       m_trns += 16;
622       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
623     }
624   t_trns[n_trns] = trns;
625   trns->index = n_trns++;
626 }
627
628 /* Cancels all active transformations, including any transformations
629    created by the input program. */
630 void
631 cancel_transformations (void)
632 {
633   int i;
634   for (i = 0; i < n_trns; i++)
635     {
636       if (t_trns[i]->free)
637         t_trns[i]->free (t_trns[i]);
638       free (t_trns[i]);
639     }
640   n_trns = f_trns = 0;
641   if (m_trns > 32)
642     {
643       free (t_trns);
644       t_trns=NULL;
645       m_trns = 0;
646     }
647 }
648 \f
649 /* Creates a case source with class CLASS and auxiliary data AUX
650    and based on dictionary DICT. */
651 struct case_source *
652 create_case_source (const struct case_source_class *class,
653                     const struct dictionary *dict,
654                     void *aux) 
655 {
656   struct case_source *source = xmalloc (sizeof *source);
657   source->class = class;
658   source->value_cnt = dict_get_next_value_idx (dict);
659   source->aux = aux;
660   return source;
661 }
662
663 /* Returns nonzero if a case source is "complex". */
664 int
665 case_source_is_complex (const struct case_source *source) 
666 {
667   return source != NULL && (source->class == &input_program_source_class
668                             || source->class == &file_type_source_class);
669 }
670
671 /* Returns nonzero if CLASS is the class of SOURCE. */
672 int
673 case_source_is_class (const struct case_source *source,
674                       const struct case_source_class *class) 
675 {
676   return source != NULL && source->class == class;
677 }
678
679 /* Creates a case sink with class CLASS and auxiliary data
680    AUX. */
681 struct case_sink *
682 create_case_sink (const struct case_sink_class *class,
683                   const struct dictionary *dict,
684                   void *aux) 
685 {
686   struct case_sink *sink = xmalloc (sizeof *sink);
687   sink->class = class;
688   sink->dict = dict;
689   sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
690   sink->value_cnt = dict_get_compacted_value_cnt (dict);
691   sink->aux = aux;
692   return sink;
693 }
694
695 /* Destroys case sink SINK.  It is the caller's responsible to
696    call the sink's destroy function, if any. */
697 void
698 free_case_sink (struct case_sink *sink) 
699 {
700   free (sink->idx_to_fv);
701   free (sink);
702 }
703 \f
704 /* Represents auxiliary data for handling SPLIT FILE. */
705 struct split_aux_data 
706   {
707     size_t case_count;          /* Number of cases so far. */
708     struct ccase *prev_case;    /* Data in previous case. */
709
710     /* Functions to call... */
711     void (*begin_func) (void *);               /* ...before data. */
712     int (*proc_func) (struct ccase *, void *); /* ...with data. */
713     void (*end_func) (void *);                 /* ...after data. */
714     void *func_aux;                            /* Auxiliary data. */ 
715   };
716
717 static int equal_splits (const struct ccase *, const struct ccase *);
718 static int procedure_with_splits_callback (struct ccase *, void *);
719 static void dump_splits (struct ccase *);
720
721 /* Like procedure(), but it automatically breaks the case stream
722    into SPLIT FILE break groups.  Before each group of cases with
723    identical SPLIT FILE variable values, BEGIN_FUNC is called.
724    Then PROC_FUNC is called with each case in the group.  
725    END_FUNC is called when the group is finished.  FUNC_AUX is
726    passed to each of the functions as auxiliary data.
727
728    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
729    and END_FUNC will be called at all. 
730
731    If SPLIT FILE is not in effect, then there is one break group
732    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
733    will be called once. */
734 void
735 procedure_with_splits (void (*begin_func) (void *aux),
736                        int (*proc_func) (struct ccase *, void *aux),
737                        void (*end_func) (void *aux),
738                        void *func_aux) 
739 {
740   struct split_aux_data split_aux;
741
742   split_aux.case_count = 0;
743   split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
744   split_aux.begin_func = begin_func;
745   split_aux.proc_func = proc_func;
746   split_aux.end_func = end_func;
747   split_aux.func_aux = func_aux;
748
749   procedure (procedure_with_splits_callback, &split_aux);
750
751   if (split_aux.case_count > 0 && end_func != NULL)
752     end_func (func_aux);
753   free (split_aux.prev_case);
754 }
755
756 /* procedure() callback used by procedure_with_splits(). */
757 static int
758 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
759 {
760   struct split_aux_data *split_aux = split_aux_;
761
762   /* Start a new series if needed. */
763   if (split_aux->case_count == 0
764       || !equal_splits (c, split_aux->prev_case))
765     {
766       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
767         split_aux->end_func (split_aux->func_aux);
768
769       dump_splits (c);
770       memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
771
772       if (split_aux->begin_func != NULL)
773         split_aux->begin_func (split_aux->func_aux);
774     }
775
776   split_aux->case_count++;
777   if (split_aux->proc_func != NULL)
778     return split_aux->proc_func (c, split_aux->func_aux);
779   else
780     return 1;
781 }
782
783 /* Compares the SPLIT FILE variables in cases A and B and returns
784    nonzero only if they differ. */
785 static int
786 equal_splits (const struct ccase *a, const struct ccase *b) 
787 {
788   struct variable *const *split;
789   size_t split_cnt;
790   size_t i;
791     
792   split = dict_get_split_vars (default_dict);
793   split_cnt = dict_get_split_cnt (default_dict);
794   for (i = 0; i < split_cnt; i++)
795     {
796       struct variable *v = split[i];
797       
798       switch (v->type)
799         {
800         case NUMERIC:
801           if (a->data[v->fv].f != b->data[v->fv].f)
802             return 0;
803           break;
804         case ALPHA:
805           if (memcmp (a->data[v->fv].s, b->data[v->fv].s, v->width))
806             return 0;
807           break;
808         default:
809           assert (0);
810         }
811     }
812
813   return 1;
814 }
815
816 /* Dumps out the values of all the split variables for the case C. */
817 static void
818 dump_splits (struct ccase *c)
819 {
820   struct variable *const *split;
821   struct tab_table *t;
822   size_t split_cnt;
823   int i;
824
825   split_cnt = dict_get_split_cnt (default_dict);
826   if (split_cnt == 0)
827     return;
828
829   t = tab_create (3, split_cnt + 1, 0);
830   tab_dim (t, tab_natural_dimensions);
831   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
832   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
833   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
834   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
835   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
836   split = dict_get_split_vars (default_dict);
837   for (i = 0; i < split_cnt; i++)
838     {
839       struct variable *v = split[i];
840       char temp_buf[80];
841       const char *val_lab;
842
843       assert (v->type == NUMERIC || v->type == ALPHA);
844       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
845       
846       data_out (temp_buf, &v->print, &c->data[v->fv]);
847       
848       temp_buf[v->print.w] = 0;
849       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
850
851       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
852       if (val_lab)
853         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
854     }
855   tab_flags (t, SOMF_NO_TITLE);
856   tab_submit (t);
857 }
858 \f
859 /* Represents auxiliary data for handling SPLIT FILE in a
860    multipass procedure. */
861 struct multipass_split_aux_data 
862   {
863     struct ccase *prev_case;    /* Data in previous case. */
864     struct casefile *casefile;  /* Accumulates data for a split. */
865
866     /* Function to call with the accumulated data. */
867     void (*split_func) (const struct casefile *, void *);
868     void *func_aux;                            /* Auxiliary data. */ 
869   };
870
871 static int multipass_split_callback (struct ccase *c, void *aux_);
872 static void multipass_split_output (struct multipass_split_aux_data *);
873
874 void
875 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
876                                                      void *),
877                                  void *func_aux) 
878 {
879   struct multipass_split_aux_data aux;
880
881   assert (split_func != NULL);
882
883   open_active_file ();
884
885   aux.prev_case = xmalloc (dict_get_case_size (default_dict));
886   aux.casefile = NULL;
887   aux.split_func = split_func;
888   aux.func_aux = func_aux;
889
890   internal_procedure (multipass_split_callback, &aux);
891   if (aux.casefile != NULL)
892     multipass_split_output (&aux);
893   free (aux.prev_case);
894
895   close_active_file ();
896 }
897
898 /* procedure() callback used by multipass_procedure_with_splits(). */
899 static int
900 multipass_split_callback (struct ccase *c, void *aux_)
901 {
902   struct multipass_split_aux_data *aux = aux_;
903
904   /* Start a new series if needed. */
905   if (aux->casefile == NULL || !equal_splits (c, aux->prev_case))
906     {
907       /* Pass any cases to split_func. */
908       if (aux->casefile != NULL)
909         multipass_split_output (aux);
910
911       /* Start a new casefile. */
912       aux->casefile = casefile_create (dict_get_case_size (default_dict));
913
914       /* Record split values. */
915       dump_splits (c);
916       memcpy (aux->prev_case, c, dict_get_case_size (default_dict));
917     }
918
919   casefile_append (aux->casefile, c);
920
921   return 1;
922 }
923
924 static void
925 multipass_split_output (struct multipass_split_aux_data *aux)
926 {
927   assert (aux->casefile != NULL);
928   aux->split_func (aux->casefile, aux->func_aux);
929   casefile_destroy (aux->casefile);
930   aux->casefile = NULL;
931 }