Improve the way we handle the various parsing "states". Until now
[pspp-builds.git] / src / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include <procedure.h>
22 #include <libpspp/message.h>
23 #include <errno.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 #include <libpspp/alloc.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <language/command.h>
31 #include <data/dictionary.h>
32 #include <language/control/control-stack.h>
33 #include <libpspp/message.h>
34 #include "expressions/public.h"
35 #include <data/file-handle-def.h>
36 #include <libpspp/misc.h>
37 #include <data/settings.h>
38 #include <output/manager.h>
39 #include <output/table.h>
40 #include <libpspp/str.h>
41 #include <data/variable.h>
42 #include <data/value-labels.h>
43
44 #include "gettext.h"
45 #define _(msgid) gettext (msgid)
46
47 /*
48    Virtual File Manager (vfm):
49
50    vfm is used to process data files.  It uses the model that
51    data is read from one stream (the data source), processed,
52    then written to another (the data sink).  The data source is
53    then deleted and the data sink becomes the data source for the
54    next procedure. */
55
56 /* Procedure execution data. */
57 struct write_case_data
58   {
59     /* Function to call for each case. */
60     bool (*proc_func) (struct ccase *, void *); /* Function. */
61     void *aux;                                 /* Auxiliary data. */ 
62
63     struct ccase trns_case;     /* Case used for transformations. */
64     struct ccase sink_case;     /* Case written to sink, if
65                                    compaction is necessary. */
66     size_t cases_written;       /* Cases output so far. */
67     size_t cases_analyzed;      /* Cases passed to procedure so far. */
68   };
69
70 /* The current active file, from which cases are read. */
71 struct case_source *vfm_source;
72
73 /* The replacement active file, to which cases are written. */
74 struct case_sink *vfm_sink;
75
76 /* The compactor used to compact a compact, if necessary;
77    otherwise a null pointer. */
78 static struct dict_compactor *compactor;
79
80 /* Time at which vfm was last invoked. */
81 static time_t last_vfm_invocation;
82
83 /* Lag queue. */
84 int n_lag;                      /* Number of cases to lag. */
85 static int lag_count;           /* Number of cases in lag_queue so far. */
86 static int lag_head;            /* Index where next case will be added. */
87 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
88
89 /* Active transformations. */
90 struct transformation *t_trns;
91 size_t n_trns, m_trns, f_trns;
92
93 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
94                                 void *aux);
95 static void update_last_vfm_invocation (void);
96 static void create_trns_case (struct ccase *, struct dictionary *);
97 static void open_active_file (void);
98 static bool write_case (struct write_case_data *wc_data);
99 static int execute_transformations (struct ccase *c,
100                                     struct transformation *trns,
101                                     int first_idx, int last_idx,
102                                     int case_num);
103 static int filter_case (const struct ccase *c, int case_num);
104 static void lag_case (const struct ccase *c);
105 static void clear_case (struct ccase *c);
106 static bool close_active_file (void);
107 \f
108 /* Public functions. */
109
110 /* Returns the last time the data was read. */
111 time_t
112 vfm_last_invocation (void) 
113 {
114   if (last_vfm_invocation == 0)
115     update_last_vfm_invocation ();
116   return last_vfm_invocation;
117 }
118
119 /* Reads the data from the input program and writes it to a new
120    active file.  For each case we read from the input program, we
121    do the following
122
123    1. Execute permanent transformations.  If these drop the case,
124       start the next case from step 1.
125
126    2. N OF CASES.  If we have already written N cases, start the
127       next case from step 1.
128    
129    3. Write case to replacement active file.
130    
131    4. Execute temporary transformations.  If these drop the case,
132       start the next case from step 1.
133       
134    5. FILTER, PROCESS IF.  If these drop the case, start the next
135       case from step 1.
136    
137    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
138       cases, start the next case from step 1.
139       
140    7. Pass case to PROC_FUNC, passing AUX as auxiliary data.
141
142    Returns true if successful, false if an I/O error occurred. */
143 bool
144 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
145 {
146   if (proc_func == NULL
147       && case_source_is_class (vfm_source, &storage_source_class)
148       && vfm_sink == NULL
149       && !temporary
150       && n_trns == 0)
151     {
152       /* Nothing to do. */
153       update_last_vfm_invocation ();
154       return true;
155     }
156   else 
157     {
158       bool ok;
159       
160       open_active_file ();
161       ok = internal_procedure (proc_func, aux);
162       if (!close_active_file ())
163         ok = false;
164
165       return ok;
166     }
167 }
168
169 /* Executes a procedure, as procedure(), except that the caller
170    is responsible for calling open_active_file() and
171    close_active_file().
172    Returns true if successful, false if an I/O error occurred. */
173 static bool
174 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux) 
175 {
176   static int recursive_call;
177   struct write_case_data wc_data;
178   bool ok;
179
180   assert (++recursive_call == 1);
181
182   wc_data.proc_func = proc_func;
183   wc_data.aux = aux;
184   create_trns_case (&wc_data.trns_case, default_dict);
185   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
186   wc_data.cases_written = 0;
187
188   update_last_vfm_invocation ();
189
190   ok = (vfm_source == NULL
191         || vfm_source->class->read (vfm_source,
192                                     &wc_data.trns_case,
193                                     write_case, &wc_data));
194
195   case_destroy (&wc_data.sink_case);
196   case_destroy (&wc_data.trns_case);
197
198   assert (--recursive_call == 0);
199
200   return ok;
201 }
202
203 /* Updates last_vfm_invocation. */
204 static void
205 update_last_vfm_invocation (void) 
206 {
207   last_vfm_invocation = time (NULL);
208 }
209
210 /* Creates and returns a case, initializing it from the vectors
211    that say which `value's need to be initialized just once, and
212    which ones need to be re-initialized before every case. */
213 static void
214 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
215 {
216   size_t var_cnt = dict_get_var_cnt (dict);
217   size_t i;
218
219   case_create (trns_case, dict_get_next_value_idx (dict));
220   for (i = 0; i < var_cnt; i++) 
221     {
222       struct variable *v = dict_get_var (dict, i);
223       union value *value = case_data_rw (trns_case, v->fv);
224
225       if (v->type == NUMERIC)
226         value->f = v->reinit ? 0.0 : SYSMIS;
227       else
228         memset (value->s, ' ', v->width);
229     }
230 }
231
232 /* Makes all preparations for reading from the data source and writing
233    to the data sink. */
234 static void
235 open_active_file (void)
236 {
237   /* Make temp_dict refer to the dictionary right before data
238      reaches the sink */
239   if (!temporary)
240     {
241       temp_trns = n_trns;
242       temp_dict = default_dict;
243     }
244
245   /* Figure out compaction. */
246   compactor = (dict_needs_compaction (temp_dict)
247                ? dict_make_compactor (temp_dict)
248                : NULL);
249
250   /* Prepare sink. */
251   if (vfm_sink == NULL)
252     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
253   if (vfm_sink->class->open != NULL)
254     vfm_sink->class->open (vfm_sink);
255
256   /* Allocate memory for lag queue. */
257   if (n_lag > 0)
258     {
259       int i;
260   
261       lag_count = 0;
262       lag_head = 0;
263       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
264       for (i = 0; i < n_lag; i++)
265         case_nullify (&lag_queue[i]);
266     }
267
268   /* Close any unclosed DO IF or LOOP constructs. */
269   ctl_stack_clear ();
270 }
271
272 /* Transforms trns_case and writes it to the replacement active
273    file if advisable.  Returns nonzero if more cases can be
274    accepted, zero otherwise.  Do not call this function again
275    after it has returned zero once.  */
276 static bool
277 write_case (struct write_case_data *wc_data)
278 {
279   int retval;
280   
281   /* Execute permanent transformations.  */
282   retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
283                                     temp_trns, wc_data->cases_written + 1);
284   if (retval != 1)
285     goto done;
286
287   /* N OF CASES. */
288   if (dict_get_case_limit (default_dict)
289       && wc_data->cases_written >= dict_get_case_limit (default_dict))
290     goto done;
291   wc_data->cases_written++;
292
293   /* Write case to LAG queue. */
294   if (n_lag)
295     lag_case (&wc_data->trns_case);
296
297   /* Write case to replacement active file. */
298   if (vfm_sink->class->write != NULL) 
299     {
300       if (compactor != NULL) 
301         {
302           dict_compactor_compact (compactor, &wc_data->sink_case,
303                                   &wc_data->trns_case);
304           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
305         }
306       else
307         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
308     }
309   
310   /* Execute temporary transformations. */
311   retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
312                                     n_trns, wc_data->cases_written);
313   if (retval != 1)
314     goto done;
315   
316   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
317   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
318       || (dict_get_case_limit (temp_dict)
319           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
320     goto done;
321   wc_data->cases_analyzed++;
322
323   /* Pass case to procedure. */
324   if (wc_data->proc_func != NULL)
325     if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
326       retval = -1;
327
328  done:
329   clear_case (&wc_data->trns_case);
330   return retval != -1;
331 }
332
333 /* Transforms case C using the transformations in TRNS[] with
334    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
335    become case CASE_NUM (1-based) in the output file.  Returns 1
336    if the case was successfully transformed, 0 if it was filtered
337    out by one of the transformations, or -1 if the procedure
338    should be abandoned due to a fatal error. */
339 static int
340 execute_transformations (struct ccase *c,
341                          struct transformation *trns,
342                          int first_idx, int last_idx,
343                          int case_num) 
344 {
345   int idx;
346
347   for (idx = first_idx; idx != last_idx; )
348     {
349       struct transformation *t = &trns[idx];
350       int retval = t->proc (t->private, c, case_num);
351       switch (retval)
352         {
353         case TRNS_CONTINUE:
354           idx++;
355           break;
356           
357         case TRNS_DROP_CASE:
358           return 0;
359
360         case TRNS_ERROR:
361           return -1;
362
363         case TRNS_NEXT_CASE:
364           abort ();
365
366         case TRNS_END_FILE:
367           abort ();
368           
369         default:
370           idx = retval;
371           break;
372         }
373     }
374
375   return 1;
376 }
377
378 /* Returns nonzero if case C with case number CASE_NUM should be
379    exclude as specified on FILTER or PROCESS IF, otherwise
380    zero. */
381 static int
382 filter_case (const struct ccase *c, int case_idx)
383 {
384   /* FILTER. */
385   struct variable *filter_var = dict_get_filter (default_dict);
386   if (filter_var != NULL) 
387     {
388       double f = case_num (c, filter_var->fv);
389       if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
390         return 1;
391     }
392
393   /* PROCESS IF. */
394   if (process_if_expr != NULL
395       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
396     return 1;
397
398   return 0;
399 }
400
401 /* Add C to the lag queue. */
402 static void
403 lag_case (const struct ccase *c)
404 {
405   if (lag_count < n_lag)
406     lag_count++;
407   case_destroy (&lag_queue[lag_head]);
408   case_clone (&lag_queue[lag_head], c);
409   if (++lag_head >= n_lag)
410     lag_head = 0;
411 }
412
413 /* Clears the variables in C that need to be cleared between
414    processing cases.  */
415 static void
416 clear_case (struct ccase *c)
417 {
418   size_t var_cnt = dict_get_var_cnt (default_dict);
419   size_t i;
420   
421   for (i = 0; i < var_cnt; i++) 
422     {
423       struct variable *v = dict_get_var (default_dict, i);
424       if (v->init && v->reinit) 
425         {
426           if (v->type == NUMERIC)
427             case_data_rw (c, v->fv)->f = SYSMIS;
428           else
429             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
430         } 
431     }
432 }
433
434 /* Closes the active file. */
435 static bool
436 close_active_file (void)
437 {
438   /* Free memory for lag queue, and turn off lagging. */
439   if (n_lag > 0)
440     {
441       int i;
442       
443       for (i = 0; i < n_lag; i++)
444         case_destroy (&lag_queue[i]);
445       free (lag_queue);
446       n_lag = 0;
447     }
448   
449   /* Dictionary from before TEMPORARY becomes permanent.. */
450   if (temporary)
451     {
452       dict_destroy (default_dict);
453       default_dict = temp_dict;
454       temp_dict = NULL;
455     }
456
457   /* Finish compaction. */
458   if (compactor != NULL) 
459     {
460       dict_compactor_destroy (compactor);
461       dict_compact_values (default_dict); 
462     }
463     
464   /* Free data source. */
465   free_case_source (vfm_source);
466   vfm_source = NULL;
467
468   /* Old data sink becomes new data source. */
469   if (vfm_sink->class->make_source != NULL)
470     vfm_source = vfm_sink->class->make_source (vfm_sink);
471   free_case_sink (vfm_sink);
472   vfm_sink = NULL;
473
474   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
475      and get rid of all the transformations. */
476   cancel_temporary ();
477   expr_free (process_if_expr);
478   process_if_expr = NULL;
479   dict_set_case_limit (default_dict, 0);
480   dict_clear_vectors (default_dict);
481   return cancel_transformations ();
482 }
483 \f
484 /* Storage case stream. */
485
486 /* Information about storage sink or source. */
487 struct storage_stream_info 
488   {
489     struct casefile *casefile;  /* Storage. */
490   };
491
492 /* Initializes a storage sink. */
493 static void
494 storage_sink_open (struct case_sink *sink)
495 {
496   struct storage_stream_info *info;
497
498   sink->aux = info = xmalloc (sizeof *info);
499   info->casefile = casefile_create (sink->value_cnt);
500 }
501
502 /* Destroys storage stream represented by INFO. */
503 static void
504 destroy_storage_stream_info (struct storage_stream_info *info) 
505 {
506   if (info != NULL) 
507     {
508       casefile_destroy (info->casefile);
509       free (info); 
510     }
511 }
512
513 /* Writes case C to the storage sink SINK.
514    Returns true if successful, false if an I/O error occurred. */
515 static bool
516 storage_sink_write (struct case_sink *sink, const struct ccase *c)
517 {
518   struct storage_stream_info *info = sink->aux;
519
520   return casefile_append (info->casefile, c);
521 }
522
523 /* Destroys internal data in SINK. */
524 static void
525 storage_sink_destroy (struct case_sink *sink)
526 {
527   destroy_storage_stream_info (sink->aux);
528 }
529
530 /* Closes the sink and returns a storage source to read back the
531    written data. */
532 static struct case_source *
533 storage_sink_make_source (struct case_sink *sink) 
534 {
535   struct case_source *source
536     = create_case_source (&storage_source_class, sink->aux);
537   sink->aux = NULL;
538   return source;
539 }
540
541 /* Storage sink. */
542 const struct case_sink_class storage_sink_class = 
543   {
544     "storage",
545     storage_sink_open,
546     storage_sink_write,
547     storage_sink_destroy,
548     storage_sink_make_source,
549   };
550 \f
551 /* Storage source. */
552
553 /* Returns the number of cases that will be read by
554    storage_source_read(). */
555 static int
556 storage_source_count (const struct case_source *source) 
557 {
558   struct storage_stream_info *info = source->aux;
559
560   return casefile_get_case_cnt (info->casefile);
561 }
562
563 /* Reads all cases from the storage source and passes them one by one to
564    write_case(). */
565 static bool
566 storage_source_read (struct case_source *source,
567                      struct ccase *output_case,
568                      write_case_func *write_case, write_case_data wc_data)
569 {
570   struct storage_stream_info *info = source->aux;
571   struct ccase casefile_case;
572   struct casereader *reader;
573   bool ok = true;
574
575   for (reader = casefile_get_reader (info->casefile);
576        ok && casereader_read (reader, &casefile_case);
577        case_destroy (&casefile_case))
578     {
579       case_copy (output_case, 0,
580                  &casefile_case, 0,
581                  casefile_get_value_cnt (info->casefile));
582       ok = write_case (wc_data);
583     }
584   casereader_destroy (reader);
585
586   return ok;
587 }
588
589 /* Destroys the source's internal data. */
590 static void
591 storage_source_destroy (struct case_source *source)
592 {
593   destroy_storage_stream_info (source->aux);
594 }
595
596 /* Storage source. */
597 const struct case_source_class storage_source_class = 
598   {
599     "storage",
600     storage_source_count,
601     storage_source_read,
602     storage_source_destroy,
603   };
604
605 struct casefile *
606 storage_source_get_casefile (struct case_source *source) 
607 {
608   struct storage_stream_info *info = source->aux;
609
610   assert (source->class == &storage_source_class);
611   return info->casefile;
612 }
613
614 struct case_source *
615 storage_source_create (struct casefile *cf)
616 {
617   struct storage_stream_info *info;
618
619   info = xmalloc (sizeof *info);
620   info->casefile = cf;
621
622   return create_case_source (&storage_source_class, info);
623 }
624 \f
625 /* Null sink.  Used by a few procedures that keep track of output
626    themselves and would throw away anything that the sink
627    contained anyway. */
628
629 const struct case_sink_class null_sink_class = 
630   {
631     "null",
632     NULL,
633     NULL,
634     NULL,
635     NULL,
636   };
637 \f
638 /* Returns a pointer to the lagged case from N_BEFORE cases before the
639    current one, or NULL if there haven't been that many cases yet. */
640 struct ccase *
641 lagged_case (int n_before)
642 {
643   assert (n_before >= 1 );
644   assert (n_before <= n_lag);
645
646   if (n_before <= lag_count)
647     {
648       int index = lag_head - n_before;
649       if (index < 0)
650         index += n_lag;
651       return &lag_queue[index];
652     }
653   else
654     return NULL;
655 }
656    
657 /* Appends TRNS to t_trns[], the list of all transformations to be
658    performed on data as it is read from the active file. */
659 void
660 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
661 {
662   struct transformation *trns;
663   if (n_trns >= m_trns)
664     t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
665   trns = &t_trns[n_trns++];
666   trns->proc = proc;
667   trns->free = free;
668   trns->private = private;
669 }
670
671 /* Returns the index number that the next transformation added by
672    add_transformation() will receive.  A trns_proc_func that
673    returns this index causes control flow to jump to it. */
674 size_t
675 next_transformation (void) 
676 {
677   return n_trns;
678 }
679
680 /* Cancels all active transformations, including any transformations
681    created by the input program.
682    Returns true if successful, false if an I/O error occurred. */
683 bool
684 cancel_transformations (void)
685 {
686   bool ok = true;
687   size_t i;
688   for (i = 0; i < n_trns; i++)
689     {
690       struct transformation *t = &t_trns[i];
691       if (t->free != NULL) 
692         {
693           if (!t->free (t->private))
694             ok = false; 
695         }
696     }
697   n_trns = f_trns = 0;
698   free (t_trns);
699   t_trns = NULL;
700   m_trns = 0;
701   return ok;
702 }
703 \f
704 /* Creates a case source with class CLASS and auxiliary data AUX
705    and based on dictionary DICT. */
706 struct case_source *
707 create_case_source (const struct case_source_class *class,
708                     void *aux) 
709 {
710   struct case_source *source = xmalloc (sizeof *source);
711   source->class = class;
712   source->aux = aux;
713   return source;
714 }
715
716 /* Destroys case source SOURCE.  It is the caller's responsible to
717    call the source's destroy function, if any. */
718 void
719 free_case_source (struct case_source *source) 
720 {
721   if (source != NULL) 
722     {
723       if (source->class->destroy != NULL)
724         source->class->destroy (source);
725       free (source);
726     }
727 }
728
729 /* Returns nonzero if CLASS is the class of SOURCE. */
730 int
731 case_source_is_class (const struct case_source *source,
732                       const struct case_source_class *class) 
733 {
734   return source != NULL && source->class == class;
735 }
736
737 /* Creates a case sink to accept cases from the given DICT with
738    class CLASS and auxiliary data AUX. */
739 struct case_sink *
740 create_case_sink (const struct case_sink_class *class,
741                   const struct dictionary *dict,
742                   void *aux) 
743 {
744   struct case_sink *sink = xmalloc (sizeof *sink);
745   sink->class = class;
746   sink->value_cnt = dict_get_compacted_value_cnt (dict);
747   sink->aux = aux;
748   return sink;
749 }
750
751 /* Destroys case sink SINK.  */
752 void
753 free_case_sink (struct case_sink *sink) 
754 {
755   if (sink != NULL) 
756     {
757       if (sink->class->destroy != NULL)
758         sink->class->destroy (sink);
759       free (sink); 
760     }
761 }
762 \f
763 /* Represents auxiliary data for handling SPLIT FILE. */
764 struct split_aux_data 
765   {
766     size_t case_count;          /* Number of cases so far. */
767     struct ccase prev_case;     /* Data in previous case. */
768
769     /* Functions to call... */
770     void (*begin_func) (void *);               /* ...before data. */
771     bool (*proc_func) (struct ccase *, void *); /* ...with data. */
772     void (*end_func) (void *);                 /* ...after data. */
773     void *func_aux;                            /* Auxiliary data. */ 
774   };
775
776 static int equal_splits (const struct ccase *, const struct ccase *);
777 static bool procedure_with_splits_callback (struct ccase *, void *);
778 static void dump_splits (struct ccase *);
779
780 /* Like procedure(), but it automatically breaks the case stream
781    into SPLIT FILE break groups.  Before each group of cases with
782    identical SPLIT FILE variable values, BEGIN_FUNC is called.
783    Then PROC_FUNC is called with each case in the group.  
784    END_FUNC is called when the group is finished.  FUNC_AUX is
785    passed to each of the functions as auxiliary data.
786
787    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
788    and END_FUNC will be called at all. 
789
790    If SPLIT FILE is not in effect, then there is one break group
791    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
792    will be called once.
793    
794    Returns true if successful, false if an I/O error occurred. */
795 bool
796 procedure_with_splits (void (*begin_func) (void *aux),
797                        bool (*proc_func) (struct ccase *, void *aux),
798                        void (*end_func) (void *aux),
799                        void *func_aux) 
800 {
801   struct split_aux_data split_aux;
802   bool ok;
803
804   split_aux.case_count = 0;
805   case_nullify (&split_aux.prev_case);
806   split_aux.begin_func = begin_func;
807   split_aux.proc_func = proc_func;
808   split_aux.end_func = end_func;
809   split_aux.func_aux = func_aux;
810
811   open_active_file ();
812   ok = internal_procedure (procedure_with_splits_callback, &split_aux);
813   if (split_aux.case_count > 0 && end_func != NULL)
814     end_func (func_aux);
815   if (!close_active_file ())
816     ok = false;
817
818   case_destroy (&split_aux.prev_case);
819
820   return ok;
821 }
822
823 /* procedure() callback used by procedure_with_splits(). */
824 static bool
825 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
826 {
827   struct split_aux_data *split_aux = split_aux_;
828
829   /* Start a new series if needed. */
830   if (split_aux->case_count == 0
831       || !equal_splits (c, &split_aux->prev_case))
832     {
833       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
834         split_aux->end_func (split_aux->func_aux);
835
836       dump_splits (c);
837       case_destroy (&split_aux->prev_case);
838       case_clone (&split_aux->prev_case, c);
839
840       if (split_aux->begin_func != NULL)
841         split_aux->begin_func (split_aux->func_aux);
842     }
843
844   split_aux->case_count++;
845   if (split_aux->proc_func != NULL)
846     return split_aux->proc_func (c, split_aux->func_aux);
847   else
848     return true;
849 }
850
851 /* Compares the SPLIT FILE variables in cases A and B and returns
852    nonzero only if they differ. */
853 static int
854 equal_splits (const struct ccase *a, const struct ccase *b) 
855 {
856   return case_compare (a, b,
857                        dict_get_split_vars (default_dict),
858                        dict_get_split_cnt (default_dict)) == 0;
859 }
860
861 /* Dumps out the values of all the split variables for the case C. */
862 static void
863 dump_splits (struct ccase *c)
864 {
865   struct variable *const *split;
866   struct tab_table *t;
867   size_t split_cnt;
868   int i;
869
870   split_cnt = dict_get_split_cnt (default_dict);
871   if (split_cnt == 0)
872     return;
873
874   t = tab_create (3, split_cnt + 1, 0);
875   tab_dim (t, tab_natural_dimensions);
876   tab_vline (t, TAL_GAP, 1, 0, split_cnt);
877   tab_vline (t, TAL_GAP, 2, 0, split_cnt);
878   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
879   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
880   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
881   split = dict_get_split_vars (default_dict);
882   for (i = 0; i < split_cnt; i++)
883     {
884       struct variable *v = split[i];
885       char temp_buf[80];
886       const char *val_lab;
887
888       assert (v->type == NUMERIC || v->type == ALPHA);
889       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
890       
891       data_out (temp_buf, &v->print, case_data (c, v->fv));
892       
893       temp_buf[v->print.w] = 0;
894       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
895
896       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
897       if (val_lab)
898         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
899     }
900   tab_flags (t, SOMF_NO_TITLE);
901   tab_submit (t);
902 }
903 \f
904 /* Represents auxiliary data for handling SPLIT FILE in a
905    multipass procedure. */
906 struct multipass_split_aux_data 
907   {
908     struct ccase prev_case;     /* Data in previous case. */
909     struct casefile *casefile;  /* Accumulates data for a split. */
910
911     /* Function to call with the accumulated data. */
912     bool (*split_func) (const struct casefile *, void *);
913     void *func_aux;                            /* Auxiliary data. */ 
914   };
915
916 static bool multipass_split_callback (struct ccase *c, void *aux_);
917 static void multipass_split_output (struct multipass_split_aux_data *);
918
919 /* Returns true if successful, false if an I/O error occurred. */
920 bool
921 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
922                                                      void *),
923                                  void *func_aux) 
924 {
925   struct multipass_split_aux_data aux;
926   bool ok;
927
928   assert (split_func != NULL);
929
930   open_active_file ();
931
932   case_nullify (&aux.prev_case);
933   aux.casefile = NULL;
934   aux.split_func = split_func;
935   aux.func_aux = func_aux;
936
937   ok = internal_procedure (multipass_split_callback, &aux);
938   if (aux.casefile != NULL)
939     multipass_split_output (&aux);
940   case_destroy (&aux.prev_case);
941
942   if (!close_active_file ())
943     ok = false;
944
945   return ok;
946 }
947
948 /* procedure() callback used by multipass_procedure_with_splits(). */
949 static bool
950 multipass_split_callback (struct ccase *c, void *aux_)
951 {
952   struct multipass_split_aux_data *aux = aux_;
953
954   /* Start a new series if needed. */
955   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
956     {
957       /* Pass any cases to split_func. */
958       if (aux->casefile != NULL)
959         multipass_split_output (aux);
960
961       /* Start a new casefile. */
962       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
963
964       /* Record split values. */
965       dump_splits (c);
966       case_destroy (&aux->prev_case);
967       case_clone (&aux->prev_case, c);
968     }
969
970   return casefile_append (aux->casefile, c);
971 }
972
973 static void
974 multipass_split_output (struct multipass_split_aux_data *aux)
975 {
976   assert (aux->casefile != NULL);
977   aux->split_func (aux->casefile, aux->func_aux);
978   casefile_destroy (aux->casefile);
979   aux->casefile = NULL;
980 }
981
982
983 /* Discards all the current state in preparation for a data-input
984    command like DATA LIST or GET. */
985 void
986 discard_variables (void)
987 {
988   dict_clear (default_dict);
989   fh_set_default_handle (NULL);
990
991   n_lag = 0;
992   
993   if (vfm_source != NULL)
994     {
995       free_case_source (vfm_source);
996       vfm_source = NULL;
997     }
998
999   cancel_transformations ();
1000
1001   ctl_stack_clear ();
1002
1003   expr_free (process_if_expr);
1004   process_if_expr = NULL;
1005
1006   cancel_temporary ();
1007 }