Added custom assert(); Fixed bugs in T-TEST and VALUE LABELS
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include "error.h"
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "do-ifP.h"
32 #include "error.h"
33 #include "expr.h"
34 #include "misc.h"
35 #include "random.h"
36 #include "settings.h"
37 #include "som.h"
38 #include "str.h"
39 #include "tab.h"
40 #include "var.h"
41 #include "value-labels.h"
42
43 /*
44    Virtual File Manager (vfm):
45
46    vfm is used to process data files.  It uses the model that
47    data is read from one stream (the data source), processed,
48    then written to another (the data sink).  The data source is
49    then deleted and the data sink becomes the data source for the
50    next procedure. */
51
52 /* Procedure execution data. */
53 struct write_case_data
54   {
55     /* Function to call for each case. */
56     int (*proc_func) (struct ccase *, void *); /* Function. */
57     void *aux;                                 /* Auxiliary data. */ 
58
59     struct ccase *trns_case;    /* Case used for transformations. */
60     struct ccase *sink_case;    /* Case written to sink, if
61                                    compaction is necessary. */
62     size_t cases_written;       /* Cases output so far. */
63     size_t cases_analyzed;      /* Cases passed to procedure so far. */
64   };
65
66 /* The current active file, from which cases are read. */
67 struct case_source *vfm_source;
68
69 /* The replacement active file, to which cases are written. */
70 struct case_sink *vfm_sink;
71
72 /* Nonzero if the case needs to have values deleted before being
73    stored, zero otherwise. */
74 static int compaction_necessary;
75
76 /* Nonzero means that we've overflowed our allotted workspace.
77    After that happens once during a session, we always store the
78    active file on disk instead of in memory.  (This policy may be
79    too aggressive.) */
80 static int workspace_overflow = 0;
81
82 /* Time at which vfm was last invoked. */
83 time_t last_vfm_invocation;
84
85 /* Lag queue. */
86 int n_lag;                      /* Number of cases to lag. */
87 static int lag_count;           /* Number of cases in lag_queue so far. */
88 static int lag_head;            /* Index where next case will be added. */
89 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
90
91 static struct ccase *create_trns_case (struct dictionary *);
92 static void open_active_file (void);
93 static int write_case (struct write_case_data *wc_data);
94 static int execute_transformations (struct ccase *c,
95                                     struct trns_header **trns,
96                                     int first_idx, int last_idx,
97                                     int case_num);
98 static int filter_case (const struct ccase *c, int case_num);
99 static void lag_case (const struct ccase *c);
100 static void compact_case (struct ccase *dest, const struct ccase *src);
101 static void clear_case (struct ccase *c);
102 static void close_active_file (void);
103 \f
104 /* Public functions. */
105
106 /* Reads the data from the input program and writes it to a new
107    active file.  For each case we read from the input program, we
108    do the following
109
110    1. Execute permanent transformations.  If these drop the case,
111       start the next case from step 1.
112
113    2. N OF CASES.  If we have already written N cases, start the
114       next case from step 1.
115    
116    3. Write case to replacement active file.
117    
118    4. Execute temporary transformations.  If these drop the case,
119       start the next case from step 1.
120       
121    5. FILTER, PROCESS IF.  If these drop the case, start the next
122       case from step 1.
123    
124    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
125       cases, start the next case from step 1.
126       
127    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
128 void
129 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
130 {
131   static int recursive_call;
132
133   struct write_case_data wc_data;
134
135   assert (++recursive_call == 1);
136
137   wc_data.proc_func = proc_func;
138   wc_data.aux = aux;
139   wc_data.trns_case = create_trns_case (default_dict);
140   wc_data.sink_case = xmalloc (dict_get_case_size (default_dict));
141   wc_data.cases_written = 0;
142
143   last_vfm_invocation = time (NULL);
144
145   open_active_file ();
146   if (vfm_source != NULL) 
147     vfm_source->class->read (vfm_source,
148                              wc_data.trns_case,
149                              write_case, &wc_data);
150   close_active_file ();
151
152   free (wc_data.sink_case);
153   free (wc_data.trns_case);
154
155   assert (--recursive_call == 0);
156 }
157
158 /* Creates and returns a case, initializing it from the vectors
159    that say which `value's need to be initialized just once, and
160    which ones need to be re-initialized before every case. */
161 static struct ccase *
162 create_trns_case (struct dictionary *dict)
163 {
164   struct ccase *c = xmalloc (dict_get_case_size (dict));
165   size_t var_cnt = dict_get_var_cnt (dict);
166   size_t i;
167
168   for (i = 0; i < var_cnt; i++) 
169     {
170       struct variable *v = dict_get_var (dict, i);
171
172       if (v->type == NUMERIC) 
173         {
174           if (v->reinit)
175             c->data[v->fv].f = 0.0;
176           else
177             c->data[v->fv].f = SYSMIS;
178         }
179       else
180         memset (c->data[v->fv].s, ' ', v->width);
181     }
182   return c;
183 }
184
185 /* Makes all preparations for reading from the data source and writing
186    to the data sink. */
187 static void
188 open_active_file (void)
189 {
190   /* Make temp_dict refer to the dictionary right before data
191      reaches the sink */
192   if (!temporary)
193     {
194       temp_trns = n_trns;
195       temp_dict = default_dict;
196     }
197
198   /* Figure out compaction. */
199   compaction_necessary = (dict_get_next_value_idx (temp_dict)
200                           != dict_get_compacted_value_cnt (temp_dict));
201
202   /* Prepare sink. */
203   if (vfm_sink == NULL)
204     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
205   if (vfm_sink->class->open != NULL)
206     vfm_sink->class->open (vfm_sink);
207
208   /* Allocate memory for lag queue. */
209   if (n_lag > 0)
210     {
211       int i;
212   
213       lag_count = 0;
214       lag_head = 0;
215       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
216       for (i = 0; i < n_lag; i++)
217         lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
218     }
219
220   /* Close any unclosed DO IF or LOOP constructs. */
221   discard_ctl_stack ();
222 }
223
224 /* Transforms trns_case and writes it to the replacement active
225    file if advisable.  Returns nonzero if more cases can be
226    accepted, zero otherwise.  Do not call this function again
227    after it has returned zero once.  */
228 static int
229 write_case (struct write_case_data *wc_data)
230 {
231   /* Execute permanent transformations.  */
232   if (!execute_transformations (wc_data->trns_case, t_trns, f_trns, temp_trns,
233                                 wc_data->cases_written + 1))
234     goto done;
235
236   /* N OF CASES. */
237   if (dict_get_case_limit (default_dict)
238       && wc_data->cases_written >= dict_get_case_limit (default_dict))
239     goto done;
240   wc_data->cases_written++;
241
242   /* Write case to LAG queue. */
243   if (n_lag)
244     lag_case (wc_data->trns_case);
245
246   /* Write case to replacement active file. */
247   if (vfm_sink->class->write != NULL) 
248     {
249       if (compaction_necessary) 
250         {
251           compact_case (wc_data->sink_case, wc_data->trns_case);
252           vfm_sink->class->write (vfm_sink, wc_data->sink_case);
253         }
254       else
255         vfm_sink->class->write (vfm_sink, wc_data->trns_case);
256     }
257   
258   /* Execute temporary transformations. */
259   if (!execute_transformations (wc_data->trns_case, t_trns, temp_trns, n_trns,
260                                 wc_data->cases_written))
261     goto done;
262   
263   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
264   if (filter_case (wc_data->trns_case, wc_data->cases_written)
265       || (dict_get_case_limit (temp_dict)
266           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
267     goto done;
268   wc_data->cases_analyzed++;
269
270   /* Pass case to procedure. */
271   if (wc_data->proc_func != NULL)
272     wc_data->proc_func (wc_data->trns_case, wc_data->aux);
273
274  done:
275   clear_case (wc_data->trns_case);
276   return 1;
277 }
278
279 /* Transforms case C using the transformations in TRNS[] with
280    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
281    become case CASE_NUM (1-based) in the output file.  Returns
282    zero if the case was filtered out by one of the
283    transformations, nonzero otherwise. */
284 static int
285 execute_transformations (struct ccase *c,
286                          struct trns_header **trns,
287                          int first_idx, int last_idx,
288                          int case_num) 
289 {
290   int idx;
291
292   for (idx = first_idx; idx != last_idx; )
293     {
294       int retval = trns[idx]->proc (trns[idx], c, case_num);
295       switch (retval)
296         {
297         case -1:
298           idx++;
299           break;
300           
301         case -2:
302           return 0;
303           
304         default:
305           idx = retval;
306           break;
307         }
308     }
309
310   return 1;
311 }
312
313 /* Returns nonzero if case C with case number CASE_NUM should be
314    exclude as specified on FILTER or PROCESS IF, otherwise
315    zero. */
316 static int
317 filter_case (const struct ccase *c, int case_num)
318 {
319   /* FILTER. */
320   struct variable *filter_var = dict_get_filter (default_dict);
321   if (filter_var != NULL) 
322     {
323       double f = c->data[filter_var->fv].f;
324       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
325         return 1;
326     }
327
328   /* PROCESS IF. */
329   if (process_if_expr != NULL
330       && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
331     return 1;
332
333   return 0;
334 }
335
336 /* Add C to the lag queue. */
337 static void
338 lag_case (const struct ccase *c)
339 {
340   if (lag_count < n_lag)
341     lag_count++;
342   memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
343   if (++lag_head >= n_lag)
344     lag_head = 0;
345 }
346
347 /* Copies case SRC to case DEST, compacting it in the process. */
348 static void
349 compact_case (struct ccase *dest, const struct ccase *src)
350 {
351   int i;
352   int nval = 0;
353   size_t var_cnt;
354   
355   assert (compaction_necessary);
356
357   /* Copy all the variables except scratch variables from SRC to
358      DEST. */
359   var_cnt = dict_get_var_cnt (default_dict);
360   for (i = 0; i < var_cnt; i++)
361     {
362       struct variable *v = dict_get_var (default_dict, i);
363       
364       if (dict_class_from_id (v->name) == DC_SCRATCH)
365         continue;
366
367       if (v->type == NUMERIC)
368         dest->data[nval++] = src->data[v->fv];
369       else
370         {
371           int w = DIV_RND_UP (v->width, sizeof (union value));
372           
373           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
374           nval += w;
375         }
376     }
377 }
378
379 /* Clears the variables in C that need to be cleared between
380    processing cases.  */
381 static void
382 clear_case (struct ccase *c)
383 {
384   size_t var_cnt = dict_get_var_cnt (default_dict);
385   size_t i;
386   
387   for (i = 0; i < var_cnt; i++) 
388     {
389       struct variable *v = dict_get_var (default_dict, i);
390       if (v->init && v->reinit) 
391         {
392           if (v->type == NUMERIC) 
393             c->data[v->fv].f = SYSMIS;
394           else
395             memset (c->data[v->fv].s, ' ', v->width);
396         } 
397     }
398 }
399
400 /* Closes the active file. */
401 static void
402 close_active_file (void)
403 {
404   /* Free memory for lag queue, and turn off lagging. */
405   if (n_lag > 0)
406     {
407       int i;
408       
409       for (i = 0; i < n_lag; i++)
410         free (lag_queue[i]);
411       free (lag_queue);
412       n_lag = 0;
413     }
414   
415   /* Dictionary from before TEMPORARY becomes permanent.. */
416   if (temporary)
417     {
418       dict_destroy (default_dict);
419       default_dict = temp_dict;
420       temp_dict = NULL;
421     }
422
423   /* Finish compaction. */
424   if (compaction_necessary)
425     dict_compact_values (default_dict);
426     
427   /* Free data source. */
428   if (vfm_source != NULL) 
429     {
430       if (vfm_source->class->destroy != NULL)
431         vfm_source->class->destroy (vfm_source);
432       free (vfm_source);
433     }
434
435   /* Old data sink becomes new data source. */
436   if (vfm_sink->class->make_source != NULL)
437     vfm_source = vfm_sink->class->make_source (vfm_sink);
438   else 
439     {
440       if (vfm_sink->class->destroy != NULL)
441         vfm_sink->class->destroy (vfm_sink);
442       vfm_source = NULL; 
443     }
444   free_case_sink (vfm_sink);
445   vfm_sink = NULL;
446
447   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
448      and get rid of all the transformations. */
449   cancel_temporary ();
450   expr_free (process_if_expr);
451   process_if_expr = NULL;
452   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
453     dict_set_filter (default_dict, NULL);
454   dict_set_case_limit (default_dict, 0);
455   dict_clear_vectors (default_dict);
456   cancel_transformations ();
457 }
458 \f
459 /* Storage case stream. */
460
461 /* Information about storage sink or source. */
462 struct storage_stream_info 
463   {
464     size_t case_cnt;            /* Number of cases. */
465     size_t case_size;           /* Number of bytes in case. */
466     enum { DISK, MEMORY } mode; /* Where is data stored? */
467
468     /* Disk storage.  */
469     FILE *file;                 /* Data file. */
470
471     /* Memory storage. */
472     int max_cases;              /* Maximum cases before switching to disk. */
473     struct case_list *head;     /* First case in list. */
474     struct case_list *tail;     /* Last case in list. */
475   };
476
477 static void open_storage_file (struct storage_stream_info *info);
478
479 /* Initializes a storage sink. */
480 static void
481 storage_sink_open (struct case_sink *sink)
482 {
483   struct storage_stream_info *info;
484
485   sink->aux = info = xmalloc (sizeof *info);
486   info->case_cnt = 0;
487   info->case_size = sink->value_cnt * sizeof (union value);
488   info->file = NULL;
489   info->max_cases = 0;
490   info->head = info->tail = NULL;
491   if (workspace_overflow) 
492     {
493       info->mode = DISK;
494       open_storage_file (info);
495     }
496   else 
497     {
498       info->mode = MEMORY; 
499       info->max_cases = (get_max_workspace()
500                          / (sizeof (struct case_list) + info->case_size));
501     }
502 }
503
504 /* Creates a new temporary file and puts it into INFO. */
505 static void
506 open_storage_file (struct storage_stream_info *info) 
507 {
508   info->file = tmpfile ();
509   if (info->file == NULL)
510     {
511       msg (ME, _("An error occurred creating a temporary "
512                  "file for use as the active file: %s."),
513            strerror (errno));
514       err_failure ();
515     }
516 }
517
518 /* Writes the VALUE_CNT values in VALUES to FILE. */
519 static void
520 write_storage_file (FILE *file, const union value *values, size_t value_cnt) 
521 {
522   if (fwrite (values, sizeof *values * value_cnt, 1, file) != 1)
523     {
524       msg (ME, _("An error occurred writing to a "
525                  "temporary file used as the active file: %s."),
526            strerror (errno));
527       err_failure ();
528     }
529 }
530
531 /* If INFO represents records in memory, moves them to disk.
532    Each comprises VALUE_CNT `union value's. */
533 static void
534 storage_to_disk (struct storage_stream_info *info, size_t value_cnt) 
535 {
536   struct case_list *cur, *next;
537
538   if (info->mode == MEMORY) 
539     {
540       info->mode = DISK;
541       open_storage_file (info);
542       for (cur = info->head; cur; cur = next)
543         {
544           next = cur->next;
545           write_storage_file (info->file, cur->c.data, value_cnt);
546           free (cur);
547         }
548       info->head = info->tail = NULL; 
549     }
550 }
551
552 /* Destroys storage stream represented by INFO. */
553 static void
554 destroy_storage_stream_info (struct storage_stream_info *info) 
555 {
556   if (info->mode == DISK) 
557     {
558       if (info->file != NULL)
559         fclose (info->file); 
560     }
561   else 
562     {
563       struct case_list *cur, *next;
564   
565       for (cur = info->head; cur; cur = next)
566         {
567           next = cur->next;
568           free (cur);
569         }
570     }
571   free (info); 
572 }
573
574 /* Writes case C to the storage sink SINK. */
575 static void
576 storage_sink_write (struct case_sink *sink, const struct ccase *c)
577 {
578   struct storage_stream_info *info = sink->aux;
579
580   info->case_cnt++;
581   if (info->mode == MEMORY) 
582     {
583       struct case_list *new_case;
584
585       /* Copy case. */
586       new_case = xmalloc (sizeof (struct case_list)
587                           + ((sink->value_cnt - 1) * sizeof (union value)));
588       memcpy (&new_case->c, c, sizeof (union value) * sink->value_cnt);
589
590       /* Append case to linked list. */
591       new_case->next = NULL;
592       if (info->head != NULL)
593         info->tail->next = new_case;
594       else
595         info->head = new_case;
596       info->tail = new_case;
597
598       /* Dump all the cases to disk if we've run out of
599          workspace. */
600       if (info->case_cnt > info->max_cases) 
601         {
602           workspace_overflow = 1;
603           msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
604                      "overflowed.  Writing active file to disk."),
605                get_max_workspace() / 1024, info->max_cases,
606                sizeof (struct case_list) + info->case_size);
607
608           storage_to_disk (info, sink->value_cnt);
609         }
610     }
611   else 
612     write_storage_file (info->file, c->data, sink->value_cnt);
613 }
614
615 /* Destroys internal data in SINK. */
616 static void
617 storage_sink_destroy (struct case_sink *sink)
618 {
619   destroy_storage_stream_info (sink->aux);
620 }
621
622 /* Closes and destroys the sink and returns a storage source to
623    read back the written data. */
624 static struct case_source *
625 storage_sink_make_source (struct case_sink *sink) 
626 {
627   struct storage_stream_info *info = sink->aux;
628
629   if (info->mode == DISK) 
630     {
631       /* Rewind the file. */
632       assert (info->file != NULL);
633       if (fseek (info->file, 0, SEEK_SET) != 0)
634         {
635           msg (ME, _("An error occurred while attempting to rewind a "
636                      "temporary file used as the active file: %s."),
637                strerror (errno));
638           err_failure ();
639         }
640     }
641
642   return create_case_source (&storage_source_class, sink->dict, info); 
643 }
644
645 /* Storage sink. */
646 const struct case_sink_class storage_sink_class = 
647   {
648     "storage",
649     storage_sink_open,
650     storage_sink_write,
651     storage_sink_destroy,
652     storage_sink_make_source,
653   };
654 \f
655 /* Storage source. */
656
657 /* Returns the number of cases that will be read by
658    storage_source_read(). */
659 static int
660 storage_source_count (const struct case_source *source) 
661 {
662   struct storage_stream_info *info = source->aux;
663
664   return info->case_cnt;
665 }
666
667 /* Reads all cases from the storage source and passes them one by one to
668    write_case(). */
669 static void
670 storage_source_read (struct case_source *source,
671                      struct ccase *c,
672                      write_case_func *write_case, write_case_data wc_data)
673 {
674   struct storage_stream_info *info = source->aux;
675
676   if (info->mode == DISK) 
677     {
678       int i;
679
680       for (i = 0; i < info->case_cnt; i++)
681         {
682           if (!fread (c, info->case_size, 1, info->file))
683             {
684               msg (ME, _("An error occurred while attempting to read from "
685                          "a temporary file created for the active file: %s."),
686                    strerror (errno));
687               err_failure ();
688               break;
689             }
690
691           if (!write_case (wc_data))
692             break;
693         }
694     }
695   else 
696     {
697       while (info->head != NULL) 
698         {
699           struct case_list *iter = info->head;
700           memcpy (c, &iter->c, info->case_size);
701           if (!write_case (wc_data)) 
702             break;
703             
704           info->head = iter->next;
705           free (iter);
706         }
707       info->tail = NULL;
708     }
709 }
710
711 /* Destroys the source's internal data. */
712 static void
713 storage_source_destroy (struct case_source *source)
714 {
715   destroy_storage_stream_info (source->aux);
716 }
717
718 /* Storage source. */
719 const struct case_source_class storage_source_class = 
720   {
721     "storage",
722     storage_source_count,
723     storage_source_read,
724     storage_source_destroy,
725   };
726
727 /* Returns nonzero only if SOURCE is stored on disk (instead of
728    in memory). */
729 int
730 storage_source_on_disk (const struct case_source *source) 
731 {
732   struct storage_stream_info *info = source->aux;
733
734   return info->mode == DISK;
735 }
736
737 /* Returns the list of cases in storage source SOURCE. */
738 struct case_list *
739 storage_source_get_cases (const struct case_source *source) 
740 {
741   struct storage_stream_info *info = source->aux;
742
743   assert (info->mode == MEMORY);
744   return info->head;
745 }
746
747 /* Sets the list of cases in memory source SOURCE to CASES. */
748 void
749 storage_source_set_cases (const struct case_source *source,
750                           struct case_list *cases) 
751 {
752   struct storage_stream_info *info = source->aux;
753
754   assert (info->mode == MEMORY);
755   info->head = cases;
756 }
757
758 /* If SOURCE has its cases in memory, writes them to disk. */
759 void
760 storage_source_to_disk (struct case_source *source) 
761 {
762   struct storage_stream_info *info = source->aux;
763
764   storage_to_disk (info, source->value_cnt);
765 }
766 \f
767 /* Null sink.  Used by a few procedures that keep track of output
768    themselves and would throw away anything that the sink
769    contained anyway. */
770
771 const struct case_sink_class null_sink_class = 
772   {
773     "null",
774     NULL,
775     NULL,
776     NULL,
777     NULL,
778   };
779 \f
780 /* Returns a pointer to the lagged case from N_BEFORE cases before the
781    current one, or NULL if there haven't been that many cases yet. */
782 struct ccase *
783 lagged_case (int n_before)
784 {
785   assert (n_before <= n_lag);
786   if (n_before > lag_count)
787     return NULL;
788   
789   {
790     int index = lag_head - n_before;
791     if (index < 0)
792       index += n_lag;
793     return lag_queue[index];
794   }
795 }
796    
797 /* Appends TRNS to t_trns[], the list of all transformations to be
798    performed on data as it is read from the active file. */
799 void
800 add_transformation (struct trns_header * trns)
801 {
802   if (n_trns >= m_trns)
803     {
804       m_trns += 16;
805       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
806     }
807   t_trns[n_trns] = trns;
808   trns->index = n_trns++;
809 }
810
811 /* Cancels all active transformations, including any transformations
812    created by the input program. */
813 void
814 cancel_transformations (void)
815 {
816   int i;
817   for (i = 0; i < n_trns; i++)
818     {
819       if (t_trns[i]->free)
820         t_trns[i]->free (t_trns[i]);
821       free (t_trns[i]);
822     }
823   n_trns = f_trns = 0;
824   if (m_trns > 32)
825     {
826       free (t_trns);
827       m_trns = 0;
828     }
829 }
830 \f
831 /* Creates a case source with class CLASS and auxiliary data AUX
832    and based on dictionary DICT. */
833 struct case_source *
834 create_case_source (const struct case_source_class *class,
835                     const struct dictionary *dict,
836                     void *aux) 
837 {
838   struct case_source *source = xmalloc (sizeof *source);
839   source->class = class;
840   source->value_cnt = dict_get_next_value_idx (dict);
841   source->aux = aux;
842   return source;
843 }
844
845 /* Returns nonzero if a case source is "complex". */
846 int
847 case_source_is_complex (const struct case_source *source) 
848 {
849   return source != NULL && (source->class == &input_program_source_class
850                             || source->class == &file_type_source_class);
851 }
852
853 /* Returns nonzero if CLASS is the class of SOURCE. */
854 int
855 case_source_is_class (const struct case_source *source,
856                       const struct case_source_class *class) 
857 {
858   return source != NULL && source->class == class;
859 }
860
861 /* Creates a case sink with class CLASS and auxiliary data
862    AUX. */
863 struct case_sink *
864 create_case_sink (const struct case_sink_class *class,
865                   const struct dictionary *dict,
866                   void *aux) 
867 {
868   struct case_sink *sink = xmalloc (sizeof *sink);
869   sink->class = class;
870   sink->dict = dict;
871   sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
872   sink->value_cnt = dict_get_compacted_value_cnt (dict);
873   sink->aux = aux;
874   return sink;
875 }
876
877 /* Destroys case sink SINK.  It is the caller's responsible to
878    call the sink's destroy function, if any. */
879 void
880 free_case_sink (struct case_sink *sink) 
881 {
882   free (sink->idx_to_fv);
883   free (sink);
884 }
885 \f
886 /* Represents auxiliary data for handling SPLIT FILE. */
887 struct split_aux_data 
888   {
889     size_t case_count;          /* Number of cases so far. */
890     struct ccase *prev_case;    /* Data in previous case. */
891
892     /* Functions to call... */
893     void (*begin_func) (void *);               /* ...before data. */
894     int (*proc_func) (struct ccase *, void *); /* ...with data. */
895     void (*end_func) (void *);                 /* ...after data. */
896     void *func_aux;                            /* Auxiliary data. */ 
897   };
898
899 static int equal_splits (const struct ccase *, const struct ccase *);
900 static int procedure_with_splits_callback (struct ccase *, void *);
901 static void dump_splits (struct ccase *);
902
903 /* Like procedure(), but it automatically breaks the case stream
904    into SPLIT FILE break groups.  Before each group of cases with
905    identical SPLIT FILE variable values, BEGIN_FUNC is called.
906    Then PROC_FUNC is called with each case in the group.  
907    END_FUNC is called when the group is finished.  FUNC_AUX is
908    passed to each of the functions as auxiliary data.
909
910    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
911    and END_FUNC will be called at all. 
912
913    If SPLIT FILE is not in effect, then there is one break group
914    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
915    will be called once. */
916 void
917 procedure_with_splits (void (*begin_func) (void *aux),
918                        int (*proc_func) (struct ccase *, void *aux),
919                        void (*end_func) (void *aux),
920                        void *func_aux) 
921 {
922   struct split_aux_data split_aux;
923
924   split_aux.case_count = 0;
925   split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
926   split_aux.begin_func = begin_func;
927   split_aux.proc_func = proc_func;
928   split_aux.end_func = end_func;
929   split_aux.func_aux = func_aux;
930
931   procedure (procedure_with_splits_callback, &split_aux);
932
933   if (split_aux.case_count > 0 && end_func != NULL)
934     end_func (func_aux);
935   free (split_aux.prev_case);
936 }
937
938 /* procedure() callback used by procedure_with_splits(). */
939 static int
940 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
941 {
942   struct split_aux_data *split_aux = split_aux_;
943
944   /* Start a new series if needed. */
945   if (split_aux->case_count == 0
946       || !equal_splits (c, split_aux->prev_case))
947     {
948       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
949         split_aux->end_func (split_aux->func_aux);
950
951       dump_splits (c);
952       memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
953
954       if (split_aux->begin_func != NULL)
955         split_aux->begin_func (split_aux->func_aux);
956     }
957
958   split_aux->case_count++;
959   if (split_aux->proc_func != NULL)
960     return split_aux->proc_func (c, split_aux->func_aux);
961   else
962     return 1;
963 }
964
965 /* Compares the SPLIT FILE variables in cases A and B and returns
966    nonzero only if they differ. */
967 static int
968 equal_splits (const struct ccase *a, const struct ccase *b) 
969 {
970   struct variable *const *split;
971   size_t split_cnt;
972   size_t i;
973     
974   split = dict_get_split_vars (default_dict);
975   split_cnt = dict_get_split_cnt (default_dict);
976   for (i = 0; i < split_cnt; i++)
977     {
978       struct variable *v = split[i];
979       
980       switch (v->type)
981         {
982         case NUMERIC:
983           if (a->data[v->fv].f != b->data[v->fv].f)
984             return 0;
985           break;
986         case ALPHA:
987           if (memcmp (a->data[v->fv].s, b->data[v->fv].s, v->width))
988             return 0;
989           break;
990         default:
991           assert (0);
992         }
993     }
994
995   return 1;
996 }
997
998 /* Dumps out the values of all the split variables for the case C. */
999 static void
1000 dump_splits (struct ccase *c)
1001 {
1002   struct variable *const *split;
1003   struct tab_table *t;
1004   size_t split_cnt;
1005   int i;
1006
1007   split_cnt = dict_get_split_cnt (default_dict);
1008   if (split_cnt == 0)
1009     return;
1010
1011   t = tab_create (3, split_cnt + 1, 0);
1012   tab_dim (t, tab_natural_dimensions);
1013   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1014   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1015   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1016   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1017   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1018   split = dict_get_split_vars (default_dict);
1019   for (i = 0; i < split_cnt; i++)
1020     {
1021       struct variable *v = split[i];
1022       char temp_buf[80];
1023       const char *val_lab;
1024
1025       assert (v->type == NUMERIC || v->type == ALPHA);
1026       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1027       
1028       data_out (temp_buf, &v->print, &c->data[v->fv]);
1029       
1030       temp_buf[v->print.w] = 0;
1031       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1032
1033       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1034       if (val_lab)
1035         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1036     }
1037   tab_flags (t, SOMF_NO_TITLE);
1038   tab_submit (t);
1039 }