Rewrite expression code.
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "dictionary.h"
34 #include "do-ifP.h"
35 #include "error.h"
36 #include "expressions/public.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "som.h"
40 #include "str.h"
41 #include "tab.h"
42 #include "var.h"
43 #include "value-labels.h"
44
45 /*
46    Virtual File Manager (vfm):
47
48    vfm is used to process data files.  It uses the model that
49    data is read from one stream (the data source), processed,
50    then written to another (the data sink).  The data source is
51    then deleted and the data sink becomes the data source for the
52    next procedure. */
53
54 /* Procedure execution data. */
55 struct write_case_data
56   {
57     /* Function to call for each case. */
58     int (*proc_func) (struct ccase *, void *); /* Function. */
59     void *aux;                                 /* Auxiliary data. */ 
60
61     struct ccase trns_case;     /* Case used for transformations. */
62     struct ccase sink_case;     /* Case written to sink, if
63                                    compaction is necessary. */
64     size_t cases_written;       /* Cases output so far. */
65     size_t cases_analyzed;      /* Cases passed to procedure so far. */
66   };
67
68 /* The current active file, from which cases are read. */
69 struct case_source *vfm_source;
70
71 /* The replacement active file, to which cases are written. */
72 struct case_sink *vfm_sink;
73
74 /* Nonzero if the case needs to have values deleted before being
75    stored, zero otherwise. */
76 static int compaction_necessary;
77
78 /* Time at which vfm was last invoked. */
79 time_t last_vfm_invocation;
80
81 /* Lag queue. */
82 int n_lag;                      /* Number of cases to lag. */
83 static int lag_count;           /* Number of cases in lag_queue so far. */
84 static int lag_head;            /* Index where next case will be added. */
85 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
86
87 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
88                                 void *aux);
89 static void create_trns_case (struct ccase *, struct dictionary *);
90 static void open_active_file (void);
91 static int write_case (struct write_case_data *wc_data);
92 static int execute_transformations (struct ccase *c,
93                                     struct trns_header **trns,
94                                     int first_idx, int last_idx,
95                                     int case_num);
96 static int filter_case (const struct ccase *c, int case_num);
97 static void lag_case (const struct ccase *c);
98 static void compact_case (struct ccase *dest, const struct ccase *src);
99 static void clear_case (struct ccase *c);
100 static void close_active_file (void);
101 \f
102 /* Public functions. */
103
104 /* Reads the data from the input program and writes it to a new
105    active file.  For each case we read from the input program, we
106    do the following
107
108    1. Execute permanent transformations.  If these drop the case,
109       start the next case from step 1.
110
111    2. N OF CASES.  If we have already written N cases, start the
112       next case from step 1.
113    
114    3. Write case to replacement active file.
115    
116    4. Execute temporary transformations.  If these drop the case,
117       start the next case from step 1.
118       
119    5. FILTER, PROCESS IF.  If these drop the case, start the next
120       case from step 1.
121    
122    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
123       cases, start the next case from step 1.
124       
125    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
126 void
127 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
128 {
129   if (proc_func == NULL
130       && case_source_is_class (vfm_source, &storage_source_class)
131       && vfm_sink == NULL
132       && !temporary
133       && n_trns == 0)
134     {
135       /* Nothing to do. */
136       return;
137     }
138
139   open_active_file ();
140   internal_procedure (proc_func, aux);
141   close_active_file ();
142 }
143
144 /* Executes a procedure, as procedure(), except that the caller
145    is responsible for calling open_active_file() and
146    close_active_file(). */
147 static void
148 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
149 {
150   static int recursive_call;
151
152   struct write_case_data wc_data;
153
154   assert (++recursive_call == 1);
155
156   wc_data.proc_func = proc_func;
157   wc_data.aux = aux;
158   create_trns_case (&wc_data.trns_case, default_dict);
159   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
160   wc_data.cases_written = 0;
161
162   last_vfm_invocation = time (NULL);
163
164   if (vfm_source != NULL) 
165     vfm_source->class->read (vfm_source,
166                              &wc_data.trns_case,
167                              write_case, &wc_data);
168
169   case_destroy (&wc_data.sink_case);
170   case_destroy (&wc_data.trns_case);
171
172   assert (--recursive_call == 0);
173 }
174
175 /* Creates and returns a case, initializing it from the vectors
176    that say which `value's need to be initialized just once, and
177    which ones need to be re-initialized before every case. */
178 static void
179 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
180 {
181   size_t var_cnt = dict_get_var_cnt (dict);
182   size_t i;
183
184   case_create (trns_case, dict_get_next_value_idx (dict));
185   for (i = 0; i < var_cnt; i++) 
186     {
187       struct variable *v = dict_get_var (dict, i);
188       union value *value = case_data_rw (trns_case, v->fv);
189
190       if (v->type == NUMERIC)
191         value->f = v->reinit ? 0.0 : SYSMIS;
192       else
193         memset (value->s, ' ', v->width);
194     }
195 }
196
197 /* Makes all preparations for reading from the data source and writing
198    to the data sink. */
199 static void
200 open_active_file (void)
201 {
202   /* Make temp_dict refer to the dictionary right before data
203      reaches the sink */
204   if (!temporary)
205     {
206       temp_trns = n_trns;
207       temp_dict = default_dict;
208     }
209
210   /* Figure out compaction. */
211   compaction_necessary = (dict_get_next_value_idx (temp_dict)
212                           != dict_get_compacted_value_cnt (temp_dict));
213
214   /* Prepare sink. */
215   if (vfm_sink == NULL)
216     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
217   if (vfm_sink->class->open != NULL)
218     vfm_sink->class->open (vfm_sink);
219
220   /* Allocate memory for lag queue. */
221   if (n_lag > 0)
222     {
223       int i;
224   
225       lag_count = 0;
226       lag_head = 0;
227       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
228       for (i = 0; i < n_lag; i++)
229         case_nullify (&lag_queue[i]);
230     }
231
232   /* Close any unclosed DO IF or LOOP constructs. */
233   discard_ctl_stack ();
234 }
235
236 /* Transforms trns_case and writes it to the replacement active
237    file if advisable.  Returns nonzero if more cases can be
238    accepted, zero otherwise.  Do not call this function again
239    after it has returned zero once.  */
240 static int
241 write_case (struct write_case_data *wc_data)
242 {
243   /* Execute permanent transformations.  */
244   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
245                                 wc_data->cases_written + 1))
246     goto done;
247
248   /* N OF CASES. */
249   if (dict_get_case_limit (default_dict)
250       && wc_data->cases_written >= dict_get_case_limit (default_dict))
251     goto done;
252   wc_data->cases_written++;
253
254   /* Write case to LAG queue. */
255   if (n_lag)
256     lag_case (&wc_data->trns_case);
257
258   /* Write case to replacement active file. */
259   if (vfm_sink->class->write != NULL) 
260     {
261       if (compaction_necessary) 
262         {
263           compact_case (&wc_data->sink_case, &wc_data->trns_case);
264           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
265         }
266       else
267         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
268     }
269   
270   /* Execute temporary transformations. */
271   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
272                                 wc_data->cases_written))
273     goto done;
274   
275   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
276   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
277       || (dict_get_case_limit (temp_dict)
278           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
279     goto done;
280   wc_data->cases_analyzed++;
281
282   /* Pass case to procedure. */
283   if (wc_data->proc_func != NULL)
284     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
285
286  done:
287   clear_case (&wc_data->trns_case);
288   return 1;
289 }
290
291 /* Transforms case C using the transformations in TRNS[] with
292    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
293    become case CASE_NUM (1-based) in the output file.  Returns
294    zero if the case was filtered out by one of the
295    transformations, nonzero otherwise. */
296 static int
297 execute_transformations (struct ccase *c,
298                          struct trns_header **trns,
299                          int first_idx, int last_idx,
300                          int case_num) 
301 {
302   int idx;
303
304   for (idx = first_idx; idx != last_idx; )
305     {
306       int retval = trns[idx]->proc (trns[idx], c, case_num);
307       switch (retval)
308         {
309         case -1:
310           idx++;
311           break;
312           
313         case -2:
314           return 0;
315           
316         default:
317           idx = retval;
318           break;
319         }
320     }
321
322   return 1;
323 }
324
325 /* Returns nonzero if case C with case number CASE_NUM should be
326    exclude as specified on FILTER or PROCESS IF, otherwise
327    zero. */
328 static int
329 filter_case (const struct ccase *c, int case_idx)
330 {
331   /* FILTER. */
332   struct variable *filter_var = dict_get_filter (default_dict);
333   if (filter_var != NULL) 
334     {
335       double f = case_num (c, filter_var->fv);
336       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
337         return 1;
338     }
339
340   /* PROCESS IF. */
341   if (process_if_expr != NULL
342       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
343     return 1;
344
345   return 0;
346 }
347
348 /* Add C to the lag queue. */
349 static void
350 lag_case (const struct ccase *c)
351 {
352   if (lag_count < n_lag)
353     lag_count++;
354   case_destroy (&lag_queue[lag_head]);
355   case_clone (&lag_queue[lag_head], c);
356   if (++lag_head >= n_lag)
357     lag_head = 0;
358 }
359
360 /* Copies case SRC to case DEST, compacting it in the process. */
361 static void
362 compact_case (struct ccase *dest, const struct ccase *src)
363 {
364   int i;
365   int nval = 0;
366   size_t var_cnt;
367   
368   assert (compaction_necessary);
369
370   /* Copy all the variables except scratch variables from SRC to
371      DEST. */
372   /* FIXME: this should be temp_dict not default_dict I guess. */
373   var_cnt = dict_get_var_cnt (default_dict);
374   for (i = 0; i < var_cnt; i++)
375     {
376       struct variable *v = dict_get_var (default_dict, i);
377       
378       if (dict_class_from_id (v->name) == DC_SCRATCH)
379         continue;
380
381       if (v->type == NUMERIC) 
382         {
383           case_data_rw (dest, nval)->f = case_num (src, v->fv);
384           nval++; 
385         }
386       else
387         {
388           int w = DIV_RND_UP (v->width, sizeof (union value));
389           
390           memcpy (case_data_rw (dest, nval), case_str (src, v->fv),
391                   w * sizeof (union value));
392           nval += w;
393         }
394     }
395 }
396
397 /* Clears the variables in C that need to be cleared between
398    processing cases.  */
399 static void
400 clear_case (struct ccase *c)
401 {
402   size_t var_cnt = dict_get_var_cnt (default_dict);
403   size_t i;
404   
405   for (i = 0; i < var_cnt; i++) 
406     {
407       struct variable *v = dict_get_var (default_dict, i);
408       if (v->init && v->reinit) 
409         {
410           if (v->type == NUMERIC)
411             case_data_rw (c, v->fv)->f = SYSMIS;
412           else
413             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
414         } 
415     }
416 }
417
418 /* Closes the active file. */
419 static void
420 close_active_file (void)
421 {
422   /* Free memory for lag queue, and turn off lagging. */
423   if (n_lag > 0)
424     {
425       int i;
426       
427       for (i = 0; i < n_lag; i++)
428         case_destroy (&lag_queue[i]);
429       free (lag_queue);
430       n_lag = 0;
431     }
432   
433   /* Dictionary from before TEMPORARY becomes permanent.. */
434   if (temporary)
435     {
436       dict_destroy (default_dict);
437       default_dict = temp_dict;
438       temp_dict = NULL;
439     }
440
441   /* Finish compaction. */
442   if (compaction_necessary)
443     dict_compact_values (default_dict);
444     
445   /* Free data source. */
446   if (vfm_source != NULL) 
447     {
448       free_case_source (vfm_source);
449       vfm_source = NULL;
450     }
451
452   /* Old data sink becomes new data source. */
453   if (vfm_sink->class->make_source != NULL)
454     vfm_source = vfm_sink->class->make_source (vfm_sink);
455   free_case_sink (vfm_sink);
456   vfm_sink = NULL;
457
458   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
459      and get rid of all the transformations. */
460   cancel_temporary ();
461   expr_free (process_if_expr);
462   process_if_expr = NULL;
463   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
464     dict_set_filter (default_dict, NULL);
465   dict_set_case_limit (default_dict, 0);
466   dict_clear_vectors (default_dict);
467   cancel_transformations ();
468 }
469 \f
470 /* Storage case stream. */
471
472 /* Information about storage sink or source. */
473 struct storage_stream_info 
474   {
475     struct casefile *casefile;  /* Storage. */
476   };
477
478 /* Initializes a storage sink. */
479 static void
480 storage_sink_open (struct case_sink *sink)
481 {
482   struct storage_stream_info *info;
483
484   sink->aux = info = xmalloc (sizeof *info);
485   info->casefile = casefile_create (sink->value_cnt);
486 }
487
488 /* Destroys storage stream represented by INFO. */
489 static void
490 destroy_storage_stream_info (struct storage_stream_info *info) 
491 {
492   if (info != NULL) 
493     {
494       casefile_destroy (info->casefile);
495       free (info); 
496     }
497 }
498
499 /* Writes case C to the storage sink SINK. */
500 static void
501 storage_sink_write (struct case_sink *sink, const struct ccase *c)
502 {
503   struct storage_stream_info *info = sink->aux;
504
505   casefile_append (info->casefile, c);
506 }
507
508 /* Destroys internal data in SINK. */
509 static void
510 storage_sink_destroy (struct case_sink *sink)
511 {
512   destroy_storage_stream_info (sink->aux);
513 }
514
515 /* Closes the sink and returns a storage source to read back the
516    written data. */
517 static struct case_source *
518 storage_sink_make_source (struct case_sink *sink) 
519 {
520   struct case_source *source
521     = create_case_source (&storage_source_class, sink->dict, sink->aux);
522   sink->aux = NULL;
523   return source;
524 }
525
526 /* Storage sink. */
527 const struct case_sink_class storage_sink_class = 
528   {
529     "storage",
530     storage_sink_open,
531     storage_sink_write,
532     storage_sink_destroy,
533     storage_sink_make_source,
534   };
535 \f
536 /* Storage source. */
537
538 /* Returns the number of cases that will be read by
539    storage_source_read(). */
540 static int
541 storage_source_count (const struct case_source *source) 
542 {
543   struct storage_stream_info *info = source->aux;
544
545   return casefile_get_case_cnt (info->casefile);
546 }
547
548 /* Reads all cases from the storage source and passes them one by one to
549    write_case(). */
550 static void
551 storage_source_read (struct case_source *source,
552                      struct ccase *output_case,
553                      write_case_func *write_case, write_case_data wc_data)
554 {
555   struct storage_stream_info *info = source->aux;
556   struct ccase casefile_case;
557   struct casereader *reader;
558
559   for (reader = casefile_get_reader (info->casefile);
560        casereader_read (reader, &casefile_case);
561        case_destroy (&casefile_case))
562     {
563       case_copy (output_case, 0,
564                  &casefile_case, 0,
565                  casefile_get_value_cnt (info->casefile));
566       write_case (wc_data);
567     }
568   casereader_destroy (reader);
569 }
570
571 /* Destroys the source's internal data. */
572 static void
573 storage_source_destroy (struct case_source *source)
574 {
575   destroy_storage_stream_info (source->aux);
576 }
577
578 /* Storage source. */
579 const struct case_source_class storage_source_class = 
580   {
581     "storage",
582     storage_source_count,
583     storage_source_read,
584     storage_source_destroy,
585   };
586
587 struct casefile *
588 storage_source_get_casefile (struct case_source *source) 
589 {
590   struct storage_stream_info *info = source->aux;
591
592   assert (source->class == &storage_source_class);
593   return info->casefile;
594 }
595
596 struct case_source *
597 storage_source_create (struct casefile *cf, const struct dictionary *dict) 
598 {
599   struct storage_stream_info *info;
600
601   info = xmalloc (sizeof *info);
602   info->casefile = cf;
603
604   return create_case_source (&storage_source_class, dict, info);
605 }
606 \f
607 /* Null sink.  Used by a few procedures that keep track of output
608    themselves and would throw away anything that the sink
609    contained anyway. */
610
611 const struct case_sink_class null_sink_class = 
612   {
613     "null",
614     NULL,
615     NULL,
616     NULL,
617     NULL,
618   };
619 \f
620 /* Returns a pointer to the lagged case from N_BEFORE cases before the
621    current one, or NULL if there haven't been that many cases yet. */
622 struct ccase *
623 lagged_case (int n_before)
624 {
625   assert (n_before >= 1 && n_before <= n_lag);
626   if (n_before <= lag_count)
627     {
628       int index = lag_head - n_before;
629       if (index < 0)
630         index += n_lag;
631       return &lag_queue[index];
632     }
633   else
634     return NULL;
635 }
636    
637 /* Appends TRNS to t_trns[], the list of all transformations to be
638    performed on data as it is read from the active file. */
639 void
640 add_transformation (struct trns_header * trns)
641 {
642   if (n_trns >= m_trns)
643     {
644       m_trns += 16;
645       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
646     }
647   t_trns[n_trns] = trns;
648   trns->index = n_trns++;
649 }
650
651 /* Cancels all active transformations, including any transformations
652    created by the input program. */
653 void
654 cancel_transformations (void)
655 {
656   int i;
657   for (i = 0; i < n_trns; i++)
658     {
659       if (t_trns[i]->free)
660         t_trns[i]->free (t_trns[i]);
661       free (t_trns[i]);
662     }
663   n_trns = f_trns = 0;
664   free (t_trns);
665   t_trns=NULL;
666   m_trns = 0;
667 }
668 \f
669 /* Creates a case source with class CLASS and auxiliary data AUX
670    and based on dictionary DICT. */
671 struct case_source *
672 create_case_source (const struct case_source_class *class,
673                     const struct dictionary *dict,
674                     void *aux) 
675 {
676   struct case_source *source = xmalloc (sizeof *source);
677   source->class = class;
678   source->value_cnt = dict_get_next_value_idx (dict);
679   source->aux = aux;
680   return source;
681 }
682
683 /* Destroys case source SOURCE.  It is the caller's responsible to
684    call the source's destroy function, if any. */
685 void
686 free_case_source (struct case_source *source) 
687 {
688   if (source != NULL) 
689     {
690       if (source->class->destroy != NULL)
691         source->class->destroy (source);
692       free (source);
693     }
694 }
695
696 /* Returns nonzero if a case source is "complex". */
697 int
698 case_source_is_complex (const struct case_source *source) 
699 {
700   return source != NULL && (source->class == &input_program_source_class
701                             || source->class == &file_type_source_class);
702 }
703
704 /* Returns nonzero if CLASS is the class of SOURCE. */
705 int
706 case_source_is_class (const struct case_source *source,
707                       const struct case_source_class *class) 
708 {
709   return source != NULL && source->class == class;
710 }
711
712 /* Creates a case sink with class CLASS and auxiliary data
713    AUX. */
714 struct case_sink *
715 create_case_sink (const struct case_sink_class *class,
716                   const struct dictionary *dict,
717                   void *aux) 
718 {
719   struct case_sink *sink = xmalloc (sizeof *sink);
720   sink->class = class;
721   sink->dict = dict;
722   sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
723   sink->value_cnt = dict_get_compacted_value_cnt (dict);
724   sink->aux = aux;
725   return sink;
726 }
727
728 /* Destroys case sink SINK.  */
729 void
730 free_case_sink (struct case_sink *sink) 
731 {
732   if (sink != NULL) 
733     {
734       if (sink->class->destroy != NULL)
735         sink->class->destroy (sink);
736       free (sink->idx_to_fv);
737       free (sink); 
738     }
739 }
740 \f
741 /* Represents auxiliary data for handling SPLIT FILE. */
742 struct split_aux_data 
743   {
744     size_t case_count;          /* Number of cases so far. */
745     struct ccase prev_case;     /* Data in previous case. */
746
747     /* Functions to call... */
748     void (*begin_func) (void *);               /* ...before data. */
749     int (*proc_func) (struct ccase *, void *); /* ...with data. */
750     void (*end_func) (void *);                 /* ...after data. */
751     void *func_aux;                            /* Auxiliary data. */ 
752   };
753
754 static int equal_splits (const struct ccase *, const struct ccase *);
755 static int procedure_with_splits_callback (struct ccase *, void *);
756 static void dump_splits (struct ccase *);
757
758 /* Like procedure(), but it automatically breaks the case stream
759    into SPLIT FILE break groups.  Before each group of cases with
760    identical SPLIT FILE variable values, BEGIN_FUNC is called.
761    Then PROC_FUNC is called with each case in the group.  
762    END_FUNC is called when the group is finished.  FUNC_AUX is
763    passed to each of the functions as auxiliary data.
764
765    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
766    and END_FUNC will be called at all. 
767
768    If SPLIT FILE is not in effect, then there is one break group
769    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
770    will be called once. */
771 void
772 procedure_with_splits (void (*begin_func) (void *aux),
773                        int (*proc_func) (struct ccase *, void *aux),
774                        void (*end_func) (void *aux),
775                        void *func_aux) 
776 {
777   struct split_aux_data split_aux;
778
779   split_aux.case_count = 0;
780   case_nullify (&split_aux.prev_case);
781   split_aux.begin_func = begin_func;
782   split_aux.proc_func = proc_func;
783   split_aux.end_func = end_func;
784   split_aux.func_aux = func_aux;
785
786   procedure (procedure_with_splits_callback, &split_aux);
787
788   if (split_aux.case_count > 0 && end_func != NULL)
789     end_func (func_aux);
790   case_destroy (&split_aux.prev_case);
791 }
792
793 /* procedure() callback used by procedure_with_splits(). */
794 static int
795 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
796 {
797   struct split_aux_data *split_aux = split_aux_;
798
799   /* Start a new series if needed. */
800   if (split_aux->case_count == 0
801       || !equal_splits (c, &split_aux->prev_case))
802     {
803       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
804         split_aux->end_func (split_aux->func_aux);
805
806       dump_splits (c);
807       case_destroy (&split_aux->prev_case);
808       case_clone (&split_aux->prev_case, c);
809
810       if (split_aux->begin_func != NULL)
811         split_aux->begin_func (split_aux->func_aux);
812     }
813
814   split_aux->case_count++;
815   if (split_aux->proc_func != NULL)
816     return split_aux->proc_func (c, split_aux->func_aux);
817   else
818     return 1;
819 }
820
821 /* Compares the SPLIT FILE variables in cases A and B and returns
822    nonzero only if they differ. */
823 static int
824 equal_splits (const struct ccase *a, const struct ccase *b) 
825 {
826   struct variable *const *split;
827   size_t split_cnt;
828   size_t i;
829     
830   split = dict_get_split_vars (default_dict);
831   split_cnt = dict_get_split_cnt (default_dict);
832   for (i = 0; i < split_cnt; i++)
833     {
834       struct variable *v = split[i];
835       
836       switch (v->type)
837         {
838         case NUMERIC:
839           if (case_num (a, v->fv) != case_num (b, v->fv))
840             return 0;
841           break;
842         case ALPHA:
843           if (memcmp (case_str (a, v->fv), case_str (b, v->fv), v->width))
844             return 0;
845           break;
846         default:
847           assert (0);
848         }
849     }
850
851   return 1;
852 }
853
854 /* Dumps out the values of all the split variables for the case C. */
855 static void
856 dump_splits (struct ccase *c)
857 {
858   struct variable *const *split;
859   struct tab_table *t;
860   size_t split_cnt;
861   int i;
862
863   split_cnt = dict_get_split_cnt (default_dict);
864   if (split_cnt == 0)
865     return;
866
867   t = tab_create (3, split_cnt + 1, 0);
868   tab_dim (t, tab_natural_dimensions);
869   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
870   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
871   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
872   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
873   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
874   split = dict_get_split_vars (default_dict);
875   for (i = 0; i < split_cnt; i++)
876     {
877       struct variable *v = split[i];
878       char temp_buf[80];
879       const char *val_lab;
880
881       assert (v->type == NUMERIC || v->type == ALPHA);
882       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
883       
884       data_out (temp_buf, &v->print, case_data (c, v->fv));
885       
886       temp_buf[v->print.w] = 0;
887       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
888
889       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
890       if (val_lab)
891         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
892     }
893   tab_flags (t, SOMF_NO_TITLE);
894   tab_submit (t);
895 }
896 \f
897 /* Represents auxiliary data for handling SPLIT FILE in a
898    multipass procedure. */
899 struct multipass_split_aux_data 
900   {
901     struct ccase prev_case;     /* Data in previous case. */
902     struct casefile *casefile;  /* Accumulates data for a split. */
903
904     /* Function to call with the accumulated data. */
905     void (*split_func) (const struct casefile *, void *);
906     void *func_aux;                            /* Auxiliary data. */ 
907   };
908
909 static int multipass_split_callback (struct ccase *c, void *aux_);
910 static void multipass_split_output (struct multipass_split_aux_data *);
911
912 void
913 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
914                                                      void *),
915                                  void *func_aux) 
916 {
917   struct multipass_split_aux_data aux;
918
919   assert (split_func != NULL);
920
921   open_active_file ();
922
923   case_nullify (&aux.prev_case);
924   aux.casefile = NULL;
925   aux.split_func = split_func;
926   aux.func_aux = func_aux;
927
928   internal_procedure (multipass_split_callback, &aux);
929   if (aux.casefile != NULL)
930     multipass_split_output (&aux);
931   case_destroy (&aux.prev_case);
932
933   close_active_file ();
934 }
935
936 /* procedure() callback used by multipass_procedure_with_splits(). */
937 static int
938 multipass_split_callback (struct ccase *c, void *aux_)
939 {
940   struct multipass_split_aux_data *aux = aux_;
941
942   /* Start a new series if needed. */
943   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
944     {
945       /* Pass any cases to split_func. */
946       if (aux->casefile != NULL)
947         multipass_split_output (aux);
948
949       /* Start a new casefile. */
950       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
951
952       /* Record split values. */
953       dump_splits (c);
954       case_destroy (&aux->prev_case);
955       case_clone (&aux->prev_case, c);
956     }
957
958   casefile_append (aux->casefile, c);
959
960   return 1;
961 }
962
963 static void
964 multipass_split_output (struct multipass_split_aux_data *aux)
965 {
966   assert (aux->casefile != NULL);
967   aux->split_func (aux->casefile, aux->func_aux);
968   casefile_destroy (aux->casefile);
969   aux->casefile = NULL;
970 }