Instead of making system or portable file readers responsible for
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "dictionary.h"
34 #include "do-ifP.h"
35 #include "error.h"
36 #include "expr.h"
37 #include "misc.h"
38 #include "random.h"
39 #include "settings.h"
40 #include "som.h"
41 #include "str.h"
42 #include "tab.h"
43 #include "var.h"
44 #include "value-labels.h"
45
46 /*
47    Virtual File Manager (vfm):
48
49    vfm is used to process data files.  It uses the model that
50    data is read from one stream (the data source), processed,
51    then written to another (the data sink).  The data source is
52    then deleted and the data sink becomes the data source for the
53    next procedure. */
54
55 /* Procedure execution data. */
56 struct write_case_data
57   {
58     /* Function to call for each case. */
59     int (*proc_func) (struct ccase *, void *); /* Function. */
60     void *aux;                                 /* Auxiliary data. */ 
61
62     struct ccase trns_case;     /* Case used for transformations. */
63     struct ccase sink_case;     /* Case written to sink, if
64                                    compaction is necessary. */
65     size_t cases_written;       /* Cases output so far. */
66     size_t cases_analyzed;      /* Cases passed to procedure so far. */
67   };
68
69 /* The current active file, from which cases are read. */
70 struct case_source *vfm_source;
71
72 /* The replacement active file, to which cases are written. */
73 struct case_sink *vfm_sink;
74
75 /* Nonzero if the case needs to have values deleted before being
76    stored, zero otherwise. */
77 static int compaction_necessary;
78
79 /* Time at which vfm was last invoked. */
80 time_t last_vfm_invocation;
81
82 /* Lag queue. */
83 int n_lag;                      /* Number of cases to lag. */
84 static int lag_count;           /* Number of cases in lag_queue so far. */
85 static int lag_head;            /* Index where next case will be added. */
86 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
87
88 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
89                                 void *aux);
90 static void create_trns_case (struct ccase *, struct dictionary *);
91 static void open_active_file (void);
92 static int write_case (struct write_case_data *wc_data);
93 static int execute_transformations (struct ccase *c,
94                                     struct trns_header **trns,
95                                     int first_idx, int last_idx,
96                                     int case_num);
97 static int filter_case (const struct ccase *c, int case_num);
98 static void lag_case (const struct ccase *c);
99 static void compact_case (struct ccase *dest, const struct ccase *src);
100 static void clear_case (struct ccase *c);
101 static void close_active_file (void);
102 \f
103 /* Public functions. */
104
105 /* Reads the data from the input program and writes it to a new
106    active file.  For each case we read from the input program, we
107    do the following
108
109    1. Execute permanent transformations.  If these drop the case,
110       start the next case from step 1.
111
112    2. N OF CASES.  If we have already written N cases, start the
113       next case from step 1.
114    
115    3. Write case to replacement active file.
116    
117    4. Execute temporary transformations.  If these drop the case,
118       start the next case from step 1.
119       
120    5. FILTER, PROCESS IF.  If these drop the case, start the next
121       case from step 1.
122    
123    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
124       cases, start the next case from step 1.
125       
126    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
127 void
128 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
129 {
130   if (proc_func == NULL
131       && case_source_is_class (vfm_source, &storage_source_class)
132       && vfm_sink == NULL
133       && !temporary
134       && n_trns == 0)
135     {
136       /* Nothing to do. */
137       return;
138     }
139
140   open_active_file ();
141   internal_procedure (proc_func, aux);
142   close_active_file ();
143 }
144
145 /* Executes a procedure, as procedure(), except that the caller
146    is responsible for calling open_active_file() and
147    close_active_file(). */
148 static void
149 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
150 {
151   static int recursive_call;
152
153   struct write_case_data wc_data;
154
155   assert (++recursive_call == 1);
156
157   wc_data.proc_func = proc_func;
158   wc_data.aux = aux;
159   create_trns_case (&wc_data.trns_case, default_dict);
160   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
161   wc_data.cases_written = 0;
162
163   last_vfm_invocation = time (NULL);
164
165   if (vfm_source != NULL) 
166     vfm_source->class->read (vfm_source,
167                              &wc_data.trns_case,
168                              write_case, &wc_data);
169
170   case_destroy (&wc_data.sink_case);
171   case_destroy (&wc_data.trns_case);
172
173   assert (--recursive_call == 0);
174 }
175
176 /* Creates and returns a case, initializing it from the vectors
177    that say which `value's need to be initialized just once, and
178    which ones need to be re-initialized before every case. */
179 static void
180 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
181 {
182   size_t var_cnt = dict_get_var_cnt (dict);
183   size_t i;
184
185   case_create (trns_case, dict_get_next_value_idx (dict));
186   for (i = 0; i < var_cnt; i++) 
187     {
188       struct variable *v = dict_get_var (dict, i);
189       union value *value = case_data_rw (trns_case, v->fv);
190
191       if (v->type == NUMERIC)
192         value->f = v->reinit ? 0.0 : SYSMIS;
193       else
194         memset (value->s, ' ', v->width);
195     }
196 }
197
198 /* Makes all preparations for reading from the data source and writing
199    to the data sink. */
200 static void
201 open_active_file (void)
202 {
203   /* Make temp_dict refer to the dictionary right before data
204      reaches the sink */
205   if (!temporary)
206     {
207       temp_trns = n_trns;
208       temp_dict = default_dict;
209     }
210
211   /* Figure out compaction. */
212   compaction_necessary = (dict_get_next_value_idx (temp_dict)
213                           != dict_get_compacted_value_cnt (temp_dict));
214
215   /* Prepare sink. */
216   if (vfm_sink == NULL)
217     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
218   if (vfm_sink->class->open != NULL)
219     vfm_sink->class->open (vfm_sink);
220
221   /* Allocate memory for lag queue. */
222   if (n_lag > 0)
223     {
224       int i;
225   
226       lag_count = 0;
227       lag_head = 0;
228       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
229       for (i = 0; i < n_lag; i++)
230         case_nullify (&lag_queue[i]);
231     }
232
233   /* Close any unclosed DO IF or LOOP constructs. */
234   discard_ctl_stack ();
235 }
236
237 /* Transforms trns_case and writes it to the replacement active
238    file if advisable.  Returns nonzero if more cases can be
239    accepted, zero otherwise.  Do not call this function again
240    after it has returned zero once.  */
241 static int
242 write_case (struct write_case_data *wc_data)
243 {
244   /* Execute permanent transformations.  */
245   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
246                                 wc_data->cases_written + 1))
247     goto done;
248
249   /* N OF CASES. */
250   if (dict_get_case_limit (default_dict)
251       && wc_data->cases_written >= dict_get_case_limit (default_dict))
252     goto done;
253   wc_data->cases_written++;
254
255   /* Write case to LAG queue. */
256   if (n_lag)
257     lag_case (&wc_data->trns_case);
258
259   /* Write case to replacement active file. */
260   if (vfm_sink->class->write != NULL) 
261     {
262       if (compaction_necessary) 
263         {
264           compact_case (&wc_data->sink_case, &wc_data->trns_case);
265           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
266         }
267       else
268         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
269     }
270   
271   /* Execute temporary transformations. */
272   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
273                                 wc_data->cases_written))
274     goto done;
275   
276   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
277   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
278       || (dict_get_case_limit (temp_dict)
279           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
280     goto done;
281   wc_data->cases_analyzed++;
282
283   /* Pass case to procedure. */
284   if (wc_data->proc_func != NULL)
285     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
286
287  done:
288   clear_case (&wc_data->trns_case);
289   return 1;
290 }
291
292 /* Transforms case C using the transformations in TRNS[] with
293    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
294    become case CASE_NUM (1-based) in the output file.  Returns
295    zero if the case was filtered out by one of the
296    transformations, nonzero otherwise. */
297 static int
298 execute_transformations (struct ccase *c,
299                          struct trns_header **trns,
300                          int first_idx, int last_idx,
301                          int case_num) 
302 {
303   int idx;
304
305   for (idx = first_idx; idx != last_idx; )
306     {
307       int retval = trns[idx]->proc (trns[idx], c, case_num);
308       switch (retval)
309         {
310         case -1:
311           idx++;
312           break;
313           
314         case -2:
315           return 0;
316           
317         default:
318           idx = retval;
319           break;
320         }
321     }
322
323   return 1;
324 }
325
326 /* Returns nonzero if case C with case number CASE_NUM should be
327    exclude as specified on FILTER or PROCESS IF, otherwise
328    zero. */
329 static int
330 filter_case (const struct ccase *c, int case_idx)
331 {
332   /* FILTER. */
333   struct variable *filter_var = dict_get_filter (default_dict);
334   if (filter_var != NULL) 
335     {
336       double f = case_num (c, filter_var->fv);
337       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
338         return 1;
339     }
340
341   /* PROCESS IF. */
342   if (process_if_expr != NULL
343       && expr_evaluate (process_if_expr, c, case_idx, NULL) != 1.0)
344     return 1;
345
346   return 0;
347 }
348
349 /* Add C to the lag queue. */
350 static void
351 lag_case (const struct ccase *c)
352 {
353   if (lag_count < n_lag)
354     lag_count++;
355   case_destroy (&lag_queue[lag_head]);
356   case_clone (&lag_queue[lag_head], c);
357   if (++lag_head >= n_lag)
358     lag_head = 0;
359 }
360
361 /* Copies case SRC to case DEST, compacting it in the process. */
362 static void
363 compact_case (struct ccase *dest, const struct ccase *src)
364 {
365   int i;
366   int nval = 0;
367   size_t var_cnt;
368   
369   assert (compaction_necessary);
370
371   /* Copy all the variables except scratch variables from SRC to
372      DEST. */
373   /* FIXME: this should be temp_dict not default_dict I guess. */
374   var_cnt = dict_get_var_cnt (default_dict);
375   for (i = 0; i < var_cnt; i++)
376     {
377       struct variable *v = dict_get_var (default_dict, i);
378       
379       if (dict_class_from_id (v->name) == DC_SCRATCH)
380         continue;
381
382       if (v->type == NUMERIC) 
383         {
384           case_data_rw (dest, nval)->f = case_num (src, v->fv);
385           nval++; 
386         }
387       else
388         {
389           int w = DIV_RND_UP (v->width, sizeof (union value));
390           
391           memcpy (case_data_rw (dest, nval), case_str (src, v->fv),
392                   w * sizeof (union value));
393           nval += w;
394         }
395     }
396 }
397
398 /* Clears the variables in C that need to be cleared between
399    processing cases.  */
400 static void
401 clear_case (struct ccase *c)
402 {
403   size_t var_cnt = dict_get_var_cnt (default_dict);
404   size_t i;
405   
406   for (i = 0; i < var_cnt; i++) 
407     {
408       struct variable *v = dict_get_var (default_dict, i);
409       if (v->init && v->reinit) 
410         {
411           if (v->type == NUMERIC)
412             case_data_rw (c, v->fv)->f = SYSMIS;
413           else
414             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
415         } 
416     }
417 }
418
419 /* Closes the active file. */
420 static void
421 close_active_file (void)
422 {
423   /* Free memory for lag queue, and turn off lagging. */
424   if (n_lag > 0)
425     {
426       int i;
427       
428       for (i = 0; i < n_lag; i++)
429         case_destroy (&lag_queue[i]);
430       free (lag_queue);
431       n_lag = 0;
432     }
433   
434   /* Dictionary from before TEMPORARY becomes permanent.. */
435   if (temporary)
436     {
437       dict_destroy (default_dict);
438       default_dict = temp_dict;
439       temp_dict = NULL;
440     }
441
442   /* Finish compaction. */
443   if (compaction_necessary)
444     dict_compact_values (default_dict);
445     
446   /* Free data source. */
447   if (vfm_source != NULL) 
448     {
449       free_case_source (vfm_source);
450       vfm_source = NULL;
451     }
452
453   /* Old data sink becomes new data source. */
454   if (vfm_sink->class->make_source != NULL)
455     vfm_source = vfm_sink->class->make_source (vfm_sink);
456   free_case_sink (vfm_sink);
457   vfm_sink = NULL;
458
459   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
460      and get rid of all the transformations. */
461   cancel_temporary ();
462   expr_free (process_if_expr);
463   process_if_expr = NULL;
464   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
465     dict_set_filter (default_dict, NULL);
466   dict_set_case_limit (default_dict, 0);
467   dict_clear_vectors (default_dict);
468   cancel_transformations ();
469 }
470 \f
471 /* Storage case stream. */
472
473 /* Information about storage sink or source. */
474 struct storage_stream_info 
475   {
476     struct casefile *casefile;  /* Storage. */
477   };
478
479 /* Initializes a storage sink. */
480 static void
481 storage_sink_open (struct case_sink *sink)
482 {
483   struct storage_stream_info *info;
484
485   sink->aux = info = xmalloc (sizeof *info);
486   info->casefile = casefile_create (sink->value_cnt);
487 }
488
489 /* Destroys storage stream represented by INFO. */
490 static void
491 destroy_storage_stream_info (struct storage_stream_info *info) 
492 {
493   if (info != NULL) 
494     {
495       casefile_destroy (info->casefile);
496       free (info); 
497     }
498 }
499
500 /* Writes case C to the storage sink SINK. */
501 static void
502 storage_sink_write (struct case_sink *sink, const struct ccase *c)
503 {
504   struct storage_stream_info *info = sink->aux;
505
506   casefile_append (info->casefile, c);
507 }
508
509 /* Destroys internal data in SINK. */
510 static void
511 storage_sink_destroy (struct case_sink *sink)
512 {
513   destroy_storage_stream_info (sink->aux);
514 }
515
516 /* Closes the sink and returns a storage source to read back the
517    written data. */
518 static struct case_source *
519 storage_sink_make_source (struct case_sink *sink) 
520 {
521   struct case_source *source
522     = create_case_source (&storage_source_class, sink->dict, sink->aux);
523   sink->aux = NULL;
524   return source;
525 }
526
527 /* Storage sink. */
528 const struct case_sink_class storage_sink_class = 
529   {
530     "storage",
531     storage_sink_open,
532     storage_sink_write,
533     storage_sink_destroy,
534     storage_sink_make_source,
535   };
536 \f
537 /* Storage source. */
538
539 /* Returns the number of cases that will be read by
540    storage_source_read(). */
541 static int
542 storage_source_count (const struct case_source *source) 
543 {
544   struct storage_stream_info *info = source->aux;
545
546   return casefile_get_case_cnt (info->casefile);
547 }
548
549 /* Reads all cases from the storage source and passes them one by one to
550    write_case(). */
551 static void
552 storage_source_read (struct case_source *source,
553                      struct ccase *output_case,
554                      write_case_func *write_case, write_case_data wc_data)
555 {
556   struct storage_stream_info *info = source->aux;
557   struct ccase casefile_case;
558   struct casereader *reader;
559
560   for (reader = casefile_get_reader (info->casefile);
561        casereader_read (reader, &casefile_case);
562        case_destroy (&casefile_case))
563     {
564       case_copy (output_case, 0,
565                  &casefile_case, 0,
566                  casefile_get_value_cnt (info->casefile));
567       write_case (wc_data);
568     }
569   casereader_destroy (reader);
570 }
571
572 /* Destroys the source's internal data. */
573 static void
574 storage_source_destroy (struct case_source *source)
575 {
576   destroy_storage_stream_info (source->aux);
577 }
578
579 /* Storage source. */
580 const struct case_source_class storage_source_class = 
581   {
582     "storage",
583     storage_source_count,
584     storage_source_read,
585     storage_source_destroy,
586   };
587
588 struct casefile *
589 storage_source_get_casefile (struct case_source *source) 
590 {
591   struct storage_stream_info *info = source->aux;
592
593   assert (source->class == &storage_source_class);
594   return info->casefile;
595 }
596
597 struct case_source *
598 storage_source_create (struct casefile *cf, const struct dictionary *dict) 
599 {
600   struct storage_stream_info *info;
601
602   info = xmalloc (sizeof *info);
603   info->casefile = cf;
604
605   return create_case_source (&storage_source_class, dict, info);
606 }
607 \f
608 /* Null sink.  Used by a few procedures that keep track of output
609    themselves and would throw away anything that the sink
610    contained anyway. */
611
612 const struct case_sink_class null_sink_class = 
613   {
614     "null",
615     NULL,
616     NULL,
617     NULL,
618     NULL,
619   };
620 \f
621 /* Returns a pointer to the lagged case from N_BEFORE cases before the
622    current one, or NULL if there haven't been that many cases yet. */
623 struct ccase *
624 lagged_case (int n_before)
625 {
626   assert (n_before <= n_lag);
627   if (n_before <= lag_count)
628     {
629       int index = lag_head - n_before;
630       if (index < 0)
631         index += n_lag;
632       return &lag_queue[index];
633     }
634   else
635     return NULL;
636 }
637    
638 /* Appends TRNS to t_trns[], the list of all transformations to be
639    performed on data as it is read from the active file. */
640 void
641 add_transformation (struct trns_header * trns)
642 {
643   if (n_trns >= m_trns)
644     {
645       m_trns += 16;
646       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
647     }
648   t_trns[n_trns] = trns;
649   trns->index = n_trns++;
650 }
651
652 /* Cancels all active transformations, including any transformations
653    created by the input program. */
654 void
655 cancel_transformations (void)
656 {
657   int i;
658   for (i = 0; i < n_trns; i++)
659     {
660       if (t_trns[i]->free)
661         t_trns[i]->free (t_trns[i]);
662       free (t_trns[i]);
663     }
664   n_trns = f_trns = 0;
665   if (m_trns > 32)
666     {
667       free (t_trns);
668       t_trns=NULL;
669       m_trns = 0;
670     }
671 }
672 \f
673 /* Creates a case source with class CLASS and auxiliary data AUX
674    and based on dictionary DICT. */
675 struct case_source *
676 create_case_source (const struct case_source_class *class,
677                     const struct dictionary *dict,
678                     void *aux) 
679 {
680   struct case_source *source = xmalloc (sizeof *source);
681   source->class = class;
682   source->value_cnt = dict_get_next_value_idx (dict);
683   source->aux = aux;
684   return source;
685 }
686
687 /* Destroys case source SOURCE.  It is the caller's responsible to
688    call the source's destroy function, if any. */
689 void
690 free_case_source (struct case_source *source) 
691 {
692   if (source != NULL) 
693     {
694       if (source->class->destroy != NULL)
695         source->class->destroy (source);
696       free (source);
697     }
698 }
699
700 /* Returns nonzero if a case source is "complex". */
701 int
702 case_source_is_complex (const struct case_source *source) 
703 {
704   return source != NULL && (source->class == &input_program_source_class
705                             || source->class == &file_type_source_class);
706 }
707
708 /* Returns nonzero if CLASS is the class of SOURCE. */
709 int
710 case_source_is_class (const struct case_source *source,
711                       const struct case_source_class *class) 
712 {
713   return source != NULL && source->class == class;
714 }
715
716 /* Creates a case sink with class CLASS and auxiliary data
717    AUX. */
718 struct case_sink *
719 create_case_sink (const struct case_sink_class *class,
720                   const struct dictionary *dict,
721                   void *aux) 
722 {
723   struct case_sink *sink = xmalloc (sizeof *sink);
724   sink->class = class;
725   sink->dict = dict;
726   sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
727   sink->value_cnt = dict_get_compacted_value_cnt (dict);
728   sink->aux = aux;
729   return sink;
730 }
731
732 /* Destroys case sink SINK.  */
733 void
734 free_case_sink (struct case_sink *sink) 
735 {
736   if (sink != NULL) 
737     {
738       if (sink->class->destroy != NULL)
739         sink->class->destroy (sink);
740       free (sink->idx_to_fv);
741       free (sink); 
742     }
743 }
744 \f
745 /* Represents auxiliary data for handling SPLIT FILE. */
746 struct split_aux_data 
747   {
748     size_t case_count;          /* Number of cases so far. */
749     struct ccase prev_case;     /* Data in previous case. */
750
751     /* Functions to call... */
752     void (*begin_func) (void *);               /* ...before data. */
753     int (*proc_func) (struct ccase *, void *); /* ...with data. */
754     void (*end_func) (void *);                 /* ...after data. */
755     void *func_aux;                            /* Auxiliary data. */ 
756   };
757
758 static int equal_splits (const struct ccase *, const struct ccase *);
759 static int procedure_with_splits_callback (struct ccase *, void *);
760 static void dump_splits (struct ccase *);
761
762 /* Like procedure(), but it automatically breaks the case stream
763    into SPLIT FILE break groups.  Before each group of cases with
764    identical SPLIT FILE variable values, BEGIN_FUNC is called.
765    Then PROC_FUNC is called with each case in the group.  
766    END_FUNC is called when the group is finished.  FUNC_AUX is
767    passed to each of the functions as auxiliary data.
768
769    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
770    and END_FUNC will be called at all. 
771
772    If SPLIT FILE is not in effect, then there is one break group
773    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
774    will be called once. */
775 void
776 procedure_with_splits (void (*begin_func) (void *aux),
777                        int (*proc_func) (struct ccase *, void *aux),
778                        void (*end_func) (void *aux),
779                        void *func_aux) 
780 {
781   struct split_aux_data split_aux;
782
783   split_aux.case_count = 0;
784   case_nullify (&split_aux.prev_case);
785   split_aux.begin_func = begin_func;
786   split_aux.proc_func = proc_func;
787   split_aux.end_func = end_func;
788   split_aux.func_aux = func_aux;
789
790   procedure (procedure_with_splits_callback, &split_aux);
791
792   if (split_aux.case_count > 0 && end_func != NULL)
793     end_func (func_aux);
794   case_destroy (&split_aux.prev_case);
795 }
796
797 /* procedure() callback used by procedure_with_splits(). */
798 static int
799 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
800 {
801   struct split_aux_data *split_aux = split_aux_;
802
803   /* Start a new series if needed. */
804   if (split_aux->case_count == 0
805       || !equal_splits (c, &split_aux->prev_case))
806     {
807       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
808         split_aux->end_func (split_aux->func_aux);
809
810       dump_splits (c);
811       case_destroy (&split_aux->prev_case);
812       case_clone (&split_aux->prev_case, c);
813
814       if (split_aux->begin_func != NULL)
815         split_aux->begin_func (split_aux->func_aux);
816     }
817
818   split_aux->case_count++;
819   if (split_aux->proc_func != NULL)
820     return split_aux->proc_func (c, split_aux->func_aux);
821   else
822     return 1;
823 }
824
825 /* Compares the SPLIT FILE variables in cases A and B and returns
826    nonzero only if they differ. */
827 static int
828 equal_splits (const struct ccase *a, const struct ccase *b) 
829 {
830   struct variable *const *split;
831   size_t split_cnt;
832   size_t i;
833     
834   split = dict_get_split_vars (default_dict);
835   split_cnt = dict_get_split_cnt (default_dict);
836   for (i = 0; i < split_cnt; i++)
837     {
838       struct variable *v = split[i];
839       
840       switch (v->type)
841         {
842         case NUMERIC:
843           if (case_num (a, v->fv) != case_num (b, v->fv))
844             return 0;
845           break;
846         case ALPHA:
847           if (memcmp (case_str (a, v->fv), case_str (b, v->fv), v->width))
848             return 0;
849           break;
850         default:
851           assert (0);
852         }
853     }
854
855   return 1;
856 }
857
858 /* Dumps out the values of all the split variables for the case C. */
859 static void
860 dump_splits (struct ccase *c)
861 {
862   struct variable *const *split;
863   struct tab_table *t;
864   size_t split_cnt;
865   int i;
866
867   split_cnt = dict_get_split_cnt (default_dict);
868   if (split_cnt == 0)
869     return;
870
871   t = tab_create (3, split_cnt + 1, 0);
872   tab_dim (t, tab_natural_dimensions);
873   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
874   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
875   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
876   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
877   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
878   split = dict_get_split_vars (default_dict);
879   for (i = 0; i < split_cnt; i++)
880     {
881       struct variable *v = split[i];
882       char temp_buf[80];
883       const char *val_lab;
884
885       assert (v->type == NUMERIC || v->type == ALPHA);
886       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
887       
888       data_out (temp_buf, &v->print, case_data (c, v->fv));
889       
890       temp_buf[v->print.w] = 0;
891       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
892
893       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
894       if (val_lab)
895         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
896     }
897   tab_flags (t, SOMF_NO_TITLE);
898   tab_submit (t);
899 }
900 \f
901 /* Represents auxiliary data for handling SPLIT FILE in a
902    multipass procedure. */
903 struct multipass_split_aux_data 
904   {
905     struct ccase prev_case;     /* Data in previous case. */
906     struct casefile *casefile;  /* Accumulates data for a split. */
907
908     /* Function to call with the accumulated data. */
909     void (*split_func) (const struct casefile *, void *);
910     void *func_aux;                            /* Auxiliary data. */ 
911   };
912
913 static int multipass_split_callback (struct ccase *c, void *aux_);
914 static void multipass_split_output (struct multipass_split_aux_data *);
915
916 void
917 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
918                                                      void *),
919                                  void *func_aux) 
920 {
921   struct multipass_split_aux_data aux;
922
923   assert (split_func != NULL);
924
925   open_active_file ();
926
927   case_nullify (&aux.prev_case);
928   aux.casefile = NULL;
929   aux.split_func = split_func;
930   aux.func_aux = func_aux;
931
932   internal_procedure (multipass_split_callback, &aux);
933   if (aux.casefile != NULL)
934     multipass_split_output (&aux);
935   case_destroy (&aux.prev_case);
936
937   close_active_file ();
938 }
939
940 /* procedure() callback used by multipass_procedure_with_splits(). */
941 static int
942 multipass_split_callback (struct ccase *c, void *aux_)
943 {
944   struct multipass_split_aux_data *aux = aux_;
945
946   /* Start a new series if needed. */
947   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
948     {
949       /* Pass any cases to split_func. */
950       if (aux->casefile != NULL)
951         multipass_split_output (aux);
952
953       /* Start a new casefile. */
954       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
955
956       /* Record split values. */
957       dump_splits (c);
958       case_destroy (&aux->prev_case);
959       case_clone (&aux->prev_case, c);
960     }
961
962   casefile_append (aux->casefile, c);
963
964   return 1;
965 }
966
967 static void
968 multipass_split_output (struct multipass_split_aux_data *aux)
969 {
970   assert (aux->casefile != NULL);
971   aux->split_func (aux->casefile, aux->func_aux);
972   casefile_destroy (aux->casefile);
973   aux->casefile = NULL;
974 }