Implemented the SHOW command and massaged the SET command to fit
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "do-ifP.h"
32 #include "error.h"
33 #include "expr.h"
34 #include "misc.h"
35 #include "random.h"
36 #include "settings.h"
37 #include "som.h"
38 #include "str.h"
39 #include "tab.h"
40 #include "var.h"
41 #include "value-labels.h"
42
43 /*
44    Virtual File Manager (vfm):
45
46    vfm is used to process data files.  It uses the model that
47    data is read from one stream (the data source), processed,
48    then written to another (the data sink).  The data source is
49    then deleted and the data sink becomes the data source for the
50    next procedure. */
51
52 /* Procedure execution data. */
53 struct write_case_data
54   {
55     /* Function to call for each case. */
56     int (*proc_func) (struct ccase *, void *); /* Function. */
57     void *aux;                                 /* Auxiliary data. */ 
58
59     struct ccase *trns_case;    /* Case used for transformations. */
60     struct ccase *sink_case;    /* Case written to sink, if
61                                    compaction is necessary. */
62     size_t cases_written;       /* Cases output so far. */
63     size_t cases_analyzed;      /* Cases passed to procedure so far. */
64   };
65
66 /* The current active file, from which cases are read. */
67 struct case_source *vfm_source;
68
69 /* The replacement active file, to which cases are written. */
70 struct case_sink *vfm_sink;
71
72 /* Nonzero if the case needs to have values deleted before being
73    stored, zero otherwise. */
74 static int compaction_necessary;
75
76 /* Nonzero means that we've overflowed our allotted workspace.
77    After that happens once during a session, we always store the
78    active file on disk instead of in memory.  (This policy may be
79    too aggressive.) */
80 static int workspace_overflow = 0;
81
82 /* Time at which vfm was last invoked. */
83 time_t last_vfm_invocation;
84
85 /* Lag queue. */
86 int n_lag;                      /* Number of cases to lag. */
87 static int lag_count;           /* Number of cases in lag_queue so far. */
88 static int lag_head;            /* Index where next case will be added. */
89 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
90
91 static struct ccase *create_trns_case (struct dictionary *);
92 static void open_active_file (void);
93 static int write_case (struct write_case_data *wc_data);
94 static int execute_transformations (struct ccase *c,
95                                     struct trns_header **trns,
96                                     int first_idx, int last_idx,
97                                     int case_num);
98 static int filter_case (const struct ccase *c, int case_num);
99 static void lag_case (const struct ccase *c);
100 static void compact_case (struct ccase *dest, const struct ccase *src);
101 static void clear_case (struct ccase *c);
102 static void close_active_file (void);
103 \f
104 /* Public functions. */
105
106 /* Reads the data from the input program and writes it to a new
107    active file.  For each case we read from the input program, we
108    do the following
109
110    1. Execute permanent transformations.  If these drop the case,
111       start the next case from step 1.
112
113    2. N OF CASES.  If we have already written N cases, start the
114       next case from step 1.
115    
116    3. Write case to replacement active file.
117    
118    4. Execute temporary transformations.  If these drop the case,
119       start the next case from step 1.
120       
121    5. FILTER, PROCESS IF.  If these drop the case, start the next
122       case from step 1.
123    
124    6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
125       cases, start the next case from step 1.
126       
127    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
128 void
129 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
130 {
131   static int recursive_call;
132
133   struct write_case_data wc_data;
134
135   assert (++recursive_call == 1);
136
137   wc_data.proc_func = proc_func;
138   wc_data.aux = aux;
139   wc_data.trns_case = create_trns_case (default_dict);
140   wc_data.sink_case = xmalloc (dict_get_case_size (default_dict));
141   wc_data.cases_written = 0;
142
143   last_vfm_invocation = time (NULL);
144
145   open_active_file ();
146   if (vfm_source != NULL) 
147     vfm_source->class->read (vfm_source,
148                              wc_data.trns_case,
149                              write_case, &wc_data);
150   close_active_file ();
151
152   free (wc_data.sink_case);
153   free (wc_data.trns_case);
154
155   assert (--recursive_call == 0);
156 }
157
158 /* Creates and returns a case, initializing it from the vectors
159    that say which `value's need to be initialized just once, and
160    which ones need to be re-initialized before every case. */
161 static struct ccase *
162 create_trns_case (struct dictionary *dict)
163 {
164   struct ccase *c = xmalloc (dict_get_case_size (dict));
165   size_t var_cnt = dict_get_var_cnt (dict);
166   size_t i;
167
168   for (i = 0; i < var_cnt; i++) 
169     {
170       struct variable *v = dict_get_var (dict, i);
171
172       if (v->type == NUMERIC) 
173         {
174           if (v->reinit)
175             c->data[v->fv].f = 0.0;
176           else
177             c->data[v->fv].f = SYSMIS;
178         }
179       else
180         memset (c->data[v->fv].s, ' ', v->width);
181     }
182   return c;
183 }
184
185 /* Makes all preparations for reading from the data source and writing
186    to the data sink. */
187 static void
188 open_active_file (void)
189 {
190   /* Make temp_dict refer to the dictionary right before data
191      reaches the sink */
192   if (!temporary)
193     {
194       temp_trns = n_trns;
195       temp_dict = default_dict;
196     }
197
198   /* Figure out compaction. */
199   compaction_necessary = (dict_get_next_value_idx (temp_dict)
200                           != dict_get_compacted_value_cnt (temp_dict));
201
202   /* Prepare sink. */
203   if (vfm_sink == NULL)
204     vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
205   if (vfm_sink->class->open != NULL)
206     vfm_sink->class->open (vfm_sink);
207
208   /* Allocate memory for lag queue. */
209   if (n_lag > 0)
210     {
211       int i;
212   
213       lag_count = 0;
214       lag_head = 0;
215       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
216       for (i = 0; i < n_lag; i++)
217         lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
218     }
219
220   /* Close any unclosed DO IF or LOOP constructs. */
221   discard_ctl_stack ();
222 }
223
224 /* Transforms trns_case and writes it to the replacement active
225    file if advisable.  Returns nonzero if more cases can be
226    accepted, zero otherwise.  Do not call this function again
227    after it has returned zero once.  */
228 static int
229 write_case (struct write_case_data *wc_data)
230 {
231   /* Execute permanent transformations.  */
232   if (!execute_transformations (wc_data->trns_case, t_trns, f_trns, temp_trns,
233                                 wc_data->cases_written + 1))
234     goto done;
235
236   /* N OF CASES. */
237   if (dict_get_case_limit (default_dict)
238       && wc_data->cases_written >= dict_get_case_limit (default_dict))
239     goto done;
240   wc_data->cases_written++;
241
242   /* Write case to LAG queue. */
243   if (n_lag)
244     lag_case (wc_data->trns_case);
245
246   /* Write case to replacement active file. */
247   if (vfm_sink->class->write != NULL) 
248     {
249       if (compaction_necessary) 
250         {
251           compact_case (wc_data->sink_case, wc_data->trns_case);
252           vfm_sink->class->write (vfm_sink, wc_data->sink_case);
253         }
254       else
255         vfm_sink->class->write (vfm_sink, wc_data->trns_case);
256     }
257   
258   /* Execute temporary transformations. */
259   if (!execute_transformations (wc_data->trns_case, t_trns, temp_trns, n_trns,
260                                 wc_data->cases_written))
261     goto done;
262   
263   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
264   if (filter_case (wc_data->trns_case, wc_data->cases_written)
265       || (dict_get_case_limit (temp_dict)
266           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
267     goto done;
268   wc_data->cases_analyzed++;
269
270   /* Pass case to procedure. */
271   if (wc_data->proc_func != NULL)
272     wc_data->proc_func (wc_data->trns_case, wc_data->aux);
273
274  done:
275   clear_case (wc_data->trns_case);
276   return 1;
277 }
278
279 /* Transforms case C using the transformations in TRNS[] with
280    indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
281    become case CASE_NUM (1-based) in the output file.  Returns
282    zero if the case was filtered out by one of the
283    transformations, nonzero otherwise. */
284 static int
285 execute_transformations (struct ccase *c,
286                          struct trns_header **trns,
287                          int first_idx, int last_idx,
288                          int case_num) 
289 {
290   int idx;
291
292   for (idx = first_idx; idx != last_idx; )
293     {
294       int retval = trns[idx]->proc (trns[idx], c, case_num);
295       switch (retval)
296         {
297         case -1:
298           idx++;
299           break;
300           
301         case -2:
302           return 0;
303           
304         default:
305           idx = retval;
306           break;
307         }
308     }
309
310   return 1;
311 }
312
313 /* Returns nonzero if case C with case number CASE_NUM should be
314    exclude as specified on FILTER or PROCESS IF, otherwise
315    zero. */
316 static int
317 filter_case (const struct ccase *c, int case_num)
318 {
319   /* FILTER. */
320   struct variable *filter_var = dict_get_filter (default_dict);
321   if (filter_var != NULL) 
322     {
323       double f = c->data[filter_var->fv].f;
324       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
325         return 1;
326     }
327
328   /* PROCESS IF. */
329   if (process_if_expr != NULL
330       && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
331     return 1;
332
333   return 0;
334 }
335
336 /* Add C to the lag queue. */
337 static void
338 lag_case (const struct ccase *c)
339 {
340   if (lag_count < n_lag)
341     lag_count++;
342   memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
343   if (++lag_head >= n_lag)
344     lag_head = 0;
345 }
346
347 /* Copies case SRC to case DEST, compacting it in the process. */
348 static void
349 compact_case (struct ccase *dest, const struct ccase *src)
350 {
351   int i;
352   int nval = 0;
353   size_t var_cnt;
354   
355   assert (compaction_necessary);
356
357   /* Copy all the variables except scratch variables from SRC to
358      DEST. */
359   var_cnt = dict_get_var_cnt (default_dict);
360   for (i = 0; i < var_cnt; i++)
361     {
362       struct variable *v = dict_get_var (default_dict, i);
363       
364       if (dict_class_from_id (v->name) == DC_SCRATCH)
365         continue;
366
367       if (v->type == NUMERIC)
368         dest->data[nval++] = src->data[v->fv];
369       else
370         {
371           int w = DIV_RND_UP (v->width, sizeof (union value));
372           
373           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
374           nval += w;
375         }
376     }
377 }
378
379 /* Clears the variables in C that need to be cleared between
380    processing cases.  */
381 static void
382 clear_case (struct ccase *c)
383 {
384   size_t var_cnt = dict_get_var_cnt (default_dict);
385   size_t i;
386   
387   for (i = 0; i < var_cnt; i++) 
388     {
389       struct variable *v = dict_get_var (default_dict, i);
390       if (v->init && v->reinit) 
391         {
392           if (v->type == NUMERIC) 
393             c->data[v->fv].f = SYSMIS;
394           else
395             memset (c->data[v->fv].s, ' ', v->width);
396         } 
397     }
398 }
399
400 /* Closes the active file. */
401 static void
402 close_active_file (void)
403 {
404   /* Free memory for lag queue, and turn off lagging. */
405   if (n_lag > 0)
406     {
407       int i;
408       
409       for (i = 0; i < n_lag; i++)
410         free (lag_queue[i]);
411       free (lag_queue);
412       n_lag = 0;
413     }
414   
415   /* Dictionary from before TEMPORARY becomes permanent.. */
416   if (temporary)
417     {
418       dict_destroy (default_dict);
419       default_dict = temp_dict;
420       temp_dict = NULL;
421     }
422
423   /* Finish compaction. */
424   if (compaction_necessary)
425     dict_compact_values (default_dict);
426     
427   /* Free data source. */
428   if (vfm_source != NULL) 
429     {
430       if (vfm_source->class->destroy != NULL)
431         vfm_source->class->destroy (vfm_source);
432       free (vfm_source);
433     }
434
435   /* Old data sink becomes new data source. */
436   if (vfm_sink->class->make_source != NULL)
437     vfm_source = vfm_sink->class->make_source (vfm_sink);
438   else
439     vfm_source = NULL;
440   free_case_sink (vfm_sink);
441   vfm_sink = NULL;
442
443   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
444      and get rid of all the transformations. */
445   cancel_temporary ();
446   expr_free (process_if_expr);
447   process_if_expr = NULL;
448   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
449     dict_set_filter (default_dict, NULL);
450   dict_set_case_limit (default_dict, 0);
451   dict_clear_vectors (default_dict);
452   cancel_transformations ();
453 }
454 \f
455 /* Storage case stream. */
456
457 /* Information about storage sink or source. */
458 struct storage_stream_info 
459   {
460     size_t case_cnt;            /* Number of cases. */
461     size_t case_size;           /* Number of bytes in case. */
462     enum { DISK, MEMORY } mode; /* Where is data stored? */
463
464     /* Disk storage.  */
465     FILE *file;                 /* Data file. */
466
467     /* Memory storage. */
468     int max_cases;              /* Maximum cases before switching to disk. */
469     struct case_list *head;     /* First case in list. */
470     struct case_list *tail;     /* Last case in list. */
471   };
472
473 static void open_storage_file (struct storage_stream_info *info);
474
475 /* Initializes a storage sink. */
476 static void
477 storage_sink_open (struct case_sink *sink)
478 {
479   struct storage_stream_info *info;
480
481   sink->aux = info = xmalloc (sizeof *info);
482   info->case_cnt = 0;
483   info->case_size = sink->value_cnt * sizeof (union value);
484   info->file = NULL;
485   info->max_cases = 0;
486   info->head = info->tail = NULL;
487   if (workspace_overflow) 
488     {
489       info->mode = DISK;
490       open_storage_file (info);
491     }
492   else 
493     {
494       info->mode = MEMORY; 
495       info->max_cases = (get_max_workspace()
496                          / (sizeof (struct case_list) + info->case_size));
497     }
498 }
499
500 /* Creates a new temporary file and puts it into INFO. */
501 static void
502 open_storage_file (struct storage_stream_info *info) 
503 {
504   info->file = tmpfile ();
505   if (info->file == NULL)
506     {
507       msg (ME, _("An error occurred creating a temporary "
508                  "file for use as the active file: %s."),
509            strerror (errno));
510       err_failure ();
511     }
512 }
513
514 /* Writes the VALUE_CNT values in VALUES to FILE. */
515 static void
516 write_storage_file (FILE *file, const union value *values, size_t value_cnt) 
517 {
518   if (fwrite (values, sizeof *values * value_cnt, 1, file) != 1)
519     {
520       msg (ME, _("An error occurred writing to a "
521                  "temporary file used as the active file: %s."),
522            strerror (errno));
523       err_failure ();
524     }
525 }
526
527 /* If INFO represents records in memory, moves them to disk.
528    Each comprises VALUE_CNT `union value's. */
529 static void
530 storage_to_disk (struct storage_stream_info *info, size_t value_cnt) 
531 {
532   struct case_list *cur, *next;
533
534   if (info->mode == MEMORY) 
535     {
536       info->mode = DISK;
537       open_storage_file (info);
538       for (cur = info->head; cur; cur = next)
539         {
540           next = cur->next;
541           write_storage_file (info->file, cur->c.data, value_cnt);
542           free (cur);
543         }
544       info->head = info->tail = NULL; 
545     }
546 }
547
548 /* Destroys storage stream represented by INFO. */
549 static void
550 destroy_storage_stream_info (struct storage_stream_info *info) 
551 {
552   if (info->mode == DISK) 
553     {
554       if (info->file != NULL)
555         fclose (info->file); 
556     }
557   else 
558     {
559       struct case_list *cur, *next;
560   
561       for (cur = info->head; cur; cur = next)
562         {
563           next = cur->next;
564           free (cur);
565         }
566     }
567   free (info); 
568 }
569
570 /* Writes case C to the storage sink SINK. */
571 static void
572 storage_sink_write (struct case_sink *sink, const struct ccase *c)
573 {
574   struct storage_stream_info *info = sink->aux;
575
576   info->case_cnt++;
577   if (info->mode == MEMORY) 
578     {
579       struct case_list *new_case;
580
581       /* Copy case. */
582       new_case = xmalloc (sizeof (struct case_list)
583                           + ((sink->value_cnt - 1) * sizeof (union value)));
584       memcpy (&new_case->c, c, sizeof (union value) * sink->value_cnt);
585
586       /* Append case to linked list. */
587       new_case->next = NULL;
588       if (info->head != NULL)
589         info->tail->next = new_case;
590       else
591         info->head = new_case;
592       info->tail = new_case;
593
594       /* Dump all the cases to disk if we've run out of
595          workspace. */
596       if (info->case_cnt > info->max_cases) 
597         {
598           workspace_overflow = 1;
599           msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
600                      "overflowed.  Writing active file to disk."),
601                get_max_workspace() / 1024, info->max_cases,
602                sizeof (struct case_list) + info->case_size);
603
604           storage_to_disk (info, sink->value_cnt);
605         }
606     }
607   else 
608     write_storage_file (info->file, c->data, sink->value_cnt);
609 }
610
611 /* Destroys internal data in SINK. */
612 static void
613 storage_sink_destroy (struct case_sink *sink)
614 {
615   destroy_storage_stream_info (sink->aux);
616 }
617
618 /* Closes and destroys the sink and returns a storage source to
619    read back the written data. */
620 static struct case_source *
621 storage_sink_make_source (struct case_sink *sink) 
622 {
623   struct storage_stream_info *info = sink->aux;
624
625   if (info->mode == DISK) 
626     {
627       /* Rewind the file. */
628       assert (info->file != NULL);
629       if (fseek (info->file, 0, SEEK_SET) != 0)
630         {
631           msg (ME, _("An error occurred while attempting to rewind a "
632                      "temporary file used as the active file: %s."),
633                strerror (errno));
634           err_failure ();
635         }
636     }
637
638   return create_case_source (&storage_source_class, sink->dict, info); 
639 }
640
641 /* Storage sink. */
642 const struct case_sink_class storage_sink_class = 
643   {
644     "storage",
645     storage_sink_open,
646     storage_sink_write,
647     storage_sink_destroy,
648     storage_sink_make_source,
649   };
650 \f
651 /* Storage source. */
652
653 /* Returns the number of cases that will be read by
654    storage_source_read(). */
655 static int
656 storage_source_count (const struct case_source *source) 
657 {
658   struct storage_stream_info *info = source->aux;
659
660   return info->case_cnt;
661 }
662
663 /* Reads all cases from the storage source and passes them one by one to
664    write_case(). */
665 static void
666 storage_source_read (struct case_source *source,
667                      struct ccase *c,
668                      write_case_func *write_case, write_case_data wc_data)
669 {
670   struct storage_stream_info *info = source->aux;
671
672   if (info->mode == DISK) 
673     {
674       int i;
675
676       for (i = 0; i < info->case_cnt; i++)
677         {
678           if (!fread (c, info->case_size, 1, info->file))
679             {
680               msg (ME, _("An error occurred while attempting to read from "
681                          "a temporary file created for the active file: %s."),
682                    strerror (errno));
683               err_failure ();
684               break;
685             }
686
687           if (!write_case (wc_data))
688             break;
689         }
690     }
691   else 
692     {
693       while (info->head != NULL) 
694         {
695           struct case_list *iter = info->head;
696           memcpy (c, &iter->c, info->case_size);
697           if (!write_case (wc_data)) 
698             break;
699             
700           info->head = iter->next;
701           free (iter);
702         }
703       info->tail = NULL;
704     }
705 }
706
707 /* Destroys the source's internal data. */
708 static void
709 storage_source_destroy (struct case_source *source)
710 {
711   destroy_storage_stream_info (source->aux);
712 }
713
714 /* Storage source. */
715 const struct case_source_class storage_source_class = 
716   {
717     "storage",
718     storage_source_count,
719     storage_source_read,
720     storage_source_destroy,
721   };
722
723 /* Returns nonzero only if SOURCE is stored on disk (instead of
724    in memory). */
725 int
726 storage_source_on_disk (const struct case_source *source) 
727 {
728   struct storage_stream_info *info = source->aux;
729
730   return info->mode == DISK;
731 }
732
733 /* Returns the list of cases in storage source SOURCE. */
734 struct case_list *
735 storage_source_get_cases (const struct case_source *source) 
736 {
737   struct storage_stream_info *info = source->aux;
738
739   assert (info->mode == MEMORY);
740   return info->head;
741 }
742
743 /* Sets the list of cases in memory source SOURCE to CASES. */
744 void
745 storage_source_set_cases (const struct case_source *source,
746                           struct case_list *cases) 
747 {
748   struct storage_stream_info *info = source->aux;
749
750   assert (info->mode == MEMORY);
751   info->head = cases;
752 }
753
754 /* If SOURCE has its cases in memory, writes them to disk. */
755 void
756 storage_source_to_disk (struct case_source *source) 
757 {
758   struct storage_stream_info *info = source->aux;
759
760   storage_to_disk (info, source->value_cnt);
761 }
762 \f
763 /* Null sink.  Used by a few procedures that keep track of output
764    themselves and would throw away anything that the sink
765    contained anyway. */
766
767 const struct case_sink_class null_sink_class = 
768   {
769     "null",
770     NULL,
771     NULL,
772     NULL,
773     NULL,
774   };
775 \f
776 /* Returns a pointer to the lagged case from N_BEFORE cases before the
777    current one, or NULL if there haven't been that many cases yet. */
778 struct ccase *
779 lagged_case (int n_before)
780 {
781   assert (n_before <= n_lag);
782   if (n_before > lag_count)
783     return NULL;
784   
785   {
786     int index = lag_head - n_before;
787     if (index < 0)
788       index += n_lag;
789     return lag_queue[index];
790   }
791 }
792    
793 /* Appends TRNS to t_trns[], the list of all transformations to be
794    performed on data as it is read from the active file. */
795 void
796 add_transformation (struct trns_header * trns)
797 {
798   if (n_trns >= m_trns)
799     {
800       m_trns += 16;
801       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
802     }
803   t_trns[n_trns] = trns;
804   trns->index = n_trns++;
805 }
806
807 /* Cancels all active transformations, including any transformations
808    created by the input program. */
809 void
810 cancel_transformations (void)
811 {
812   int i;
813   for (i = 0; i < n_trns; i++)
814     {
815       if (t_trns[i]->free)
816         t_trns[i]->free (t_trns[i]);
817       free (t_trns[i]);
818     }
819   n_trns = f_trns = 0;
820   if (m_trns > 32)
821     {
822       free (t_trns);
823       m_trns = 0;
824     }
825 }
826 \f
827 /* Creates a case source with class CLASS and auxiliary data AUX
828    and based on dictionary DICT. */
829 struct case_source *
830 create_case_source (const struct case_source_class *class,
831                     const struct dictionary *dict,
832                     void *aux) 
833 {
834   struct case_source *source = xmalloc (sizeof *source);
835   source->class = class;
836   source->value_cnt = dict_get_next_value_idx (dict);
837   source->aux = aux;
838   return source;
839 }
840
841 /* Returns nonzero if a case source is "complex". */
842 int
843 case_source_is_complex (const struct case_source *source) 
844 {
845   return source != NULL && (source->class == &input_program_source_class
846                             || source->class == &file_type_source_class);
847 }
848
849 /* Returns nonzero if CLASS is the class of SOURCE. */
850 int
851 case_source_is_class (const struct case_source *source,
852                       const struct case_source_class *class) 
853 {
854   return source != NULL && source->class == class;
855 }
856
857 /* Creates a case sink with class CLASS and auxiliary data
858    AUX. */
859 struct case_sink *
860 create_case_sink (const struct case_sink_class *class,
861                   const struct dictionary *dict,
862                   void *aux) 
863 {
864   struct case_sink *sink = xmalloc (sizeof *sink);
865   sink->class = class;
866   sink->dict = dict;
867   sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
868   sink->value_cnt = dict_get_compacted_value_cnt (dict);
869   sink->aux = aux;
870   return sink;
871 }
872
873 /* Destroys case sink SINK.  It is the caller's responsible to
874    call the sink's destroy function, if any. */
875 void
876 free_case_sink (struct case_sink *sink) 
877 {
878   free (sink->idx_to_fv);
879   free (sink);
880 }
881 \f
882 /* Represents auxiliary data for handling SPLIT FILE. */
883 struct split_aux_data 
884   {
885     size_t case_count;          /* Number of cases so far. */
886     struct ccase *prev_case;    /* Data in previous case. */
887
888     /* Functions to call... */
889     void (*begin_func) (void *);               /* ...before data. */
890     int (*proc_func) (struct ccase *, void *); /* ...with data. */
891     void (*end_func) (void *);                 /* ...after data. */
892     void *func_aux;                            /* Auxiliary data. */ 
893   };
894
895 static int equal_splits (const struct ccase *, const struct ccase *);
896 static int procedure_with_splits_callback (struct ccase *, void *);
897 static void dump_splits (struct ccase *);
898
899 /* Like procedure(), but it automatically breaks the case stream
900    into SPLIT FILE break groups.  Before each group of cases with
901    identical SPLIT FILE variable values, BEGIN_FUNC is called.
902    Then PROC_FUNC is called with each case in the group.  
903    END_FUNC is called when the group is finished.  FUNC_AUX is
904    passed to each of the functions as auxiliary data.
905
906    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
907    and END_FUNC will be called at all. 
908
909    If SPLIT FILE is not in effect, then there is one break group
910    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
911    will be called once. */
912 void
913 procedure_with_splits (void (*begin_func) (void *aux),
914                        int (*proc_func) (struct ccase *, void *aux),
915                        void (*end_func) (void *aux),
916                        void *func_aux) 
917 {
918   struct split_aux_data split_aux;
919
920   split_aux.case_count = 0;
921   split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
922   split_aux.begin_func = begin_func;
923   split_aux.proc_func = proc_func;
924   split_aux.end_func = end_func;
925   split_aux.func_aux = func_aux;
926
927   procedure (procedure_with_splits_callback, &split_aux);
928
929   if (split_aux.case_count > 0 && end_func != NULL)
930     end_func (func_aux);
931   free (split_aux.prev_case);
932 }
933
934 /* procedure() callback used by procedure_with_splits(). */
935 static int
936 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
937 {
938   struct split_aux_data *split_aux = split_aux_;
939
940   /* Start a new series if needed. */
941   if (split_aux->case_count == 0
942       || !equal_splits (c, split_aux->prev_case))
943     {
944       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
945         split_aux->end_func (split_aux->func_aux);
946
947       dump_splits (c);
948       memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
949
950       if (split_aux->begin_func != NULL)
951         split_aux->begin_func (split_aux->func_aux);
952     }
953
954   split_aux->case_count++;
955   if (split_aux->proc_func != NULL)
956     return split_aux->proc_func (c, split_aux->func_aux);
957   else
958     return 1;
959 }
960
961 /* Compares the SPLIT FILE variables in cases A and B and returns
962    nonzero only if they differ. */
963 static int
964 equal_splits (const struct ccase *a, const struct ccase *b) 
965 {
966   struct variable *const *split;
967   size_t split_cnt;
968   size_t i;
969     
970   split = dict_get_split_vars (default_dict);
971   split_cnt = dict_get_split_cnt (default_dict);
972   for (i = 0; i < split_cnt; i++)
973     {
974       struct variable *v = split[i];
975       
976       switch (v->type)
977         {
978         case NUMERIC:
979           if (a->data[v->fv].f != b->data[v->fv].f)
980             return 0;
981           break;
982         case ALPHA:
983           if (memcmp (a->data[v->fv].s, b->data[v->fv].s, v->width))
984             return 0;
985           break;
986         default:
987           assert (0);
988         }
989     }
990
991   return 1;
992 }
993
994 /* Dumps out the values of all the split variables for the case C. */
995 static void
996 dump_splits (struct ccase *c)
997 {
998   struct variable *const *split;
999   struct tab_table *t;
1000   size_t split_cnt;
1001   int i;
1002
1003   split_cnt = dict_get_split_cnt (default_dict);
1004   if (split_cnt == 0)
1005     return;
1006
1007   t = tab_create (3, split_cnt + 1, 0);
1008   tab_dim (t, tab_natural_dimensions);
1009   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1010   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1011   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1012   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1013   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1014   split = dict_get_split_vars (default_dict);
1015   for (i = 0; i < split_cnt; i++)
1016     {
1017       struct variable *v = split[i];
1018       char temp_buf[80];
1019       const char *val_lab;
1020
1021       assert (v->type == NUMERIC || v->type == ALPHA);
1022       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1023       
1024       data_out (temp_buf, &v->print, &c->data[v->fv]);
1025       
1026       temp_buf[v->print.w] = 0;
1027       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1028
1029       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1030       if (val_lab)
1031         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1032     }
1033   tab_flags (t, SOMF_NO_TITLE);
1034   tab_submit (t);
1035 }