c6209af266451267a8de102df4131b22f40c40b5
[pspp-builds.git] / src / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include <procedure.h>
22 #include <libpspp/message.h>
23 #include <errno.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #if HAVE_UNISTD_H
27 #include <unistd.h>     /* Required by SunOS4. */
28 #endif
29 #include <libpspp/alloc.h>
30 #include <data/case.h>
31 #include <data/casefile.h>
32 #include <language/command.h>
33 #include <data/dictionary.h>
34 #include <language/control/control-stack.h>
35 #include <libpspp/message.h>
36 #include "expressions/public.h"
37 #include <data/file-handle-def.h>
38 #include <libpspp/misc.h>
39 #include <data/settings.h>
40 #include <output/manager.h>
41 #include <output/table.h>
42 #include <libpspp/str.h>
43 #include <data/variable.h>
44 #include <data/value-labels.h>
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 /*
50    Virtual File Manager (vfm):
51
52    vfm is used to process data files.  It uses the model that
53    data is read from one stream (the data source), processed,
54    then written to another (the data sink).  The data source is
55    then deleted and the data sink becomes the data source for the
56    next procedure. */
57
58 /* Procedure execution data. */
59 struct write_case_data
60   {
61     /* Function to call for each case. */
62     bool (*proc_func) (struct ccase *, void *); /* Function. */
63     void *aux;                                 /* Auxiliary data. */ 
64
65     struct ccase trns_case;     /* Case used for transformations. */
66     struct ccase sink_case;     /* Case written to sink, if
67                                    compaction is necessary. */
68     size_t cases_written;       /* Cases output so far. */
69     size_t cases_analyzed;      /* Cases passed to procedure so far. */
70   };
71
72 /* The current active file, from which cases are read. */
73 struct case_source *vfm_source;
74
75 /* The replacement active file, to which cases are written. */
76 struct case_sink *vfm_sink;
77
78 /* The compactor used to compact a compact, if necessary;
79    otherwise a null pointer. */
80 static struct dict_compactor *compactor;
81
82 /* Time at which vfm was last invoked. */
83 static time_t last_vfm_invocation;
84
85 /* Lag queue. */
86 int n_lag;                      /* Number of cases to lag. */
87 static int lag_count;           /* Number of cases in lag_queue so far. */
88 static int lag_head;            /* Index where next case will be added. */
89 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
90
91 /* Active transformations. */
92 struct transformation *t_trns;
93 size_t n_trns, m_trns, f_trns;
94
95 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
96                                 void *aux);
97 static void update_last_vfm_invocation (void);
98 static void create_trns_case (struct ccase *, struct dictionary *);
99 static void open_active_file (void);
100 static bool write_case (struct write_case_data *wc_data);
101 static int execute_transformations (struct ccase *c,
102                                     struct transformation *trns,
103                                     int first_idx, int last_idx,
104                                     int case_num);
105 static int filter_case (const struct ccase *c, int case_num);
106 static void lag_case (const struct ccase *c);
107 static void clear_case (struct ccase *c);
108 static bool close_active_file (void);
109 \f
110 /* Public functions. */
111
112 /* Returns the last time the data was read. */
113 time_t
114 vfm_last_invocation (void) 
115 {
116   if (last_vfm_invocation == 0)
117     update_last_vfm_invocation ();
118   return last_vfm_invocation;
119 }
120
121 /* Reads the data from the input program and writes it to a new
122    active file.  For each case we read from the input program, we
123    do the following
124
125    1. Execute permanent transformations.  If these drop the case,
126       start the next case from step 1.
127
128    2. N OF CASES.  If we have already written N cases, start the
129       next case from step 1.
130    
131    3. Write case to replacement active file.
132    
133    4. Execute temporary transformations.  If these drop the case,
134       start the next case from step 1.
135       
136    5. FILTER, PROCESS IF.  If these drop the case, start the next
137       case from step 1.
138    
139    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
140       cases, start the next case from step 1.
141       
142    7. Pass case to PROC_FUNC, passing AUX as auxiliary data.
143
144    Returns true if successful, false if an I/O error occurred. */
145 bool
146 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
147 {
148   if (proc_func == NULL
149       && case_source_is_class (vfm_source, &storage_source_class)
150       && vfm_sink == NULL
151       && !temporary
152       && n_trns == 0)
153     {
154       /* Nothing to do. */
155       update_last_vfm_invocation ();
156       return true;
157     }
158   else 
159     {
160       bool ok;
161       
162       open_active_file ();
163       ok = internal_procedure (proc_func, aux);
164       if (!close_active_file ())
165         ok = false;
166
167       return ok;
168     }
169 }
170
171 /* Executes a procedure, as procedure(), except that the caller
172    is responsible for calling open_active_file() and
173    close_active_file().
174    Returns true if successful, false if an I/O error occurred. */
175 static bool
176 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux) 
177 {
178   static int recursive_call;
179   struct write_case_data wc_data;
180   bool ok;
181
182   assert (++recursive_call == 1);
183
184   wc_data.proc_func = proc_func;
185   wc_data.aux = aux;
186   create_trns_case (&wc_data.trns_case, default_dict);
187   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
188   wc_data.cases_written = 0;
189
190   update_last_vfm_invocation ();
191
192   ok = (vfm_source == NULL
193         || vfm_source->class->read (vfm_source,
194                                     &wc_data.trns_case,
195                                     write_case, &wc_data));
196
197   case_destroy (&wc_data.sink_case);
198   case_destroy (&wc_data.trns_case);
199
200   assert (--recursive_call == 0);
201
202   return ok;
203 }
204
205 /* Updates last_vfm_invocation. */
206 static void
207 update_last_vfm_invocation (void) 
208 {
209   last_vfm_invocation = time (NULL);
210 }
211
212 /* Creates and returns a case, initializing it from the vectors
213    that say which `value's need to be initialized just once, and
214    which ones need to be re-initialized before every case. */
215 static void
216 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
217 {
218   size_t var_cnt = dict_get_var_cnt (dict);
219   size_t i;
220
221   case_create (trns_case, dict_get_next_value_idx (dict));
222   for (i = 0; i < var_cnt; i++) 
223     {
224       struct variable *v = dict_get_var (dict, i);
225       union value *value = case_data_rw (trns_case, v->fv);
226
227       if (v->type == NUMERIC)
228         value->f = v->reinit ? 0.0 : SYSMIS;
229       else
230         memset (value->s, ' ', v->width);
231     }
232 }
233
234 /* Makes all preparations for reading from the data source and writing
235    to the data sink. */
236 static void
237 open_active_file (void)
238 {
239   /* Make temp_dict refer to the dictionary right before data
240      reaches the sink */
241   if (!temporary)
242     {
243       temp_trns = n_trns;
244       temp_dict = default_dict;
245     }
246
247   /* Figure out compaction. */
248   compactor = (dict_needs_compaction (temp_dict)
249                ? dict_make_compactor (temp_dict)
250                : NULL);
251
252   /* Prepare sink. */
253   if (vfm_sink == NULL)
254     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
255   if (vfm_sink->class->open != NULL)
256     vfm_sink->class->open (vfm_sink);
257
258   /* Allocate memory for lag queue. */
259   if (n_lag > 0)
260     {
261       int i;
262   
263       lag_count = 0;
264       lag_head = 0;
265       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
266       for (i = 0; i < n_lag; i++)
267         case_nullify (&lag_queue[i]);
268     }
269
270   /* Close any unclosed DO IF or LOOP constructs. */
271   ctl_stack_clear ();
272 }
273
274 /* Transforms trns_case and writes it to the replacement active
275    file if advisable.  Returns nonzero if more cases can be
276    accepted, zero otherwise.  Do not call this function again
277    after it has returned zero once.  */
278 static bool
279 write_case (struct write_case_data *wc_data)
280 {
281   int retval;
282   
283   /* Execute permanent transformations.  */
284   retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
285                                     temp_trns, wc_data->cases_written + 1);
286   if (retval != 1)
287     goto done;
288
289   /* N OF CASES. */
290   if (dict_get_case_limit (default_dict)
291       && wc_data->cases_written >= dict_get_case_limit (default_dict))
292     goto done;
293   wc_data->cases_written++;
294
295   /* Write case to LAG queue. */
296   if (n_lag)
297     lag_case (&wc_data->trns_case);
298
299   /* Write case to replacement active file. */
300   if (vfm_sink->class->write != NULL) 
301     {
302       if (compactor != NULL) 
303         {
304           dict_compactor_compact (compactor, &wc_data->sink_case,
305                                   &wc_data->trns_case);
306           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
307         }
308       else
309         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
310     }
311   
312   /* Execute temporary transformations. */
313   retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
314                                     n_trns, wc_data->cases_written);
315   if (retval != 1)
316     goto done;
317   
318   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
319   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
320       || (dict_get_case_limit (temp_dict)
321           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
322     goto done;
323   wc_data->cases_analyzed++;
324
325   /* Pass case to procedure. */
326   if (wc_data->proc_func != NULL)
327     if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
328       retval = -1;
329
330  done:
331   clear_case (&wc_data->trns_case);
332   return retval != -1;
333 }
334
335 /* Transforms case C using the transformations in TRNS[] with
336    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
337    become case CASE_NUM (1-based) in the output file.  Returns 1
338    if the case was successfully transformed, 0 if it was filtered
339    out by one of the transformations, or -1 if the procedure
340    should be abandoned due to a fatal error. */
341 static int
342 execute_transformations (struct ccase *c,
343                          struct transformation *trns,
344                          int first_idx, int last_idx,
345                          int case_num) 
346 {
347   int idx;
348
349   for (idx = first_idx; idx != last_idx; )
350     {
351       struct transformation *t = &trns[idx];
352       int retval = t->proc (t->private, c, case_num);
353       switch (retval)
354         {
355         case TRNS_CONTINUE:
356           idx++;
357           break;
358           
359         case TRNS_DROP_CASE:
360           return 0;
361
362         case TRNS_ERROR:
363           return -1;
364
365         case TRNS_NEXT_CASE:
366           abort ();
367
368         case TRNS_END_FILE:
369           abort ();
370           
371         default:
372           idx = retval;
373           break;
374         }
375     }
376
377   return 1;
378 }
379
380 /* Returns nonzero if case C with case number CASE_NUM should be
381    exclude as specified on FILTER or PROCESS IF, otherwise
382    zero. */
383 static int
384 filter_case (const struct ccase *c, int case_idx)
385 {
386   /* FILTER. */
387   struct variable *filter_var = dict_get_filter (default_dict);
388   if (filter_var != NULL) 
389     {
390       double f = case_num (c, filter_var->fv);
391       if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
392         return 1;
393     }
394
395   /* PROCESS IF. */
396   if (process_if_expr != NULL
397       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
398     return 1;
399
400   return 0;
401 }
402
403 /* Add C to the lag queue. */
404 static void
405 lag_case (const struct ccase *c)
406 {
407   if (lag_count < n_lag)
408     lag_count++;
409   case_destroy (&lag_queue[lag_head]);
410   case_clone (&lag_queue[lag_head], c);
411   if (++lag_head >= n_lag)
412     lag_head = 0;
413 }
414
415 /* Clears the variables in C that need to be cleared between
416    processing cases.  */
417 static void
418 clear_case (struct ccase *c)
419 {
420   size_t var_cnt = dict_get_var_cnt (default_dict);
421   size_t i;
422   
423   for (i = 0; i < var_cnt; i++) 
424     {
425       struct variable *v = dict_get_var (default_dict, i);
426       if (v->init && v->reinit) 
427         {
428           if (v->type == NUMERIC)
429             case_data_rw (c, v->fv)->f = SYSMIS;
430           else
431             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
432         } 
433     }
434 }
435
436 /* Closes the active file. */
437 static bool
438 close_active_file (void)
439 {
440   /* Free memory for lag queue, and turn off lagging. */
441   if (n_lag > 0)
442     {
443       int i;
444       
445       for (i = 0; i < n_lag; i++)
446         case_destroy (&lag_queue[i]);
447       free (lag_queue);
448       n_lag = 0;
449     }
450   
451   /* Dictionary from before TEMPORARY becomes permanent.. */
452   if (temporary)
453     {
454       dict_destroy (default_dict);
455       default_dict = temp_dict;
456       temp_dict = NULL;
457     }
458
459   /* Finish compaction. */
460   if (compactor != NULL) 
461     {
462       dict_compactor_destroy (compactor);
463       dict_compact_values (default_dict); 
464     }
465     
466   /* Free data source. */
467   free_case_source (vfm_source);
468   vfm_source = NULL;
469
470   /* Old data sink becomes new data source. */
471   if (vfm_sink->class->make_source != NULL)
472     vfm_source = vfm_sink->class->make_source (vfm_sink);
473   free_case_sink (vfm_sink);
474   vfm_sink = NULL;
475
476   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
477      and get rid of all the transformations. */
478   cancel_temporary ();
479   expr_free (process_if_expr);
480   process_if_expr = NULL;
481   dict_set_case_limit (default_dict, 0);
482   dict_clear_vectors (default_dict);
483   return cancel_transformations ();
484 }
485 \f
486 /* Storage case stream. */
487
488 /* Information about storage sink or source. */
489 struct storage_stream_info 
490   {
491     struct casefile *casefile;  /* Storage. */
492   };
493
494 /* Initializes a storage sink. */
495 static void
496 storage_sink_open (struct case_sink *sink)
497 {
498   struct storage_stream_info *info;
499
500   sink->aux = info = xmalloc (sizeof *info);
501   info->casefile = casefile_create (sink->value_cnt);
502 }
503
504 /* Destroys storage stream represented by INFO. */
505 static void
506 destroy_storage_stream_info (struct storage_stream_info *info) 
507 {
508   if (info != NULL) 
509     {
510       casefile_destroy (info->casefile);
511       free (info); 
512     }
513 }
514
515 /* Writes case C to the storage sink SINK.
516    Returns true if successful, false if an I/O error occurred. */
517 static bool
518 storage_sink_write (struct case_sink *sink, const struct ccase *c)
519 {
520   struct storage_stream_info *info = sink->aux;
521
522   return casefile_append (info->casefile, c);
523 }
524
525 /* Destroys internal data in SINK. */
526 static void
527 storage_sink_destroy (struct case_sink *sink)
528 {
529   destroy_storage_stream_info (sink->aux);
530 }
531
532 /* Closes the sink and returns a storage source to read back the
533    written data. */
534 static struct case_source *
535 storage_sink_make_source (struct case_sink *sink) 
536 {
537   struct case_source *source
538     = create_case_source (&storage_source_class, sink->aux);
539   sink->aux = NULL;
540   return source;
541 }
542
543 /* Storage sink. */
544 const struct case_sink_class storage_sink_class = 
545   {
546     "storage",
547     storage_sink_open,
548     storage_sink_write,
549     storage_sink_destroy,
550     storage_sink_make_source,
551   };
552 \f
553 /* Storage source. */
554
555 /* Returns the number of cases that will be read by
556    storage_source_read(). */
557 static int
558 storage_source_count (const struct case_source *source) 
559 {
560   struct storage_stream_info *info = source->aux;
561
562   return casefile_get_case_cnt (info->casefile);
563 }
564
565 /* Reads all cases from the storage source and passes them one by one to
566    write_case(). */
567 static bool
568 storage_source_read (struct case_source *source,
569                      struct ccase *output_case,
570                      write_case_func *write_case, write_case_data wc_data)
571 {
572   struct storage_stream_info *info = source->aux;
573   struct ccase casefile_case;
574   struct casereader *reader;
575   bool ok = true;
576
577   for (reader = casefile_get_reader (info->casefile);
578        ok && casereader_read (reader, &casefile_case);
579        case_destroy (&casefile_case))
580     {
581       case_copy (output_case, 0,
582                  &casefile_case, 0,
583                  casefile_get_value_cnt (info->casefile));
584       ok = write_case (wc_data);
585     }
586   casereader_destroy (reader);
587
588   return ok;
589 }
590
591 /* Destroys the source's internal data. */
592 static void
593 storage_source_destroy (struct case_source *source)
594 {
595   destroy_storage_stream_info (source->aux);
596 }
597
598 /* Storage source. */
599 const struct case_source_class storage_source_class = 
600   {
601     "storage",
602     storage_source_count,
603     storage_source_read,
604     storage_source_destroy,
605   };
606
607 struct casefile *
608 storage_source_get_casefile (struct case_source *source) 
609 {
610   struct storage_stream_info *info = source->aux;
611
612   assert (source->class == &storage_source_class);
613   return info->casefile;
614 }
615
616 struct case_source *
617 storage_source_create (struct casefile *cf)
618 {
619   struct storage_stream_info *info;
620
621   info = xmalloc (sizeof *info);
622   info->casefile = cf;
623
624   return create_case_source (&storage_source_class, info);
625 }
626 \f
627 /* Null sink.  Used by a few procedures that keep track of output
628    themselves and would throw away anything that the sink
629    contained anyway. */
630
631 const struct case_sink_class null_sink_class = 
632   {
633     "null",
634     NULL,
635     NULL,
636     NULL,
637     NULL,
638   };
639 \f
640 /* Returns a pointer to the lagged case from N_BEFORE cases before the
641    current one, or NULL if there haven't been that many cases yet. */
642 struct ccase *
643 lagged_case (int n_before)
644 {
645   assert (n_before >= 1 );
646   assert (n_before <= n_lag);
647
648   if (n_before <= lag_count)
649     {
650       int index = lag_head - n_before;
651       if (index < 0)
652         index += n_lag;
653       return &lag_queue[index];
654     }
655   else
656     return NULL;
657 }
658    
659 /* Appends TRNS to t_trns[], the list of all transformations to be
660    performed on data as it is read from the active file. */
661 void
662 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
663 {
664   struct transformation *trns;
665   if (n_trns >= m_trns)
666     t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
667   trns = &t_trns[n_trns++];
668   trns->proc = proc;
669   trns->free = free;
670   trns->private = private;
671 }
672
673 /* Returns the index number that the next transformation added by
674    add_transformation() will receive.  A trns_proc_func that
675    returns this index causes control flow to jump to it. */
676 size_t
677 next_transformation (void) 
678 {
679   return n_trns;
680 }
681
682 /* Cancels all active transformations, including any transformations
683    created by the input program.
684    Returns true if successful, false if an I/O error occurred. */
685 bool
686 cancel_transformations (void)
687 {
688   bool ok = true;
689   size_t i;
690   for (i = 0; i < n_trns; i++)
691     {
692       struct transformation *t = &t_trns[i];
693       if (t->free != NULL) 
694         {
695           if (!t->free (t->private))
696             ok = false; 
697         }
698     }
699   n_trns = f_trns = 0;
700   free (t_trns);
701   t_trns = NULL;
702   m_trns = 0;
703   return ok;
704 }
705 \f
706 /* Creates a case source with class CLASS and auxiliary data AUX
707    and based on dictionary DICT. */
708 struct case_source *
709 create_case_source (const struct case_source_class *class,
710                     void *aux) 
711 {
712   struct case_source *source = xmalloc (sizeof *source);
713   source->class = class;
714   source->aux = aux;
715   return source;
716 }
717
718 /* Destroys case source SOURCE.  It is the caller's responsible to
719    call the source's destroy function, if any. */
720 void
721 free_case_source (struct case_source *source) 
722 {
723   if (source != NULL) 
724     {
725       if (source->class->destroy != NULL)
726         source->class->destroy (source);
727       free (source);
728     }
729 }
730
731 /* Returns nonzero if a case source is "complex". */
732 int
733 case_source_is_complex (const struct case_source *source) 
734 {
735   return source != NULL && (source->class == &input_program_source_class
736                             || source->class == &file_type_source_class);
737 }
738
739 /* Returns nonzero if CLASS is the class of SOURCE. */
740 int
741 case_source_is_class (const struct case_source *source,
742                       const struct case_source_class *class) 
743 {
744   return source != NULL && source->class == class;
745 }
746
747 /* Creates a case sink to accept cases from the given DICT with
748    class CLASS and auxiliary data AUX. */
749 struct case_sink *
750 create_case_sink (const struct case_sink_class *class,
751                   const struct dictionary *dict,
752                   void *aux) 
753 {
754   struct case_sink *sink = xmalloc (sizeof *sink);
755   sink->class = class;
756   sink->value_cnt = dict_get_compacted_value_cnt (dict);
757   sink->aux = aux;
758   return sink;
759 }
760
761 /* Destroys case sink SINK.  */
762 void
763 free_case_sink (struct case_sink *sink) 
764 {
765   if (sink != NULL) 
766     {
767       if (sink->class->destroy != NULL)
768         sink->class->destroy (sink);
769       free (sink); 
770     }
771 }
772 \f
773 /* Represents auxiliary data for handling SPLIT FILE. */
774 struct split_aux_data 
775   {
776     size_t case_count;          /* Number of cases so far. */
777     struct ccase prev_case;     /* Data in previous case. */
778
779     /* Functions to call... */
780     void (*begin_func) (void *);               /* ...before data. */
781     bool (*proc_func) (struct ccase *, void *); /* ...with data. */
782     void (*end_func) (void *);                 /* ...after data. */
783     void *func_aux;                            /* Auxiliary data. */ 
784   };
785
786 static int equal_splits (const struct ccase *, const struct ccase *);
787 static bool procedure_with_splits_callback (struct ccase *, void *);
788 static void dump_splits (struct ccase *);
789
790 /* Like procedure(), but it automatically breaks the case stream
791    into SPLIT FILE break groups.  Before each group of cases with
792    identical SPLIT FILE variable values, BEGIN_FUNC is called.
793    Then PROC_FUNC is called with each case in the group.  
794    END_FUNC is called when the group is finished.  FUNC_AUX is
795    passed to each of the functions as auxiliary data.
796
797    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
798    and END_FUNC will be called at all. 
799
800    If SPLIT FILE is not in effect, then there is one break group
801    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
802    will be called once.
803    
804    Returns true if successful, false if an I/O error occurred. */
805 bool
806 procedure_with_splits (void (*begin_func) (void *aux),
807                        bool (*proc_func) (struct ccase *, void *aux),
808                        void (*end_func) (void *aux),
809                        void *func_aux) 
810 {
811   struct split_aux_data split_aux;
812   bool ok;
813
814   split_aux.case_count = 0;
815   case_nullify (&split_aux.prev_case);
816   split_aux.begin_func = begin_func;
817   split_aux.proc_func = proc_func;
818   split_aux.end_func = end_func;
819   split_aux.func_aux = func_aux;
820
821   open_active_file ();
822   ok = internal_procedure (procedure_with_splits_callback, &split_aux);
823   if (split_aux.case_count > 0 && end_func != NULL)
824     end_func (func_aux);
825   if (!close_active_file ())
826     ok = false;
827
828   case_destroy (&split_aux.prev_case);
829
830   return ok;
831 }
832
833 /* procedure() callback used by procedure_with_splits(). */
834 static bool
835 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
836 {
837   struct split_aux_data *split_aux = split_aux_;
838
839   /* Start a new series if needed. */
840   if (split_aux->case_count == 0
841       || !equal_splits (c, &split_aux->prev_case))
842     {
843       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
844         split_aux->end_func (split_aux->func_aux);
845
846       dump_splits (c);
847       case_destroy (&split_aux->prev_case);
848       case_clone (&split_aux->prev_case, c);
849
850       if (split_aux->begin_func != NULL)
851         split_aux->begin_func (split_aux->func_aux);
852     }
853
854   split_aux->case_count++;
855   if (split_aux->proc_func != NULL)
856     return split_aux->proc_func (c, split_aux->func_aux);
857   else
858     return true;
859 }
860
861 /* Compares the SPLIT FILE variables in cases A and B and returns
862    nonzero only if they differ. */
863 static int
864 equal_splits (const struct ccase *a, const struct ccase *b) 
865 {
866   return case_compare (a, b,
867                        dict_get_split_vars (default_dict),
868                        dict_get_split_cnt (default_dict)) == 0;
869 }
870
871 /* Dumps out the values of all the split variables for the case C. */
872 static void
873 dump_splits (struct ccase *c)
874 {
875   struct variable *const *split;
876   struct tab_table *t;
877   size_t split_cnt;
878   int i;
879
880   split_cnt = dict_get_split_cnt (default_dict);
881   if (split_cnt == 0)
882     return;
883
884   t = tab_create (3, split_cnt + 1, 0);
885   tab_dim (t, tab_natural_dimensions);
886   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
887   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
888   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
889   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
890   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
891   split = dict_get_split_vars (default_dict);
892   for (i = 0; i < split_cnt; i++)
893     {
894       struct variable *v = split[i];
895       char temp_buf[80];
896       const char *val_lab;
897
898       assert (v->type == NUMERIC || v->type == ALPHA);
899       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
900       
901       data_out (temp_buf, &v->print, case_data (c, v->fv));
902       
903       temp_buf[v->print.w] = 0;
904       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
905
906       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
907       if (val_lab)
908         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
909     }
910   tab_flags (t, SOMF_NO_TITLE);
911   tab_submit (t);
912 }
913 \f
914 /* Represents auxiliary data for handling SPLIT FILE in a
915    multipass procedure. */
916 struct multipass_split_aux_data 
917   {
918     struct ccase prev_case;     /* Data in previous case. */
919     struct casefile *casefile;  /* Accumulates data for a split. */
920
921     /* Function to call with the accumulated data. */
922     bool (*split_func) (const struct casefile *, void *);
923     void *func_aux;                            /* Auxiliary data. */ 
924   };
925
926 static bool multipass_split_callback (struct ccase *c, void *aux_);
927 static void multipass_split_output (struct multipass_split_aux_data *);
928
929 /* Returns true if successful, false if an I/O error occurred. */
930 bool
931 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
932                                                      void *),
933                                  void *func_aux) 
934 {
935   struct multipass_split_aux_data aux;
936   bool ok;
937
938   assert (split_func != NULL);
939
940   open_active_file ();
941
942   case_nullify (&aux.prev_case);
943   aux.casefile = NULL;
944   aux.split_func = split_func;
945   aux.func_aux = func_aux;
946
947   ok = internal_procedure (multipass_split_callback, &aux);
948   if (aux.casefile != NULL)
949     multipass_split_output (&aux);
950   case_destroy (&aux.prev_case);
951
952   if (!close_active_file ())
953     ok = false;
954
955   return ok;
956 }
957
958 /* procedure() callback used by multipass_procedure_with_splits(). */
959 static bool
960 multipass_split_callback (struct ccase *c, void *aux_)
961 {
962   struct multipass_split_aux_data *aux = aux_;
963
964   /* Start a new series if needed. */
965   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
966     {
967       /* Pass any cases to split_func. */
968       if (aux->casefile != NULL)
969         multipass_split_output (aux);
970
971       /* Start a new casefile. */
972       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
973
974       /* Record split values. */
975       dump_splits (c);
976       case_destroy (&aux->prev_case);
977       case_clone (&aux->prev_case, c);
978     }
979
980   return casefile_append (aux->casefile, c);
981 }
982
983 static void
984 multipass_split_output (struct multipass_split_aux_data *aux)
985 {
986   assert (aux->casefile != NULL);
987   aux->split_func (aux->casefile, aux->func_aux);
988   casefile_destroy (aux->casefile);
989   aux->casefile = NULL;
990 }
991
992
993 /* Discards all the current state in preparation for a data-input
994    command like DATA LIST or GET. */
995 void
996 discard_variables (void)
997 {
998   dict_clear (default_dict);
999   fh_set_default_handle (NULL);
1000
1001   n_lag = 0;
1002   
1003   if (vfm_source != NULL)
1004     {
1005       free_case_source (vfm_source);
1006       vfm_source = NULL;
1007     }
1008
1009   cancel_transformations ();
1010
1011   ctl_stack_clear ();
1012
1013   expr_free (process_if_expr);
1014   process_if_expr = NULL;
1015
1016   cancel_temporary ();
1017
1018   pgm_state = STATE_INIT;
1019 }