58edebca6ad7dd25a67271b47c7efb3871515a13
[pspp-builds.git] / src / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include <procedure.h>
22 #include <libpspp/message.h>
23 #include <errno.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 #include <libpspp/alloc.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <language/command.h>
31 #include <data/dictionary.h>
32 #include <language/control/control-stack.h>
33 #include <libpspp/message.h>
34 #include "expressions/public.h"
35 #include <data/file-handle-def.h>
36 #include <libpspp/misc.h>
37 #include <data/settings.h>
38 #include <output/manager.h>
39 #include <output/table.h>
40 #include <libpspp/str.h>
41 #include <data/variable.h>
42 #include <data/value-labels.h>
43
44 #include "gettext.h"
45 #define _(msgid) gettext (msgid)
46
47 /*
48    Virtual File Manager (vfm):
49
50    vfm is used to process data files.  It uses the model that
51    data is read from one stream (the data source), processed,
52    then written to another (the data sink).  The data source is
53    then deleted and the data sink becomes the data source for the
54    next procedure. */
55
56 /* Procedure execution data. */
57 struct write_case_data
58   {
59     /* Function to call for each case. */
60     bool (*proc_func) (struct ccase *, void *); /* Function. */
61     void *aux;                                 /* Auxiliary data. */ 
62
63     struct ccase trns_case;     /* Case used for transformations. */
64     struct ccase sink_case;     /* Case written to sink, if
65                                    compaction is necessary. */
66     size_t cases_written;       /* Cases output so far. */
67     size_t cases_analyzed;      /* Cases passed to procedure so far. */
68   };
69
70 /* The current active file, from which cases are read. */
71 struct case_source *vfm_source;
72
73 /* The replacement active file, to which cases are written. */
74 struct case_sink *vfm_sink;
75
76 /* The compactor used to compact a compact, if necessary;
77    otherwise a null pointer. */
78 static struct dict_compactor *compactor;
79
80 /* Time at which vfm was last invoked. */
81 static time_t last_vfm_invocation;
82
83 /* Lag queue. */
84 int n_lag;                      /* Number of cases to lag. */
85 static int lag_count;           /* Number of cases in lag_queue so far. */
86 static int lag_head;            /* Index where next case will be added. */
87 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
88
89 /* Active transformations. */
90 struct transformation *t_trns;
91 size_t n_trns, m_trns, f_trns;
92
93 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
94                                 void *aux);
95 static void update_last_vfm_invocation (void);
96 static void create_trns_case (struct ccase *, struct dictionary *);
97 static void open_active_file (void);
98 static bool write_case (struct write_case_data *wc_data);
99 static int execute_transformations (struct ccase *c,
100                                     struct transformation *trns,
101                                     int first_idx, int last_idx,
102                                     int case_num);
103 static int filter_case (const struct ccase *c, int case_num);
104 static void lag_case (const struct ccase *c);
105 static void clear_case (struct ccase *c);
106 static bool close_active_file (void);
107 \f
108 /* Public functions. */
109
110 /* Returns the last time the data was read. */
111 time_t
112 vfm_last_invocation (void) 
113 {
114   if (last_vfm_invocation == 0)
115     update_last_vfm_invocation ();
116   return last_vfm_invocation;
117 }
118
119 /* Reads the data from the input program and writes it to a new
120    active file.  For each case we read from the input program, we
121    do the following
122
123    1. Execute permanent transformations.  If these drop the case,
124       start the next case from step 1.
125
126    2. N OF CASES.  If we have already written N cases, start the
127       next case from step 1.
128    
129    3. Write case to replacement active file.
130    
131    4. Execute temporary transformations.  If these drop the case,
132       start the next case from step 1.
133       
134    5. FILTER, PROCESS IF.  If these drop the case, start the next
135       case from step 1.
136    
137    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
138       cases, start the next case from step 1.
139       
140    7. Pass case to PROC_FUNC, passing AUX as auxiliary data.
141
142    Returns true if successful, false if an I/O error occurred. */
143 bool
144 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
145 {
146   if (proc_func == NULL
147       && case_source_is_class (vfm_source, &storage_source_class)
148       && vfm_sink == NULL
149       && !temporary
150       && n_trns == 0)
151     {
152       /* Nothing to do. */
153       update_last_vfm_invocation ();
154       return true;
155     }
156   else 
157     {
158       bool ok;
159       
160       open_active_file ();
161       ok = internal_procedure (proc_func, aux);
162       if (!close_active_file ())
163         ok = false;
164
165       return ok;
166     }
167 }
168
169 /* Executes a procedure, as procedure(), except that the caller
170    is responsible for calling open_active_file() and
171    close_active_file().
172    Returns true if successful, false if an I/O error occurred. */
173 static bool
174 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux) 
175 {
176   static int recursive_call;
177   struct write_case_data wc_data;
178   bool ok;
179
180   assert (++recursive_call == 1);
181
182   wc_data.proc_func = proc_func;
183   wc_data.aux = aux;
184   create_trns_case (&wc_data.trns_case, default_dict);
185   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
186   wc_data.cases_written = 0;
187
188   update_last_vfm_invocation ();
189
190   ok = (vfm_source == NULL
191         || vfm_source->class->read (vfm_source,
192                                     &wc_data.trns_case,
193                                     write_case, &wc_data));
194
195   case_destroy (&wc_data.sink_case);
196   case_destroy (&wc_data.trns_case);
197
198   assert (--recursive_call == 0);
199
200   return ok;
201 }
202
203 /* Updates last_vfm_invocation. */
204 static void
205 update_last_vfm_invocation (void) 
206 {
207   last_vfm_invocation = time (NULL);
208 }
209
210 /* Creates and returns a case, initializing it from the vectors
211    that say which `value's need to be initialized just once, and
212    which ones need to be re-initialized before every case. */
213 static void
214 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
215 {
216   size_t var_cnt = dict_get_var_cnt (dict);
217   size_t i;
218
219   case_create (trns_case, dict_get_next_value_idx (dict));
220   for (i = 0; i < var_cnt; i++) 
221     {
222       struct variable *v = dict_get_var (dict, i);
223       union value *value = case_data_rw (trns_case, v->fv);
224
225       if (v->type == NUMERIC)
226         value->f = v->reinit ? 0.0 : SYSMIS;
227       else
228         memset (value->s, ' ', v->width);
229     }
230 }
231
232 /* Makes all preparations for reading from the data source and writing
233    to the data sink. */
234 static void
235 open_active_file (void)
236 {
237   /* Make temp_dict refer to the dictionary right before data
238      reaches the sink */
239   if (!temporary)
240     {
241       temp_trns = n_trns;
242       temp_dict = default_dict;
243     }
244
245   /* Figure out compaction. */
246   compactor = (dict_needs_compaction (temp_dict)
247                ? dict_make_compactor (temp_dict)
248                : NULL);
249
250   /* Prepare sink. */
251   if (vfm_sink == NULL)
252     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
253   if (vfm_sink->class->open != NULL)
254     vfm_sink->class->open (vfm_sink);
255
256   /* Allocate memory for lag queue. */
257   if (n_lag > 0)
258     {
259       int i;
260   
261       lag_count = 0;
262       lag_head = 0;
263       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
264       for (i = 0; i < n_lag; i++)
265         case_nullify (&lag_queue[i]);
266     }
267
268   /* Close any unclosed DO IF or LOOP constructs. */
269   ctl_stack_clear ();
270 }
271
272 /* Transforms trns_case and writes it to the replacement active
273    file if advisable.  Returns nonzero if more cases can be
274    accepted, zero otherwise.  Do not call this function again
275    after it has returned zero once.  */
276 static bool
277 write_case (struct write_case_data *wc_data)
278 {
279   int retval;
280   
281   /* Execute permanent transformations.  */
282   retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
283                                     temp_trns, wc_data->cases_written + 1);
284   if (retval != 1)
285     goto done;
286
287   /* N OF CASES. */
288   if (dict_get_case_limit (default_dict)
289       && wc_data->cases_written >= dict_get_case_limit (default_dict))
290     goto done;
291   wc_data->cases_written++;
292
293   /* Write case to LAG queue. */
294   if (n_lag)
295     lag_case (&wc_data->trns_case);
296
297   /* Write case to replacement active file. */
298   if (vfm_sink->class->write != NULL) 
299     {
300       if (compactor != NULL) 
301         {
302           dict_compactor_compact (compactor, &wc_data->sink_case,
303                                   &wc_data->trns_case);
304           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
305         }
306       else
307         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
308     }
309   
310   /* Execute temporary transformations. */
311   retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
312                                     n_trns, wc_data->cases_written);
313   if (retval != 1)
314     goto done;
315   
316   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
317   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
318       || (dict_get_case_limit (temp_dict)
319           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
320     goto done;
321   wc_data->cases_analyzed++;
322
323   /* Pass case to procedure. */
324   if (wc_data->proc_func != NULL)
325     if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
326       retval = -1;
327
328  done:
329   clear_case (&wc_data->trns_case);
330   return retval != -1;
331 }
332
333 /* Transforms case C using the transformations in TRNS[] with
334    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
335    become case CASE_NUM (1-based) in the output file.  Returns 1
336    if the case was successfully transformed, 0 if it was filtered
337    out by one of the transformations, or -1 if the procedure
338    should be abandoned due to a fatal error. */
339 static int
340 execute_transformations (struct ccase *c,
341                          struct transformation *trns,
342                          int first_idx, int last_idx,
343                          int case_num) 
344 {
345   int idx;
346
347   for (idx = first_idx; idx != last_idx; )
348     {
349       struct transformation *t = &trns[idx];
350       int retval = t->proc (t->private, c, case_num);
351       switch (retval)
352         {
353         case TRNS_CONTINUE:
354           idx++;
355           break;
356           
357         case TRNS_DROP_CASE:
358           return 0;
359
360         case TRNS_ERROR:
361           return -1;
362
363         case TRNS_NEXT_CASE:
364           abort ();
365
366         case TRNS_END_FILE:
367           abort ();
368           
369         default:
370           idx = retval;
371           break;
372         }
373     }
374
375   return 1;
376 }
377
378 /* Returns nonzero if case C with case number CASE_NUM should be
379    exclude as specified on FILTER or PROCESS IF, otherwise
380    zero. */
381 static int
382 filter_case (const struct ccase *c, int case_idx)
383 {
384   /* FILTER. */
385   struct variable *filter_var = dict_get_filter (default_dict);
386   if (filter_var != NULL) 
387     {
388       double f = case_num (c, filter_var->fv);
389       if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
390         return 1;
391     }
392
393   /* PROCESS IF. */
394   if (process_if_expr != NULL
395       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
396     return 1;
397
398   return 0;
399 }
400
401 /* Add C to the lag queue. */
402 static void
403 lag_case (const struct ccase *c)
404 {
405   if (lag_count < n_lag)
406     lag_count++;
407   case_destroy (&lag_queue[lag_head]);
408   case_clone (&lag_queue[lag_head], c);
409   if (++lag_head >= n_lag)
410     lag_head = 0;
411 }
412
413 /* Clears the variables in C that need to be cleared between
414    processing cases.  */
415 static void
416 clear_case (struct ccase *c)
417 {
418   size_t var_cnt = dict_get_var_cnt (default_dict);
419   size_t i;
420   
421   for (i = 0; i < var_cnt; i++) 
422     {
423       struct variable *v = dict_get_var (default_dict, i);
424       if (v->init && v->reinit) 
425         {
426           if (v->type == NUMERIC)
427             case_data_rw (c, v->fv)->f = SYSMIS;
428           else
429             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
430         } 
431     }
432 }
433
434 /* Closes the active file. */
435 static bool
436 close_active_file (void)
437 {
438   /* Free memory for lag queue, and turn off lagging. */
439   if (n_lag > 0)
440     {
441       int i;
442       
443       for (i = 0; i < n_lag; i++)
444         case_destroy (&lag_queue[i]);
445       free (lag_queue);
446       n_lag = 0;
447     }
448   
449   /* Dictionary from before TEMPORARY becomes permanent.. */
450   if (temporary)
451     {
452       dict_destroy (default_dict);
453       default_dict = temp_dict;
454       temp_dict = NULL;
455     }
456
457   /* Finish compaction. */
458   if (compactor != NULL) 
459     {
460       dict_compactor_destroy (compactor);
461       dict_compact_values (default_dict); 
462     }
463     
464   /* Free data source. */
465   free_case_source (vfm_source);
466   vfm_source = NULL;
467
468   /* Old data sink becomes new data source. */
469   if (vfm_sink->class->make_source != NULL)
470     vfm_source = vfm_sink->class->make_source (vfm_sink);
471   free_case_sink (vfm_sink);
472   vfm_sink = NULL;
473
474   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
475      and get rid of all the transformations. */
476   cancel_temporary ();
477   expr_free (process_if_expr);
478   process_if_expr = NULL;
479   dict_set_case_limit (default_dict, 0);
480   dict_clear_vectors (default_dict);
481   return cancel_transformations ();
482 }
483 \f
484 /* Storage case stream. */
485
486 /* Information about storage sink or source. */
487 struct storage_stream_info 
488   {
489     struct casefile *casefile;  /* Storage. */
490   };
491
492 /* Initializes a storage sink. */
493 static void
494 storage_sink_open (struct case_sink *sink)
495 {
496   struct storage_stream_info *info;
497
498   sink->aux = info = xmalloc (sizeof *info);
499   info->casefile = casefile_create (sink->value_cnt);
500 }
501
502 /* Destroys storage stream represented by INFO. */
503 static void
504 destroy_storage_stream_info (struct storage_stream_info *info) 
505 {
506   if (info != NULL) 
507     {
508       casefile_destroy (info->casefile);
509       free (info); 
510     }
511 }
512
513 /* Writes case C to the storage sink SINK.
514    Returns true if successful, false if an I/O error occurred. */
515 static bool
516 storage_sink_write (struct case_sink *sink, const struct ccase *c)
517 {
518   struct storage_stream_info *info = sink->aux;
519
520   return casefile_append (info->casefile, c);
521 }
522
523 /* Destroys internal data in SINK. */
524 static void
525 storage_sink_destroy (struct case_sink *sink)
526 {
527   destroy_storage_stream_info (sink->aux);
528 }
529
530 /* Closes the sink and returns a storage source to read back the
531    written data. */
532 static struct case_source *
533 storage_sink_make_source (struct case_sink *sink) 
534 {
535   struct case_source *source
536     = create_case_source (&storage_source_class, sink->aux);
537   sink->aux = NULL;
538   return source;
539 }
540
541 /* Storage sink. */
542 const struct case_sink_class storage_sink_class = 
543   {
544     "storage",
545     storage_sink_open,
546     storage_sink_write,
547     storage_sink_destroy,
548     storage_sink_make_source,
549   };
550 \f
551 /* Storage source. */
552
553 /* Returns the number of cases that will be read by
554    storage_source_read(). */
555 static int
556 storage_source_count (const struct case_source *source) 
557 {
558   struct storage_stream_info *info = source->aux;
559
560   return casefile_get_case_cnt (info->casefile);
561 }
562
563 /* Reads all cases from the storage source and passes them one by one to
564    write_case(). */
565 static bool
566 storage_source_read (struct case_source *source,
567                      struct ccase *output_case,
568                      write_case_func *write_case, write_case_data wc_data)
569 {
570   struct storage_stream_info *info = source->aux;
571   struct ccase casefile_case;
572   struct casereader *reader;
573   bool ok = true;
574
575   for (reader = casefile_get_reader (info->casefile);
576        ok && casereader_read (reader, &casefile_case);
577        case_destroy (&casefile_case))
578     {
579       case_copy (output_case, 0,
580                  &casefile_case, 0,
581                  casefile_get_value_cnt (info->casefile));
582       ok = write_case (wc_data);
583     }
584   casereader_destroy (reader);
585
586   return ok;
587 }
588
589 /* Destroys the source's internal data. */
590 static void
591 storage_source_destroy (struct case_source *source)
592 {
593   destroy_storage_stream_info (source->aux);
594 }
595
596 /* Storage source. */
597 const struct case_source_class storage_source_class = 
598   {
599     "storage",
600     storage_source_count,
601     storage_source_read,
602     storage_source_destroy,
603   };
604
605 struct casefile *
606 storage_source_get_casefile (struct case_source *source) 
607 {
608   struct storage_stream_info *info = source->aux;
609
610   assert (source->class == &storage_source_class);
611   return info->casefile;
612 }
613
614 struct case_source *
615 storage_source_create (struct casefile *cf)
616 {
617   struct storage_stream_info *info;
618
619   info = xmalloc (sizeof *info);
620   info->casefile = cf;
621
622   return create_case_source (&storage_source_class, info);
623 }
624 \f
625 /* Null sink.  Used by a few procedures that keep track of output
626    themselves and would throw away anything that the sink
627    contained anyway. */
628
629 const struct case_sink_class null_sink_class = 
630   {
631     "null",
632     NULL,
633     NULL,
634     NULL,
635     NULL,
636   };
637 \f
638 /* Returns a pointer to the lagged case from N_BEFORE cases before the
639    current one, or NULL if there haven't been that many cases yet. */
640 struct ccase *
641 lagged_case (int n_before)
642 {
643   assert (n_before >= 1 );
644   assert (n_before <= n_lag);
645
646   if (n_before <= lag_count)
647     {
648       int index = lag_head - n_before;
649       if (index < 0)
650         index += n_lag;
651       return &lag_queue[index];
652     }
653   else
654     return NULL;
655 }
656    
657 /* Appends TRNS to t_trns[], the list of all transformations to be
658    performed on data as it is read from the active file. */
659 void
660 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
661 {
662   struct transformation *trns;
663   if (n_trns >= m_trns)
664     t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
665   trns = &t_trns[n_trns++];
666   trns->proc = proc;
667   trns->free = free;
668   trns->private = private;
669 }
670
671 /* Returns the index number that the next transformation added by
672    add_transformation() will receive.  A trns_proc_func that
673    returns this index causes control flow to jump to it. */
674 size_t
675 next_transformation (void) 
676 {
677   return n_trns;
678 }
679
680 /* Cancels all active transformations, including any transformations
681    created by the input program.
682    Returns true if successful, false if an I/O error occurred. */
683 bool
684 cancel_transformations (void)
685 {
686   bool ok = true;
687   size_t i;
688   for (i = 0; i < n_trns; i++)
689     {
690       struct transformation *t = &t_trns[i];
691       if (t->free != NULL) 
692         {
693           if (!t->free (t->private))
694             ok = false; 
695         }
696     }
697   n_trns = f_trns = 0;
698   free (t_trns);
699   t_trns = NULL;
700   m_trns = 0;
701   return ok;
702 }
703 \f
704 /* Creates a case source with class CLASS and auxiliary data AUX
705    and based on dictionary DICT. */
706 struct case_source *
707 create_case_source (const struct case_source_class *class,
708                     void *aux) 
709 {
710   struct case_source *source = xmalloc (sizeof *source);
711   source->class = class;
712   source->aux = aux;
713   return source;
714 }
715
716 /* Destroys case source SOURCE.  It is the caller's responsible to
717    call the source's destroy function, if any. */
718 void
719 free_case_source (struct case_source *source) 
720 {
721   if (source != NULL) 
722     {
723       if (source->class->destroy != NULL)
724         source->class->destroy (source);
725       free (source);
726     }
727 }
728
729 /* Returns nonzero if a case source is "complex". */
730 int
731 case_source_is_complex (const struct case_source *source) 
732 {
733   return source != NULL && (source->class == &input_program_source_class
734                             || source->class == &file_type_source_class);
735 }
736
737 /* Returns nonzero if CLASS is the class of SOURCE. */
738 int
739 case_source_is_class (const struct case_source *source,
740                       const struct case_source_class *class) 
741 {
742   return source != NULL && source->class == class;
743 }
744
745 /* Creates a case sink to accept cases from the given DICT with
746    class CLASS and auxiliary data AUX. */
747 struct case_sink *
748 create_case_sink (const struct case_sink_class *class,
749                   const struct dictionary *dict,
750                   void *aux) 
751 {
752   struct case_sink *sink = xmalloc (sizeof *sink);
753   sink->class = class;
754   sink->value_cnt = dict_get_compacted_value_cnt (dict);
755   sink->aux = aux;
756   return sink;
757 }
758
759 /* Destroys case sink SINK.  */
760 void
761 free_case_sink (struct case_sink *sink) 
762 {
763   if (sink != NULL) 
764     {
765       if (sink->class->destroy != NULL)
766         sink->class->destroy (sink);
767       free (sink); 
768     }
769 }
770 \f
771 /* Represents auxiliary data for handling SPLIT FILE. */
772 struct split_aux_data 
773   {
774     size_t case_count;          /* Number of cases so far. */
775     struct ccase prev_case;     /* Data in previous case. */
776
777     /* Functions to call... */
778     void (*begin_func) (void *);               /* ...before data. */
779     bool (*proc_func) (struct ccase *, void *); /* ...with data. */
780     void (*end_func) (void *);                 /* ...after data. */
781     void *func_aux;                            /* Auxiliary data. */ 
782   };
783
784 static int equal_splits (const struct ccase *, const struct ccase *);
785 static bool procedure_with_splits_callback (struct ccase *, void *);
786 static void dump_splits (struct ccase *);
787
788 /* Like procedure(), but it automatically breaks the case stream
789    into SPLIT FILE break groups.  Before each group of cases with
790    identical SPLIT FILE variable values, BEGIN_FUNC is called.
791    Then PROC_FUNC is called with each case in the group.  
792    END_FUNC is called when the group is finished.  FUNC_AUX is
793    passed to each of the functions as auxiliary data.
794
795    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
796    and END_FUNC will be called at all. 
797
798    If SPLIT FILE is not in effect, then there is one break group
799    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
800    will be called once.
801    
802    Returns true if successful, false if an I/O error occurred. */
803 bool
804 procedure_with_splits (void (*begin_func) (void *aux),
805                        bool (*proc_func) (struct ccase *, void *aux),
806                        void (*end_func) (void *aux),
807                        void *func_aux) 
808 {
809   struct split_aux_data split_aux;
810   bool ok;
811
812   split_aux.case_count = 0;
813   case_nullify (&split_aux.prev_case);
814   split_aux.begin_func = begin_func;
815   split_aux.proc_func = proc_func;
816   split_aux.end_func = end_func;
817   split_aux.func_aux = func_aux;
818
819   open_active_file ();
820   ok = internal_procedure (procedure_with_splits_callback, &split_aux);
821   if (split_aux.case_count > 0 && end_func != NULL)
822     end_func (func_aux);
823   if (!close_active_file ())
824     ok = false;
825
826   case_destroy (&split_aux.prev_case);
827
828   return ok;
829 }
830
831 /* procedure() callback used by procedure_with_splits(). */
832 static bool
833 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
834 {
835   struct split_aux_data *split_aux = split_aux_;
836
837   /* Start a new series if needed. */
838   if (split_aux->case_count == 0
839       || !equal_splits (c, &split_aux->prev_case))
840     {
841       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
842         split_aux->end_func (split_aux->func_aux);
843
844       dump_splits (c);
845       case_destroy (&split_aux->prev_case);
846       case_clone (&split_aux->prev_case, c);
847
848       if (split_aux->begin_func != NULL)
849         split_aux->begin_func (split_aux->func_aux);
850     }
851
852   split_aux->case_count++;
853   if (split_aux->proc_func != NULL)
854     return split_aux->proc_func (c, split_aux->func_aux);
855   else
856     return true;
857 }
858
859 /* Compares the SPLIT FILE variables in cases A and B and returns
860    nonzero only if they differ. */
861 static int
862 equal_splits (const struct ccase *a, const struct ccase *b) 
863 {
864   return case_compare (a, b,
865                        dict_get_split_vars (default_dict),
866                        dict_get_split_cnt (default_dict)) == 0;
867 }
868
869 /* Dumps out the values of all the split variables for the case C. */
870 static void
871 dump_splits (struct ccase *c)
872 {
873   struct variable *const *split;
874   struct tab_table *t;
875   size_t split_cnt;
876   int i;
877
878   split_cnt = dict_get_split_cnt (default_dict);
879   if (split_cnt == 0)
880     return;
881
882   t = tab_create (3, split_cnt + 1, 0);
883   tab_dim (t, tab_natural_dimensions);
884   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
885   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
886   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
887   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
888   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
889   split = dict_get_split_vars (default_dict);
890   for (i = 0; i < split_cnt; i++)
891     {
892       struct variable *v = split[i];
893       char temp_buf[80];
894       const char *val_lab;
895
896       assert (v->type == NUMERIC || v->type == ALPHA);
897       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
898       
899       data_out (temp_buf, &v->print, case_data (c, v->fv));
900       
901       temp_buf[v->print.w] = 0;
902       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
903
904       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
905       if (val_lab)
906         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
907     }
908   tab_flags (t, SOMF_NO_TITLE);
909   tab_submit (t);
910 }
911 \f
912 /* Represents auxiliary data for handling SPLIT FILE in a
913    multipass procedure. */
914 struct multipass_split_aux_data 
915   {
916     struct ccase prev_case;     /* Data in previous case. */
917     struct casefile *casefile;  /* Accumulates data for a split. */
918
919     /* Function to call with the accumulated data. */
920     bool (*split_func) (const struct casefile *, void *);
921     void *func_aux;                            /* Auxiliary data. */ 
922   };
923
924 static bool multipass_split_callback (struct ccase *c, void *aux_);
925 static void multipass_split_output (struct multipass_split_aux_data *);
926
927 /* Returns true if successful, false if an I/O error occurred. */
928 bool
929 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
930                                                      void *),
931                                  void *func_aux) 
932 {
933   struct multipass_split_aux_data aux;
934   bool ok;
935
936   assert (split_func != NULL);
937
938   open_active_file ();
939
940   case_nullify (&aux.prev_case);
941   aux.casefile = NULL;
942   aux.split_func = split_func;
943   aux.func_aux = func_aux;
944
945   ok = internal_procedure (multipass_split_callback, &aux);
946   if (aux.casefile != NULL)
947     multipass_split_output (&aux);
948   case_destroy (&aux.prev_case);
949
950   if (!close_active_file ())
951     ok = false;
952
953   return ok;
954 }
955
956 /* procedure() callback used by multipass_procedure_with_splits(). */
957 static bool
958 multipass_split_callback (struct ccase *c, void *aux_)
959 {
960   struct multipass_split_aux_data *aux = aux_;
961
962   /* Start a new series if needed. */
963   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
964     {
965       /* Pass any cases to split_func. */
966       if (aux->casefile != NULL)
967         multipass_split_output (aux);
968
969       /* Start a new casefile. */
970       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
971
972       /* Record split values. */
973       dump_splits (c);
974       case_destroy (&aux->prev_case);
975       case_clone (&aux->prev_case, c);
976     }
977
978   return casefile_append (aux->casefile, c);
979 }
980
981 static void
982 multipass_split_output (struct multipass_split_aux_data *aux)
983 {
984   assert (aux->casefile != NULL);
985   aux->split_func (aux->casefile, aux->func_aux);
986   casefile_destroy (aux->casefile);
987   aux->casefile = NULL;
988 }
989
990
991 /* Discards all the current state in preparation for a data-input
992    command like DATA LIST or GET. */
993 void
994 discard_variables (void)
995 {
996   dict_clear (default_dict);
997   fh_set_default_handle (NULL);
998
999   n_lag = 0;
1000   
1001   if (vfm_source != NULL)
1002     {
1003       free_case_source (vfm_source);
1004       vfm_source = NULL;
1005     }
1006
1007   cancel_transformations ();
1008
1009   ctl_stack_clear ();
1010
1011   expr_free (process_if_expr);
1012   process_if_expr = NULL;
1013
1014   cancel_temporary ();
1015
1016   pgm_state = STATE_INIT;
1017 }