Add scratch file handles.
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "case.h"
32 #include "casefile.h"
33 #include "command.h"
34 #include "dictionary.h"
35 #include "ctl-stack.h"
36 #include "error.h"
37 #include "expressions/public.h"
38 #include "file-handle-def.h"
39 #include "misc.h"
40 #include "settings.h"
41 #include "som.h"
42 #include "str.h"
43 #include "tab.h"
44 #include "var.h"
45 #include "value-labels.h"
46
47 #include "gettext.h"
48 #define _(msgid) gettext (msgid)
49
50 /*
51    Virtual File Manager (vfm):
52
53    vfm is used to process data files.  It uses the model that
54    data is read from one stream (the data source), processed,
55    then written to another (the data sink).  The data source is
56    then deleted and the data sink becomes the data source for the
57    next procedure. */
58
59 /* Procedure execution data. */
60 struct write_case_data
61   {
62     /* Function to call for each case. */
63     int (*proc_func) (struct ccase *, void *); /* Function. */
64     void *aux;                                 /* Auxiliary data. */ 
65
66     struct ccase trns_case;     /* Case used for transformations. */
67     struct ccase sink_case;     /* Case written to sink, if
68                                    compaction is necessary. */
69     size_t cases_written;       /* Cases output so far. */
70     size_t cases_analyzed;      /* Cases passed to procedure so far. */
71   };
72
73 /* The current active file, from which cases are read. */
74 struct case_source *vfm_source;
75
76 /* The replacement active file, to which cases are written. */
77 struct case_sink *vfm_sink;
78
79 /* The compactor used to compact a compact, if necessary;
80    otherwise a null pointer. */
81 static struct dict_compactor *compactor;
82
83 /* Time at which vfm was last invoked. */
84 static time_t last_vfm_invocation;
85
86 /* Lag queue. */
87 int n_lag;                      /* Number of cases to lag. */
88 static int lag_count;           /* Number of cases in lag_queue so far. */
89 static int lag_head;            /* Index where next case will be added. */
90 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
91
92 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
93                                 void *aux);
94 static void update_last_vfm_invocation (void);
95 static void create_trns_case (struct ccase *, struct dictionary *);
96 static void open_active_file (void);
97 static int write_case (struct write_case_data *wc_data);
98 static int execute_transformations (struct ccase *c,
99                                     struct transformation *trns,
100                                     int first_idx, int last_idx,
101                                     int case_num);
102 static int filter_case (const struct ccase *c, int case_num);
103 static void lag_case (const struct ccase *c);
104 static void clear_case (struct ccase *c);
105 static void close_active_file (void);
106 \f
107 /* Public functions. */
108
109 /* Returns the last time the data was read. */
110 time_t
111 vfm_last_invocation (void) 
112 {
113   if (last_vfm_invocation == 0)
114     update_last_vfm_invocation ();
115   return last_vfm_invocation;
116 }
117
118 /* Reads the data from the input program and writes it to a new
119    active file.  For each case we read from the input program, we
120    do the following
121
122    1. Execute permanent transformations.  If these drop the case,
123       start the next case from step 1.
124
125    2. N OF CASES.  If we have already written N cases, start the
126       next case from step 1.
127    
128    3. Write case to replacement active file.
129    
130    4. Execute temporary transformations.  If these drop the case,
131       start the next case from step 1.
132       
133    5. FILTER, PROCESS IF.  If these drop the case, start the next
134       case from step 1.
135    
136    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
137       cases, start the next case from step 1.
138       
139    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
140 void
141 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
142 {
143   if (proc_func == NULL
144       && case_source_is_class (vfm_source, &storage_source_class)
145       && vfm_sink == NULL
146       && !temporary
147       && n_trns == 0)
148     {
149       /* Nothing to do. */
150       update_last_vfm_invocation ();
151       return;
152     }
153
154   open_active_file ();
155   internal_procedure (proc_func, aux);
156   close_active_file ();
157 }
158
159 /* Executes a procedure, as procedure(), except that the caller
160    is responsible for calling open_active_file() and
161    close_active_file(). */
162 static void
163 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
164 {
165   static int recursive_call;
166
167   struct write_case_data wc_data;
168
169   assert (++recursive_call == 1);
170
171   wc_data.proc_func = proc_func;
172   wc_data.aux = aux;
173   create_trns_case (&wc_data.trns_case, default_dict);
174   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
175   wc_data.cases_written = 0;
176
177   update_last_vfm_invocation ();
178
179   if (vfm_source != NULL) 
180     vfm_source->class->read (vfm_source,
181                              &wc_data.trns_case,
182                              write_case, &wc_data);
183
184   case_destroy (&wc_data.sink_case);
185   case_destroy (&wc_data.trns_case);
186
187   assert (--recursive_call == 0);
188 }
189
190 /* Updates last_vfm_invocation. */
191 static void
192 update_last_vfm_invocation (void) 
193 {
194   last_vfm_invocation = time (NULL);
195 }
196
197 /* Creates and returns a case, initializing it from the vectors
198    that say which `value's need to be initialized just once, and
199    which ones need to be re-initialized before every case. */
200 static void
201 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
202 {
203   size_t var_cnt = dict_get_var_cnt (dict);
204   size_t i;
205
206   case_create (trns_case, dict_get_next_value_idx (dict));
207   for (i = 0; i < var_cnt; i++) 
208     {
209       struct variable *v = dict_get_var (dict, i);
210       union value *value = case_data_rw (trns_case, v->fv);
211
212       if (v->type == NUMERIC)
213         value->f = v->reinit ? 0.0 : SYSMIS;
214       else
215         memset (value->s, ' ', v->width);
216     }
217 }
218
219 /* Makes all preparations for reading from the data source and writing
220    to the data sink. */
221 static void
222 open_active_file (void)
223 {
224   /* Make temp_dict refer to the dictionary right before data
225      reaches the sink */
226   if (!temporary)
227     {
228       temp_trns = n_trns;
229       temp_dict = default_dict;
230     }
231
232   /* Figure out compaction. */
233   compactor = (dict_needs_compaction (temp_dict)
234                ? dict_make_compactor (temp_dict)
235                : NULL);
236
237   /* Prepare sink. */
238   if (vfm_sink == NULL)
239     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
240   if (vfm_sink->class->open != NULL)
241     vfm_sink->class->open (vfm_sink);
242
243   /* Allocate memory for lag queue. */
244   if (n_lag > 0)
245     {
246       int i;
247   
248       lag_count = 0;
249       lag_head = 0;
250       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
251       for (i = 0; i < n_lag; i++)
252         case_nullify (&lag_queue[i]);
253     }
254
255   /* Close any unclosed DO IF or LOOP constructs. */
256   ctl_stack_clear ();
257 }
258
259 /* Transforms trns_case and writes it to the replacement active
260    file if advisable.  Returns nonzero if more cases can be
261    accepted, zero otherwise.  Do not call this function again
262    after it has returned zero once.  */
263 static int
264 write_case (struct write_case_data *wc_data)
265 {
266   /* Execute permanent transformations.  */
267   if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
268                                 wc_data->cases_written + 1))
269     goto done;
270
271   /* N OF CASES. */
272   if (dict_get_case_limit (default_dict)
273       && wc_data->cases_written >= dict_get_case_limit (default_dict))
274     goto done;
275   wc_data->cases_written++;
276
277   /* Write case to LAG queue. */
278   if (n_lag)
279     lag_case (&wc_data->trns_case);
280
281   /* Write case to replacement active file. */
282   if (vfm_sink->class->write != NULL) 
283     {
284       if (compactor != NULL) 
285         {
286           dict_compactor_compact (compactor, &wc_data->sink_case,
287                                   &wc_data->trns_case);
288           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
289         }
290       else
291         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
292     }
293   
294   /* Execute temporary transformations. */
295   if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
296                                 wc_data->cases_written))
297     goto done;
298   
299   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
300   if (filter_case (&wc_data->trns_case, wc_data->cases_written)
301       || (dict_get_case_limit (temp_dict)
302           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
303     goto done;
304   wc_data->cases_analyzed++;
305
306   /* Pass case to procedure. */
307   if (wc_data->proc_func != NULL)
308     wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
309
310  done:
311   clear_case (&wc_data->trns_case);
312   return 1;
313 }
314
315 /* Transforms case C using the transformations in TRNS[] with
316    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
317    become case CASE_NUM (1-based) in the output file.  Returns
318    zero if the case was filtered out by one of the
319    transformations, nonzero otherwise. */
320 static int
321 execute_transformations (struct ccase *c,
322                          struct transformation *trns,
323                          int first_idx, int last_idx,
324                          int case_num) 
325 {
326   int idx;
327
328   for (idx = first_idx; idx != last_idx; )
329     {
330       struct transformation *t = &trns[idx];
331       int retval = t->proc (t->private, c, case_num);
332       switch (retval)
333         {
334         case -1:
335           idx++;
336           break;
337           
338         case -2:
339           return 0;
340           
341         default:
342           idx = retval;
343           break;
344         }
345     }
346
347   return 1;
348 }
349
350 /* Returns nonzero if case C with case number CASE_NUM should be
351    exclude as specified on FILTER or PROCESS IF, otherwise
352    zero. */
353 static int
354 filter_case (const struct ccase *c, int case_idx)
355 {
356   /* FILTER. */
357   struct variable *filter_var = dict_get_filter (default_dict);
358   if (filter_var != NULL) 
359     {
360       double f = case_num (c, filter_var->fv);
361       if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
362         return 1;
363     }
364
365   /* PROCESS IF. */
366   if (process_if_expr != NULL
367       && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
368     return 1;
369
370   return 0;
371 }
372
373 /* Add C to the lag queue. */
374 static void
375 lag_case (const struct ccase *c)
376 {
377   if (lag_count < n_lag)
378     lag_count++;
379   case_destroy (&lag_queue[lag_head]);
380   case_clone (&lag_queue[lag_head], c);
381   if (++lag_head >= n_lag)
382     lag_head = 0;
383 }
384
385 /* Clears the variables in C that need to be cleared between
386    processing cases.  */
387 static void
388 clear_case (struct ccase *c)
389 {
390   size_t var_cnt = dict_get_var_cnt (default_dict);
391   size_t i;
392   
393   for (i = 0; i < var_cnt; i++) 
394     {
395       struct variable *v = dict_get_var (default_dict, i);
396       if (v->init && v->reinit) 
397         {
398           if (v->type == NUMERIC)
399             case_data_rw (c, v->fv)->f = SYSMIS;
400           else
401             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
402         } 
403     }
404 }
405
406 /* Closes the active file. */
407 static void
408 close_active_file (void)
409 {
410   /* Free memory for lag queue, and turn off lagging. */
411   if (n_lag > 0)
412     {
413       int i;
414       
415       for (i = 0; i < n_lag; i++)
416         case_destroy (&lag_queue[i]);
417       free (lag_queue);
418       n_lag = 0;
419     }
420   
421   /* Dictionary from before TEMPORARY becomes permanent.. */
422   if (temporary)
423     {
424       dict_destroy (default_dict);
425       default_dict = temp_dict;
426       temp_dict = NULL;
427     }
428
429   /* Finish compaction. */
430   if (compactor != NULL) 
431     {
432       dict_compactor_destroy (compactor);
433       dict_compact_values (default_dict); 
434     }
435     
436   /* Free data source. */
437   free_case_source (vfm_source);
438   vfm_source = NULL;
439
440   /* Old data sink becomes new data source. */
441   if (vfm_sink->class->make_source != NULL)
442     vfm_source = vfm_sink->class->make_source (vfm_sink);
443   free_case_sink (vfm_sink);
444   vfm_sink = NULL;
445
446   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
447      and get rid of all the transformations. */
448   cancel_temporary ();
449   expr_free (process_if_expr);
450   process_if_expr = NULL;
451   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
452     dict_set_filter (default_dict, NULL);
453   dict_set_case_limit (default_dict, 0);
454   dict_clear_vectors (default_dict);
455   cancel_transformations ();
456 }
457 \f
458 /* Storage case stream. */
459
460 /* Information about storage sink or source. */
461 struct storage_stream_info 
462   {
463     struct casefile *casefile;  /* Storage. */
464   };
465
466 /* Initializes a storage sink. */
467 static void
468 storage_sink_open (struct case_sink *sink)
469 {
470   struct storage_stream_info *info;
471
472   sink->aux = info = xmalloc (sizeof *info);
473   info->casefile = casefile_create (sink->value_cnt);
474 }
475
476 /* Destroys storage stream represented by INFO. */
477 static void
478 destroy_storage_stream_info (struct storage_stream_info *info) 
479 {
480   if (info != NULL) 
481     {
482       casefile_destroy (info->casefile);
483       free (info); 
484     }
485 }
486
487 /* Writes case C to the storage sink SINK. */
488 static void
489 storage_sink_write (struct case_sink *sink, const struct ccase *c)
490 {
491   struct storage_stream_info *info = sink->aux;
492
493   casefile_append (info->casefile, c);
494 }
495
496 /* Destroys internal data in SINK. */
497 static void
498 storage_sink_destroy (struct case_sink *sink)
499 {
500   destroy_storage_stream_info (sink->aux);
501 }
502
503 /* Closes the sink and returns a storage source to read back the
504    written data. */
505 static struct case_source *
506 storage_sink_make_source (struct case_sink *sink) 
507 {
508   struct case_source *source
509     = create_case_source (&storage_source_class, sink->aux);
510   sink->aux = NULL;
511   return source;
512 }
513
514 /* Storage sink. */
515 const struct case_sink_class storage_sink_class = 
516   {
517     "storage",
518     storage_sink_open,
519     storage_sink_write,
520     storage_sink_destroy,
521     storage_sink_make_source,
522   };
523 \f
524 /* Storage source. */
525
526 /* Returns the number of cases that will be read by
527    storage_source_read(). */
528 static int
529 storage_source_count (const struct case_source *source) 
530 {
531   struct storage_stream_info *info = source->aux;
532
533   return casefile_get_case_cnt (info->casefile);
534 }
535
536 /* Reads all cases from the storage source and passes them one by one to
537    write_case(). */
538 static void
539 storage_source_read (struct case_source *source,
540                      struct ccase *output_case,
541                      write_case_func *write_case, write_case_data wc_data)
542 {
543   struct storage_stream_info *info = source->aux;
544   struct ccase casefile_case;
545   struct casereader *reader;
546
547   for (reader = casefile_get_reader (info->casefile);
548        casereader_read (reader, &casefile_case);
549        case_destroy (&casefile_case))
550     {
551       case_copy (output_case, 0,
552                  &casefile_case, 0,
553                  casefile_get_value_cnt (info->casefile));
554       write_case (wc_data);
555     }
556   casereader_destroy (reader);
557 }
558
559 /* Destroys the source's internal data. */
560 static void
561 storage_source_destroy (struct case_source *source)
562 {
563   destroy_storage_stream_info (source->aux);
564 }
565
566 /* Storage source. */
567 const struct case_source_class storage_source_class = 
568   {
569     "storage",
570     storage_source_count,
571     storage_source_read,
572     storage_source_destroy,
573   };
574
575 struct casefile *
576 storage_source_get_casefile (struct case_source *source) 
577 {
578   struct storage_stream_info *info = source->aux;
579
580   assert (source->class == &storage_source_class);
581   return info->casefile;
582 }
583
584 struct case_source *
585 storage_source_create (struct casefile *cf)
586 {
587   struct storage_stream_info *info;
588
589   info = xmalloc (sizeof *info);
590   info->casefile = cf;
591
592   return create_case_source (&storage_source_class, info);
593 }
594 \f
595 /* Null sink.  Used by a few procedures that keep track of output
596    themselves and would throw away anything that the sink
597    contained anyway. */
598
599 const struct case_sink_class null_sink_class = 
600   {
601     "null",
602     NULL,
603     NULL,
604     NULL,
605     NULL,
606   };
607 \f
608 /* Returns a pointer to the lagged case from N_BEFORE cases before the
609    current one, or NULL if there haven't been that many cases yet. */
610 struct ccase *
611 lagged_case (int n_before)
612 {
613   assert (n_before >= 1 );
614   assert (n_before <= n_lag);
615
616   if (n_before <= lag_count)
617     {
618       int index = lag_head - n_before;
619       if (index < 0)
620         index += n_lag;
621       return &lag_queue[index];
622     }
623   else
624     return NULL;
625 }
626    
627 /* Appends TRNS to t_trns[], the list of all transformations to be
628    performed on data as it is read from the active file. */
629 void
630 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
631 {
632   struct transformation *trns;
633   if (n_trns >= m_trns)
634     t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
635   trns = &t_trns[n_trns++];
636   trns->proc = proc;
637   trns->free = free;
638   trns->private = private;
639 }
640
641 /* Returns the index number that the next transformation added by
642    add_transformation() will receive.  A trns_proc_func that
643    returns this index causes control flow to jump to it. */
644 size_t
645 next_transformation (void) 
646 {
647   return n_trns;
648 }
649
650 /* Cancels all active transformations, including any transformations
651    created by the input program. */
652 void
653 cancel_transformations (void)
654 {
655   size_t i;
656   for (i = 0; i < n_trns; i++)
657     {
658       struct transformation *t = &t_trns[i];
659       if (t->free != NULL)
660         t->free (t->private);
661     }
662   n_trns = f_trns = 0;
663   free (t_trns);
664   t_trns = NULL;
665   m_trns = 0;
666 }
667 \f
668 /* Creates a case source with class CLASS and auxiliary data AUX
669    and based on dictionary DICT. */
670 struct case_source *
671 create_case_source (const struct case_source_class *class,
672                     void *aux) 
673 {
674   struct case_source *source = xmalloc (sizeof *source);
675   source->class = class;
676   source->aux = aux;
677   return source;
678 }
679
680 /* Destroys case source SOURCE.  It is the caller's responsible to
681    call the source's destroy function, if any. */
682 void
683 free_case_source (struct case_source *source) 
684 {
685   if (source != NULL) 
686     {
687       if (source->class->destroy != NULL)
688         source->class->destroy (source);
689       free (source);
690     }
691 }
692
693 /* Returns nonzero if a case source is "complex". */
694 int
695 case_source_is_complex (const struct case_source *source) 
696 {
697   return source != NULL && (source->class == &input_program_source_class
698                             || source->class == &file_type_source_class);
699 }
700
701 /* Returns nonzero if CLASS is the class of SOURCE. */
702 int
703 case_source_is_class (const struct case_source *source,
704                       const struct case_source_class *class) 
705 {
706   return source != NULL && source->class == class;
707 }
708
709 /* Creates a case sink to accept cases from the given DICT with
710    class CLASS and auxiliary data AUX. */
711 struct case_sink *
712 create_case_sink (const struct case_sink_class *class,
713                   const struct dictionary *dict,
714                   void *aux) 
715 {
716   struct case_sink *sink = xmalloc (sizeof *sink);
717   sink->class = class;
718   sink->value_cnt = dict_get_compacted_value_cnt (dict);
719   sink->aux = aux;
720   return sink;
721 }
722
723 /* Destroys case sink SINK.  */
724 void
725 free_case_sink (struct case_sink *sink) 
726 {
727   if (sink != NULL) 
728     {
729       if (sink->class->destroy != NULL)
730         sink->class->destroy (sink);
731       free (sink); 
732     }
733 }
734 \f
735 /* Represents auxiliary data for handling SPLIT FILE. */
736 struct split_aux_data 
737   {
738     size_t case_count;          /* Number of cases so far. */
739     struct ccase prev_case;     /* Data in previous case. */
740
741     /* Functions to call... */
742     void (*begin_func) (void *);               /* ...before data. */
743     int (*proc_func) (struct ccase *, void *); /* ...with data. */
744     void (*end_func) (void *);                 /* ...after data. */
745     void *func_aux;                            /* Auxiliary data. */ 
746   };
747
748 static int equal_splits (const struct ccase *, const struct ccase *);
749 static int procedure_with_splits_callback (struct ccase *, void *);
750 static void dump_splits (struct ccase *);
751
752 /* Like procedure(), but it automatically breaks the case stream
753    into SPLIT FILE break groups.  Before each group of cases with
754    identical SPLIT FILE variable values, BEGIN_FUNC is called.
755    Then PROC_FUNC is called with each case in the group.  
756    END_FUNC is called when the group is finished.  FUNC_AUX is
757    passed to each of the functions as auxiliary data.
758
759    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
760    and END_FUNC will be called at all. 
761
762    If SPLIT FILE is not in effect, then there is one break group
763    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
764    will be called once. */
765 void
766 procedure_with_splits (void (*begin_func) (void *aux),
767                        int (*proc_func) (struct ccase *, void *aux),
768                        void (*end_func) (void *aux),
769                        void *func_aux) 
770 {
771   struct split_aux_data split_aux;
772
773   split_aux.case_count = 0;
774   case_nullify (&split_aux.prev_case);
775   split_aux.begin_func = begin_func;
776   split_aux.proc_func = proc_func;
777   split_aux.end_func = end_func;
778   split_aux.func_aux = func_aux;
779
780   open_active_file ();
781   internal_procedure (procedure_with_splits_callback, &split_aux);
782   if (split_aux.case_count > 0 && end_func != NULL)
783     end_func (func_aux);
784   close_active_file ();
785
786   case_destroy (&split_aux.prev_case);
787 }
788
789 /* procedure() callback used by procedure_with_splits(). */
790 static int
791 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
792 {
793   struct split_aux_data *split_aux = split_aux_;
794
795   /* Start a new series if needed. */
796   if (split_aux->case_count == 0
797       || !equal_splits (c, &split_aux->prev_case))
798     {
799       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
800         split_aux->end_func (split_aux->func_aux);
801
802       dump_splits (c);
803       case_destroy (&split_aux->prev_case);
804       case_clone (&split_aux->prev_case, c);
805
806       if (split_aux->begin_func != NULL)
807         split_aux->begin_func (split_aux->func_aux);
808     }
809
810   split_aux->case_count++;
811   if (split_aux->proc_func != NULL)
812     return split_aux->proc_func (c, split_aux->func_aux);
813   else
814     return 1;
815 }
816
817 /* Compares the SPLIT FILE variables in cases A and B and returns
818    nonzero only if they differ. */
819 static int
820 equal_splits (const struct ccase *a, const struct ccase *b) 
821 {
822   return case_compare (a, b,
823                        dict_get_split_vars (default_dict),
824                        dict_get_split_cnt (default_dict)) == 0;
825 }
826
827 /* Dumps out the values of all the split variables for the case C. */
828 static void
829 dump_splits (struct ccase *c)
830 {
831   struct variable *const *split;
832   struct tab_table *t;
833   size_t split_cnt;
834   int i;
835
836   split_cnt = dict_get_split_cnt (default_dict);
837   if (split_cnt == 0)
838     return;
839
840   t = tab_create (3, split_cnt + 1, 0);
841   tab_dim (t, tab_natural_dimensions);
842   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
843   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
844   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
845   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
846   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
847   split = dict_get_split_vars (default_dict);
848   for (i = 0; i < split_cnt; i++)
849     {
850       struct variable *v = split[i];
851       char temp_buf[80];
852       const char *val_lab;
853
854       assert (v->type == NUMERIC || v->type == ALPHA);
855       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
856       
857       data_out (temp_buf, &v->print, case_data (c, v->fv));
858       
859       temp_buf[v->print.w] = 0;
860       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
861
862       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
863       if (val_lab)
864         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
865     }
866   tab_flags (t, SOMF_NO_TITLE);
867   tab_submit (t);
868 }
869 \f
870 /* Represents auxiliary data for handling SPLIT FILE in a
871    multipass procedure. */
872 struct multipass_split_aux_data 
873   {
874     struct ccase prev_case;     /* Data in previous case. */
875     struct casefile *casefile;  /* Accumulates data for a split. */
876
877     /* Function to call with the accumulated data. */
878     void (*split_func) (const struct casefile *, void *);
879     void *func_aux;                            /* Auxiliary data. */ 
880   };
881
882 static int multipass_split_callback (struct ccase *c, void *aux_);
883 static void multipass_split_output (struct multipass_split_aux_data *);
884
885 void
886 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
887                                                      void *),
888                                  void *func_aux) 
889 {
890   struct multipass_split_aux_data aux;
891
892   assert (split_func != NULL);
893
894   open_active_file ();
895
896   case_nullify (&aux.prev_case);
897   aux.casefile = NULL;
898   aux.split_func = split_func;
899   aux.func_aux = func_aux;
900
901   internal_procedure (multipass_split_callback, &aux);
902   if (aux.casefile != NULL)
903     multipass_split_output (&aux);
904   case_destroy (&aux.prev_case);
905
906   close_active_file ();
907 }
908
909 /* procedure() callback used by multipass_procedure_with_splits(). */
910 static int
911 multipass_split_callback (struct ccase *c, void *aux_)
912 {
913   struct multipass_split_aux_data *aux = aux_;
914
915   /* Start a new series if needed. */
916   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
917     {
918       /* Pass any cases to split_func. */
919       if (aux->casefile != NULL)
920         multipass_split_output (aux);
921
922       /* Start a new casefile. */
923       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
924
925       /* Record split values. */
926       dump_splits (c);
927       case_destroy (&aux->prev_case);
928       case_clone (&aux->prev_case, c);
929     }
930
931   casefile_append (aux->casefile, c);
932
933   return 1;
934 }
935
936 static void
937 multipass_split_output (struct multipass_split_aux_data *aux)
938 {
939   assert (aux->casefile != NULL);
940   aux->split_func (aux->casefile, aux->func_aux);
941   casefile_destroy (aux->casefile);
942   aux->casefile = NULL;
943 }
944
945
946 /* Discards all the current state in preparation for a data-input
947    command like DATA LIST or GET. */
948 void
949 discard_variables (void)
950 {
951   dict_clear (default_dict);
952   fh_set_default_handle (NULL);
953
954   n_lag = 0;
955   
956   if (vfm_source != NULL)
957     {
958       free_case_source (vfm_source);
959       vfm_source = NULL;
960     }
961
962   cancel_transformations ();
963
964   ctl_stack_clear ();
965
966   expr_free (process_if_expr);
967   process_if_expr = NULL;
968
969   cancel_temporary ();
970
971   pgm_state = STATE_INIT;
972 }