Fix assertion for proper Huffman merge pattern: 0 == 1 modulo 1.
[pspp] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "dictionary.h"
34 #include "do-ifP.h"
35 #include "error.h"
36 #include "expr.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "som.h"
40 #include "str.h"
41 #include "tab.h"
42 #include "var.h"
43 #include "value-labels.h"
44
45 /*
46    Virtual File Manager (vfm):
47
48    vfm is used to process data files.  It uses the model that
49    data is read from one stream (the data source), processed,
50    then written to another (the data sink).  The data source is
51    then deleted and the data sink becomes the data source for the
52    next procedure. */
53
54 /* Procedure execution data. */
55 struct write_case_data
56   {
57     /* Function to call for each case. */
58     int (*proc_func) (struct ccase *, void *); /* Function. */
59     void *aux;                                 /* Auxiliary data. */ 
60
61     struct ccase trns_case;     /* Case used for transformations. */
62     struct ccase sink_case;     /* Case written to sink, if
63                                    compaction is necessary. */
64     size_t cases_written;       /* Cases output so far. */
65     size_t cases_analyzed;      /* Cases passed to procedure so far. */
66   };
67
68 /* The current active file, from which cases are read. */
69 struct case_source *vfm_source;
70
71 /* The replacement active file, to which cases are written. */
72 struct case_sink *vfm_sink;
73
74 /* Nonzero if the case needs to have values deleted before being
75    stored, zero otherwise. */
76 static int compaction_necessary;
77
78 /* Time at which vfm was last invoked. */
79 time_t last_vfm_invocation;
80
81 /* Lag queue. */
82 int n_lag;                      /* Number of cases to lag. */
83 static int lag_count;           /* Number of cases in lag_queue so far. */
84 static int lag_head;            /* Index where next case will be added. */
85 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
86
87 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
88                                 void *aux);
89 static void create_trns_case (struct ccase *, struct dictionary *);
90 static void open_active_file (void);
91 static int write_case (struct write_case_data *wc_data);
92 static int execute_transformations (struct ccase *c,
93                                     struct trns_header **trns,
94                                     int first_idx, int last_idx,
95                                     int case_num);
96 static int filter_case (const struct ccase *c, int case_num);
97 static void lag_case (const struct ccase *c);
98 static void compact_case (struct ccase *dest, const struct ccase *src);
99 static void clear_case (struct ccase *c);
100 static void close_active_file (void);
101 \f
102 /* Public functions. */
103
104 /* Reads the data from the input program and writes it to a new
105    active file.  For each case we read from the input program, we
106    do the following
107
108    1. Execute permanent transformations.  If these drop the case,
109       start the next case from step 1.
110
111    2. N OF CASES.  If we have already written N cases, start the
112       next case from step 1.
113    
114    3. Write case to replacement active file.
115    
116    4. Execute temporary transformations.  If these drop the case,
117       start the next case from step 1.
118       
119    5. FILTER, PROCESS IF.  If these drop the case, start the next
120       case from step 1.
121    
122    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
123       cases, start the next case from step 1.
124       
125    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
126 void
127 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
128 {
129   if (proc_func == NULL
130       && case_source_is_class (vfm_source, &storage_source_class)
131       && vfm_sink == NULL
132       && !temporary
133       && n_trns == 0)
134     {
135       /* Nothing to do. */
136       return;
137     }
138
139   open_active_file ();
140   internal_procedure (proc_func, aux);
141   close_active_file ();
142 }
143
144 /* Executes a procedure, as procedure(), except that the caller
145    is responsible for calling open_active_file() and
146    close_active_file(). */
147 static void
148 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
149 {
150   static int recursive_call;
151
152   struct write_case_data wc_data;
153
154   assert (++recursive_call == 1);
155
156   wc_data.proc_func = proc_func;
157   wc_data.aux = aux;
158   create_trns_case (&wc_data.trns_case, default_dict);
159   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
160   wc_data.cases_written = 0;
161
162   last_vfm_invocation = time (NULL);
163
164   if (vfm_source != NULL) 
165     vfm_source->class->read (vfm_source,
166                              &wc_data.trns_case,
167                              write_case, &wc_data);
168
169   case_destroy (&wc_data.sink_case);
170   case_destroy (&wc_data.trns_case);
171
172   assert (--recursive_call == 0);
173 }
174
175 /* Creates and returns a case, initializing it from the vectors
176    that say which `value's need to be initialized just once, and
177    which ones need to be re-initialized before every case. */
178 static void
179 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
180 {
181   size_t var_cnt = dict_get_var_cnt (dict);
182   size_t i;
183
184   case_create (trns_case, dict_get_next_value_idx (dict));
185   for (i = 0; i < var_cnt; i++) 
186     {
187       struct variable *v = dict_get_var (dict, i);
188       union value *value = case_data_rw (trns_case, v->fv);
189
190       if (v->type == NUMERIC)
191         value->f = v->reinit ? 0.0 : SYSMIS;
192       else
193         memset (value->s, ' ', v->width);
194     }
195 }
196
197 /* Makes all preparations for reading from the data source and writing
198    to the data sink. */
199 static void
200 open_active_file (void)
201 {
202   /* Make temp_dict refer to the dictionary right before data
203      reaches the sink */
204   if (!temporary)
205     {
206       temp_trns = n_trns;
207       temp_dict = default_dict;
208     }
209
210   /* Figure out compaction. */
211   compaction_necessary = (dict_get_next_value_idx (temp_dict)
212                           != dict_get_compacted_value_cnt (temp_dict));
213
214   /* Prepare sink. */
215   if (vfm_sink == NULL)
216     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
217   if (vfm_sink->class->open != NULL)
218     vfm_sink->class->open (vfm_sink);
219
220   /* Allocate memory for lag queue. */
221   if (n_lag > 0)
222     {
223       int i;
224   
225       lag_count = 0;
226       lag_head = 0;
227       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
228       for (i = 0; i < n_lag; i++)
229         case_nullify (&lag_queue[i]);
230     }
231
232   /* Close any unclosed DO IF or LOOP constructs. */
233   discard_ctl_stack ();
234 }
235
236 /* Transforms trns_case and writes it to the replacement active
237    file if advisable.  Returns nonzero if more cases can be
238    accepted, zero otherwise.  Do not call this function again
239    after it has returned zero once.  */
240 static int
241 write_case (struct write_case_data *wc_data)
242 {
243   /* Execute permanent transformations.  */
244   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
245                                 wc_data->cases_written + 1))
246     goto done;
247
248   /* N OF CASES. */
249   if (dict_get_case_limit (default_dict)
250       && wc_data->cases_written >= dict_get_case_limit (default_dict))
251     goto done;
252   wc_data->cases_written++;
253
254   /* Write case to LAG queue. */
255   if (n_lag)
256     lag_case (&wc_data->trns_case);
257
258   /* Write case to replacement active file. */
259   if (vfm_sink->class->write != NULL) 
260     {
261       if (compaction_necessary) 
262         {
263           compact_case (&wc_data->sink_case, &wc_data->trns_case);
264           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
265         }
266       else
267         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
268     }
269   
270   /* Execute temporary transformations. */
271   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
272                                 wc_data->cases_written))
273     goto done;
274   
275   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
276   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
277       || (dict_get_case_limit (temp_dict)
278           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
279     goto done;
280   wc_data->cases_analyzed++;
281
282   /* Pass case to procedure. */
283   if (wc_data->proc_func != NULL)
284     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
285
286  done:
287   clear_case (&wc_data->trns_case);
288   return 1;
289 }
290
291 /* Transforms case C using the transformations in TRNS[] with
292    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
293    become case CASE_NUM (1-based) in the output file.  Returns
294    zero if the case was filtered out by one of the
295    transformations, nonzero otherwise. */
296 static int
297 execute_transformations (struct ccase *c,
298                          struct trns_header **trns,
299                          int first_idx, int last_idx,
300                          int case_num) 
301 {
302   int idx;
303
304   for (idx = first_idx; idx != last_idx; )
305     {
306       int retval = trns[idx]->proc (trns[idx], c, case_num);
307       switch (retval)
308         {
309         case -1:
310           idx++;
311           break;
312           
313         case -2:
314           return 0;
315           
316         default:
317           idx = retval;
318           break;
319         }
320     }
321
322   return 1;
323 }
324
325 /* Returns nonzero if case C with case number CASE_NUM should be
326    exclude as specified on FILTER or PROCESS IF, otherwise
327    zero. */
328 static int
329 filter_case (const struct ccase *c, int case_idx)
330 {
331   /* FILTER. */
332   struct variable *filter_var = dict_get_filter (default_dict);
333   if (filter_var != NULL) 
334     {
335       double f = case_num (c, filter_var->fv);
336       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
337         return 1;
338     }
339
340   /* PROCESS IF. */
341   if (process_if_expr != NULL
342       && expr_evaluate (process_if_expr, c, case_idx, NULL) != 1.0)
343     return 1;
344
345   return 0;
346 }
347
348 /* Add C to the lag queue. */
349 static void
350 lag_case (const struct ccase *c)
351 {
352   if (lag_count < n_lag)
353     lag_count++;
354   case_destroy (&lag_queue[lag_head]);
355   case_clone (&lag_queue[lag_head], c);
356   if (++lag_head >= n_lag)
357     lag_head = 0;
358 }
359
360 /* Copies case SRC to case DEST, compacting it in the process. */
361 static void
362 compact_case (struct ccase *dest, const struct ccase *src)
363 {
364   int i;
365   int nval = 0;
366   size_t var_cnt;
367   
368   assert (compaction_necessary);
369
370   /* Copy all the variables except scratch variables from SRC to
371      DEST. */
372   /* FIXME: this should be temp_dict not default_dict I guess. */
373   var_cnt = dict_get_var_cnt (default_dict);
374   for (i = 0; i < var_cnt; i++)
375     {
376       struct variable *v = dict_get_var (default_dict, i);
377       
378       if (dict_class_from_id (v->name) == DC_SCRATCH)
379         continue;
380
381       if (v->type == NUMERIC) 
382         {
383           case_data_rw (dest, nval)->f = case_num (src, v->fv);
384           nval++; 
385         }
386       else
387         {
388           int w = DIV_RND_UP (v->width, sizeof (union value));
389           
390           memcpy (case_data_rw (dest, nval), case_str (src, v->fv),
391                   w * sizeof (union value));
392           nval += w;
393         }
394     }
395 }
396
397 /* Clears the variables in C that need to be cleared between
398    processing cases.  */
399 static void
400 clear_case (struct ccase *c)
401 {
402   size_t var_cnt = dict_get_var_cnt (default_dict);
403   size_t i;
404   
405   for (i = 0; i < var_cnt; i++) 
406     {
407       struct variable *v = dict_get_var (default_dict, i);
408       if (v->init && v->reinit) 
409         {
410           if (v->type == NUMERIC)
411             case_data_rw (c, v->fv)->f = SYSMIS;
412           else
413             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
414         } 
415     }
416 }
417
418 /* Closes the active file. */
419 static void
420 close_active_file (void)
421 {
422   /* Free memory for lag queue, and turn off lagging. */
423   if (n_lag > 0)
424     {
425       int i;
426       
427       for (i = 0; i < n_lag; i++)
428         case_destroy (&lag_queue[i]);
429       free (lag_queue);
430       n_lag = 0;
431     }
432   
433   /* Dictionary from before TEMPORARY becomes permanent.. */
434   if (temporary)
435     {
436       dict_destroy (default_dict);
437       default_dict = temp_dict;
438       temp_dict = NULL;
439     }
440
441   /* Finish compaction. */
442   if (compaction_necessary)
443     dict_compact_values (default_dict);
444     
445   /* Free data source. */
446   if (vfm_source != NULL) 
447     {
448       free_case_source (vfm_source);
449       vfm_source = NULL;
450     }
451
452   /* Old data sink becomes new data source. */
453   if (vfm_sink->class->make_source != NULL)
454     vfm_source = vfm_sink->class->make_source (vfm_sink);
455   free_case_sink (vfm_sink);
456   vfm_sink = NULL;
457
458   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
459      and get rid of all the transformations. */
460   cancel_temporary ();
461   expr_free (process_if_expr);
462   process_if_expr = NULL;
463   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
464     dict_set_filter (default_dict, NULL);
465   dict_set_case_limit (default_dict, 0);
466   dict_clear_vectors (default_dict);
467   cancel_transformations ();
468 }
469 \f
470 /* Storage case stream. */
471
472 /* Information about storage sink or source. */
473 struct storage_stream_info 
474   {
475     struct casefile *casefile;  /* Storage. */
476   };
477
478 /* Initializes a storage sink. */
479 static void
480 storage_sink_open (struct case_sink *sink)
481 {
482   struct storage_stream_info *info;
483
484   sink->aux = info = xmalloc (sizeof *info);
485   info->casefile = casefile_create (sink->value_cnt);
486 }
487
488 /* Destroys storage stream represented by INFO. */
489 static void
490 destroy_storage_stream_info (struct storage_stream_info *info) 
491 {
492   if (info != NULL) 
493     {
494       casefile_destroy (info->casefile);
495       free (info); 
496     }
497 }
498
499 /* Writes case C to the storage sink SINK. */
500 static void
501 storage_sink_write (struct case_sink *sink, const struct ccase *c)
502 {
503   struct storage_stream_info *info = sink->aux;
504
505   casefile_append (info->casefile, c);
506 }
507
508 /* Destroys internal data in SINK. */
509 static void
510 storage_sink_destroy (struct case_sink *sink)
511 {
512   destroy_storage_stream_info (sink->aux);
513 }
514
515 /* Closes the sink and returns a storage source to read back the
516    written data. */
517 static struct case_source *
518 storage_sink_make_source (struct case_sink *sink) 
519 {
520   struct case_source *source
521     = create_case_source (&storage_source_class, sink->dict, sink->aux);
522   sink->aux = NULL;
523   return source;
524 }
525
526 /* Storage sink. */
527 const struct case_sink_class storage_sink_class = 
528   {
529     "storage",
530     storage_sink_open,
531     storage_sink_write,
532     storage_sink_destroy,
533     storage_sink_make_source,
534   };
535 \f
536 /* Storage source. */
537
538 /* Returns the number of cases that will be read by
539    storage_source_read(). */
540 static int
541 storage_source_count (const struct case_source *source) 
542 {
543   struct storage_stream_info *info = source->aux;
544
545   return casefile_get_case_cnt (info->casefile);
546 }
547
548 /* Reads all cases from the storage source and passes them one by one to
549    write_case(). */
550 static void
551 storage_source_read (struct case_source *source,
552                      struct ccase *output_case,
553                      write_case_func *write_case, write_case_data wc_data)
554 {
555   struct storage_stream_info *info = source->aux;
556   struct ccase casefile_case;
557   struct casereader *reader;
558
559   for (reader = casefile_get_reader (info->casefile);
560        casereader_read (reader, &casefile_case);
561        case_destroy (&casefile_case))
562     {
563       case_copy (output_case, 0,
564                  &casefile_case, 0,
565                  casefile_get_value_cnt (info->casefile));
566       write_case (wc_data);
567     }
568   casereader_destroy (reader);
569 }
570
571 /* Destroys the source's internal data. */
572 static void
573 storage_source_destroy (struct case_source *source)
574 {
575   destroy_storage_stream_info (source->aux);
576 }
577
578 /* Storage source. */
579 const struct case_source_class storage_source_class = 
580   {
581     "storage",
582     storage_source_count,
583     storage_source_read,
584     storage_source_destroy,
585   };
586
587 struct casefile *
588 storage_source_get_casefile (struct case_source *source) 
589 {
590   struct storage_stream_info *info = source->aux;
591
592   assert (source->class == &storage_source_class);
593   return info->casefile;
594 }
595
596 struct case_source *
597 storage_source_create (struct casefile *cf, const struct dictionary *dict) 
598 {
599   struct storage_stream_info *info;
600
601   info = xmalloc (sizeof *info);
602   info->casefile = cf;
603
604   return create_case_source (&storage_source_class, dict, info);
605 }
606 \f
607 /* Null sink.  Used by a few procedures that keep track of output
608    themselves and would throw away anything that the sink
609    contained anyway. */
610
611 const struct case_sink_class null_sink_class = 
612   {
613     "null",
614     NULL,
615     NULL,
616     NULL,
617     NULL,
618   };
619 \f
620 /* Returns a pointer to the lagged case from N_BEFORE cases before the
621    current one, or NULL if there haven't been that many cases yet. */
622 struct ccase *
623 lagged_case (int n_before)
624 {
625   assert (n_before <= n_lag);
626   if (n_before <= lag_count)
627     {
628       int index = lag_head - n_before;
629       if (index < 0)
630         index += n_lag;
631       return &lag_queue[index];
632     }
633   else
634     return NULL;
635 }
636    
637 /* Appends TRNS to t_trns[], the list of all transformations to be
638    performed on data as it is read from the active file. */
639 void
640 add_transformation (struct trns_header * trns)
641 {
642   if (n_trns >= m_trns)
643     {
644       m_trns += 16;
645       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
646     }
647   t_trns[n_trns] = trns;
648   trns->index = n_trns++;
649 }
650
651 /* Cancels all active transformations, including any transformations
652    created by the input program. */
653 void
654 cancel_transformations (void)
655 {
656   int i;
657   for (i = 0; i < n_trns; i++)
658     {
659       if (t_trns[i]->free)
660         t_trns[i]->free (t_trns[i]);
661       free (t_trns[i]);
662     }
663   n_trns = f_trns = 0;
664   if (m_trns > 32)
665     {
666       free (t_trns);
667       t_trns=NULL;
668       m_trns = 0;
669     }
670 }
671 \f
672 /* Creates a case source with class CLASS and auxiliary data AUX
673    and based on dictionary DICT. */
674 struct case_source *
675 create_case_source (const struct case_source_class *class,
676                     const struct dictionary *dict,
677                     void *aux) 
678 {
679   struct case_source *source = xmalloc (sizeof *source);
680   source->class = class;
681   source->value_cnt = dict_get_next_value_idx (dict);
682   source->aux = aux;
683   return source;
684 }
685
686 /* Destroys case source SOURCE.  It is the caller's responsible to
687    call the source's destroy function, if any. */
688 void
689 free_case_source (struct case_source *source) 
690 {
691   if (source != NULL) 
692     {
693       if (source->class->destroy != NULL)
694         source->class->destroy (source);
695       free (source);
696     }
697 }
698
699 /* Returns nonzero if a case source is "complex". */
700 int
701 case_source_is_complex (const struct case_source *source) 
702 {
703   return source != NULL && (source->class == &input_program_source_class
704                             || source->class == &file_type_source_class);
705 }
706
707 /* Returns nonzero if CLASS is the class of SOURCE. */
708 int
709 case_source_is_class (const struct case_source *source,
710                       const struct case_source_class *class) 
711 {
712   return source != NULL && source->class == class;
713 }
714
715 /* Creates a case sink with class CLASS and auxiliary data
716    AUX. */
717 struct case_sink *
718 create_case_sink (const struct case_sink_class *class,
719                   const struct dictionary *dict,
720                   void *aux) 
721 {
722   struct case_sink *sink = xmalloc (sizeof *sink);
723   sink->class = class;
724   sink->dict = dict;
725   sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
726   sink->value_cnt = dict_get_compacted_value_cnt (dict);
727   sink->aux = aux;
728   return sink;
729 }
730
731 /* Destroys case sink SINK.  */
732 void
733 free_case_sink (struct case_sink *sink) 
734 {
735   if (sink != NULL) 
736     {
737       if (sink->class->destroy != NULL)
738         sink->class->destroy (sink);
739       free (sink->idx_to_fv);
740       free (sink); 
741     }
742 }
743 \f
744 /* Represents auxiliary data for handling SPLIT FILE. */
745 struct split_aux_data 
746   {
747     size_t case_count;          /* Number of cases so far. */
748     struct ccase prev_case;     /* Data in previous case. */
749
750     /* Functions to call... */
751     void (*begin_func) (void *);               /* ...before data. */
752     int (*proc_func) (struct ccase *, void *); /* ...with data. */
753     void (*end_func) (void *);                 /* ...after data. */
754     void *func_aux;                            /* Auxiliary data. */ 
755   };
756
757 static int equal_splits (const struct ccase *, const struct ccase *);
758 static int procedure_with_splits_callback (struct ccase *, void *);
759 static void dump_splits (struct ccase *);
760
761 /* Like procedure(), but it automatically breaks the case stream
762    into SPLIT FILE break groups.  Before each group of cases with
763    identical SPLIT FILE variable values, BEGIN_FUNC is called.
764    Then PROC_FUNC is called with each case in the group.  
765    END_FUNC is called when the group is finished.  FUNC_AUX is
766    passed to each of the functions as auxiliary data.
767
768    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
769    and END_FUNC will be called at all. 
770
771    If SPLIT FILE is not in effect, then there is one break group
772    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
773    will be called once. */
774 void
775 procedure_with_splits (void (*begin_func) (void *aux),
776                        int (*proc_func) (struct ccase *, void *aux),
777                        void (*end_func) (void *aux),
778                        void *func_aux) 
779 {
780   struct split_aux_data split_aux;
781
782   split_aux.case_count = 0;
783   case_nullify (&split_aux.prev_case);
784   split_aux.begin_func = begin_func;
785   split_aux.proc_func = proc_func;
786   split_aux.end_func = end_func;
787   split_aux.func_aux = func_aux;
788
789   procedure (procedure_with_splits_callback, &split_aux);
790
791   if (split_aux.case_count > 0 && end_func != NULL)
792     end_func (func_aux);
793   case_destroy (&split_aux.prev_case);
794 }
795
796 /* procedure() callback used by procedure_with_splits(). */
797 static int
798 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
799 {
800   struct split_aux_data *split_aux = split_aux_;
801
802   /* Start a new series if needed. */
803   if (split_aux->case_count == 0
804       || !equal_splits (c, &split_aux->prev_case))
805     {
806       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
807         split_aux->end_func (split_aux->func_aux);
808
809       dump_splits (c);
810       case_destroy (&split_aux->prev_case);
811       case_clone (&split_aux->prev_case, c);
812
813       if (split_aux->begin_func != NULL)
814         split_aux->begin_func (split_aux->func_aux);
815     }
816
817   split_aux->case_count++;
818   if (split_aux->proc_func != NULL)
819     return split_aux->proc_func (c, split_aux->func_aux);
820   else
821     return 1;
822 }
823
824 /* Compares the SPLIT FILE variables in cases A and B and returns
825    nonzero only if they differ. */
826 static int
827 equal_splits (const struct ccase *a, const struct ccase *b) 
828 {
829   struct variable *const *split;
830   size_t split_cnt;
831   size_t i;
832     
833   split = dict_get_split_vars (default_dict);
834   split_cnt = dict_get_split_cnt (default_dict);
835   for (i = 0; i < split_cnt; i++)
836     {
837       struct variable *v = split[i];
838       
839       switch (v->type)
840         {
841         case NUMERIC:
842           if (case_num (a, v->fv) != case_num (b, v->fv))
843             return 0;
844           break;
845         case ALPHA:
846           if (memcmp (case_str (a, v->fv), case_str (b, v->fv), v->width))
847             return 0;
848           break;
849         default:
850           assert (0);
851         }
852     }
853
854   return 1;
855 }
856
857 /* Dumps out the values of all the split variables for the case C. */
858 static void
859 dump_splits (struct ccase *c)
860 {
861   struct variable *const *split;
862   struct tab_table *t;
863   size_t split_cnt;
864   int i;
865
866   split_cnt = dict_get_split_cnt (default_dict);
867   if (split_cnt == 0)
868     return;
869
870   t = tab_create (3, split_cnt + 1, 0);
871   tab_dim (t, tab_natural_dimensions);
872   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
873   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
874   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
875   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
876   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
877   split = dict_get_split_vars (default_dict);
878   for (i = 0; i < split_cnt; i++)
879     {
880       struct variable *v = split[i];
881       char temp_buf[80];
882       const char *val_lab;
883
884       assert (v->type == NUMERIC || v->type == ALPHA);
885       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
886       
887       data_out (temp_buf, &v->print, case_data (c, v->fv));
888       
889       temp_buf[v->print.w] = 0;
890       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
891
892       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
893       if (val_lab)
894         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
895     }
896   tab_flags (t, SOMF_NO_TITLE);
897   tab_submit (t);
898 }
899 \f
900 /* Represents auxiliary data for handling SPLIT FILE in a
901    multipass procedure. */
902 struct multipass_split_aux_data 
903   {
904     struct ccase prev_case;     /* Data in previous case. */
905     struct casefile *casefile;  /* Accumulates data for a split. */
906
907     /* Function to call with the accumulated data. */
908     void (*split_func) (const struct casefile *, void *);
909     void *func_aux;                            /* Auxiliary data. */ 
910   };
911
912 static int multipass_split_callback (struct ccase *c, void *aux_);
913 static void multipass_split_output (struct multipass_split_aux_data *);
914
915 void
916 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
917                                                      void *),
918                                  void *func_aux) 
919 {
920   struct multipass_split_aux_data aux;
921
922   assert (split_func != NULL);
923
924   open_active_file ();
925
926   case_nullify (&aux.prev_case);
927   aux.casefile = NULL;
928   aux.split_func = split_func;
929   aux.func_aux = func_aux;
930
931   internal_procedure (multipass_split_callback, &aux);
932   if (aux.casefile != NULL)
933     multipass_split_output (&aux);
934   case_destroy (&aux.prev_case);
935
936   close_active_file ();
937 }
938
939 /* procedure() callback used by multipass_procedure_with_splits(). */
940 static int
941 multipass_split_callback (struct ccase *c, void *aux_)
942 {
943   struct multipass_split_aux_data *aux = aux_;
944
945   /* Start a new series if needed. */
946   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
947     {
948       /* Pass any cases to split_func. */
949       if (aux->casefile != NULL)
950         multipass_split_output (aux);
951
952       /* Start a new casefile. */
953       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
954
955       /* Record split values. */
956       dump_splits (c);
957       case_destroy (&aux->prev_case);
958       case_clone (&aux->prev_case, c);
959     }
960
961   casefile_append (aux->casefile, c);
962
963   return 1;
964 }
965
966 static void
967 multipass_split_output (struct multipass_split_aux_data *aux)
968 {
969   assert (aux->casefile != NULL);
970   aux->split_func (aux->casefile, aux->func_aux);
971   casefile_destroy (aux->casefile);
972   aux->casefile = NULL;
973 }