1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
28 #include <unistd.h> /* Required by SunOS4. */
33 #include "dictionary.h"
36 #include "expressions/public.h"
43 #include "value-labels.h"
46 Virtual File Manager (vfm):
48 vfm is used to process data files. It uses the model that
49 data is read from one stream (the data source), processed,
50 then written to another (the data sink). The data source is
51 then deleted and the data sink becomes the data source for the
54 /* Procedure execution data. */
55 struct write_case_data
57 /* Function to call for each case. */
58 int (*proc_func) (struct ccase *, void *); /* Function. */
59 void *aux; /* Auxiliary data. */
61 struct ccase trns_case; /* Case used for transformations. */
62 struct ccase sink_case; /* Case written to sink, if
63 compaction is necessary. */
64 size_t cases_written; /* Cases output so far. */
65 size_t cases_analyzed; /* Cases passed to procedure so far. */
68 /* The current active file, from which cases are read. */
69 struct case_source *vfm_source;
71 /* The replacement active file, to which cases are written. */
72 struct case_sink *vfm_sink;
74 /* Nonzero if the case needs to have values deleted before being
75 stored, zero otherwise. */
76 static int compaction_necessary;
78 /* Time at which vfm was last invoked. */
79 time_t last_vfm_invocation;
82 int n_lag; /* Number of cases to lag. */
83 static int lag_count; /* Number of cases in lag_queue so far. */
84 static int lag_head; /* Index where next case will be added. */
85 static struct ccase *lag_queue; /* Array of n_lag struct ccase elements. */
87 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
89 static void create_trns_case (struct ccase *, struct dictionary *);
90 static void open_active_file (void);
91 static int write_case (struct write_case_data *wc_data);
92 static int execute_transformations (struct ccase *c,
93 struct trns_header **trns,
94 int first_idx, int last_idx,
96 static int filter_case (const struct ccase *c, int case_num);
97 static void lag_case (const struct ccase *c);
98 static void clear_case (struct ccase *c);
99 static void close_active_file (void);
101 /* Public functions. */
103 /* Reads the data from the input program and writes it to a new
104 active file. For each case we read from the input program, we
107 1. Execute permanent transformations. If these drop the case,
108 start the next case from step 1.
110 2. N OF CASES. If we have already written N cases, start the
111 next case from step 1.
113 3. Write case to replacement active file.
115 4. Execute temporary transformations. If these drop the case,
116 start the next case from step 1.
118 5. FILTER, PROCESS IF. If these drop the case, start the next
121 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
122 cases, start the next case from step 1.
124 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
126 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
128 if (proc_func == NULL
129 && case_source_is_class (vfm_source, &storage_source_class)
139 internal_procedure (proc_func, aux);
140 close_active_file ();
143 /* Executes a procedure, as procedure(), except that the caller
144 is responsible for calling open_active_file() and
145 close_active_file(). */
147 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux)
/* Counts active invocations; used only by the assertions below to
   detect recursive entry. */
149 static int recursive_call;
151 struct write_case_data wc_data;
/* NOTE(review): the increment here and the decrement at the end are
   side effects inside assert(), so neither is evaluated when NDEBUG
   is defined.  Consider moving them out of the assertions. */
153 assert (++recursive_call == 1);
155 wc_data.proc_func = proc_func;
/* Set up the case used for transformations and the case written to
   the sink. */
157 create_trns_case (&wc_data.trns_case, default_dict);
158 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
159 wc_data.cases_written = 0;
/* Record the time of this invocation. */
161 last_vfm_invocation = time (NULL);
/* Let the source read its cases, handing it write_case() and
   WC_DATA.  Nothing is read if there is no source. */
163 if (vfm_source != NULL)
164 vfm_source->class->read (vfm_source,
166 write_case, &wc_data);
/* Release both cases created above. */
168 case_destroy (&wc_data.sink_case);
169 case_destroy (&wc_data.trns_case);
171 assert (--recursive_call == 0);
174 /* Creates TRNS_CASE as a case for dictionary DICT, setting each
175 numeric value according to whether its variable is re-initialized
176 before every case and filling each string value with spaces. */
178 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
180 size_t var_cnt = dict_get_var_cnt (dict);
183 case_create (trns_case, dict_get_next_value_idx (dict));
184 for (i = 0; i < var_cnt; i++)
186 struct variable *v = dict_get_var (dict, i);
187 union value *value = case_data_rw (trns_case, v->fv);
189 if (v->type == NUMERIC)
190 value->f = v->reinit ? 0.0 : SYSMIS;
192 memset (value->s, ' ', v->width);
196 /* Makes all preparations for reading from the data source and writing
199 open_active_file (void)
201 /* Make temp_dict refer to the dictionary right before data
206 temp_dict = default_dict;
209 /* Figure out compaction. */
210 compaction_necessary = (dict_get_next_value_idx (temp_dict)
211 != dict_get_compacted_value_cnt (temp_dict));
214 if (vfm_sink == NULL)
215 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
216 if (vfm_sink->class->open != NULL)
217 vfm_sink->class->open (vfm_sink);
219 /* Allocate memory for lag queue. */
226 lag_queue = xmalloc (n_lag * sizeof *lag_queue);
227 for (i = 0; i < n_lag; i++)
228 case_nullify (&lag_queue[i]);
231 /* Close any unclosed DO IF or LOOP constructs. */
232 discard_ctl_stack ();
235 /* Transforms trns_case and writes it to the replacement active
236 file if advisable. Returns nonzero if more cases can be
237 accepted, zero otherwise. Do not call this function again
238 after it has returned zero once. */
240 write_case (struct write_case_data *wc_data)
242 /* Execute permanent transformations. */
243 if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
244 wc_data->cases_written + 1))
248 if (dict_get_case_limit (default_dict)
249 && wc_data->cases_written >= dict_get_case_limit (default_dict))
251 wc_data->cases_written++;
253 /* Write case to LAG queue. */
255 lag_case (&wc_data->trns_case);
257 /* Write case to replacement active file. */
258 if (vfm_sink->class->write != NULL)
260 if (compaction_necessary)
262 dict_compact_case (temp_dict, &wc_data->sink_case, &wc_data->trns_case);
263 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
266 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
269 /* Execute temporary transformations. */
270 if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
271 wc_data->cases_written))
274 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
275 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
276 || (dict_get_case_limit (temp_dict)
277 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
279 wc_data->cases_analyzed++;
281 /* Pass case to procedure. */
282 if (wc_data->proc_func != NULL)
283 wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
286 clear_case (&wc_data->trns_case);
290 /* Transforms case C using the transformations in TRNS[] with
291 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
292 become case CASE_NUM (1-based) in the output file. Returns
293 zero if the case was filtered out by one of the
294 transformations, nonzero otherwise. */
296 execute_transformations (struct ccase *c,
297 struct trns_header **trns,
298 int first_idx, int last_idx,
303 for (idx = first_idx; idx != last_idx; )
305 int retval = trns[idx]->proc (trns[idx], c, case_num);
324 /* Returns nonzero if case C with case number CASE_NUM should be
325 excluded as specified on FILTER or PROCESS IF, otherwise
328 filter_case (const struct ccase *c, int case_idx)
331 struct variable *filter_var = dict_get_filter (default_dict);
332 if (filter_var != NULL)
334 double f = case_num (c, filter_var->fv);
335 if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
340 if (process_if_expr != NULL
341 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
347 /* Add C to the lag queue. */
349 lag_case (const struct ccase *c)
351 if (lag_count < n_lag)
353 case_destroy (&lag_queue[lag_head]);
354 case_clone (&lag_queue[lag_head], c);
355 if (++lag_head >= n_lag)
359 /* Clears the variables in C that need to be cleared between
362 clear_case (struct ccase *c)
364 size_t var_cnt = dict_get_var_cnt (default_dict);
367 for (i = 0; i < var_cnt; i++)
369 struct variable *v = dict_get_var (default_dict, i);
370 if (v->init && v->reinit)
372 if (v->type == NUMERIC)
373 case_data_rw (c, v->fv)->f = SYSMIS;
375 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
380 /* Closes the active file. */
382 close_active_file (void)
384 /* Free memory for lag queue, and turn off lagging. */
389 for (i = 0; i < n_lag; i++)
390 case_destroy (&lag_queue[i]);
395 /* Dictionary from before TEMPORARY becomes permanent. */
398 dict_destroy (default_dict);
399 default_dict = temp_dict;
403 /* Finish compaction. */
404 if (compaction_necessary)
405 dict_compact_values (default_dict);
407 /* Free data source. */
408 if (vfm_source != NULL)
410 free_case_source (vfm_source);
414 /* Old data sink becomes new data source. */
415 if (vfm_sink->class->make_source != NULL)
416 vfm_source = vfm_sink->class->make_source (vfm_sink);
417 free_case_sink (vfm_sink);
420 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
421 and get rid of all the transformations. */
423 expr_free (process_if_expr);
424 process_if_expr = NULL;
425 if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
426 dict_set_filter (default_dict, NULL);
427 dict_set_case_limit (default_dict, 0);
428 dict_clear_vectors (default_dict);
429 cancel_transformations ();
432 /* Storage case stream. */
434 /* Information about storage sink or source. */
435 struct storage_stream_info
437 struct casefile *casefile; /* Storage. */
440 /* Initializes a storage sink. */
442 storage_sink_open (struct case_sink *sink)
444 struct storage_stream_info *info;
446 sink->aux = info = xmalloc (sizeof *info);
447 info->casefile = casefile_create (sink->value_cnt);
450 /* Destroys storage stream represented by INFO. */
452 destroy_storage_stream_info (struct storage_stream_info *info)
456 casefile_destroy (info->casefile);
461 /* Writes case C to the storage sink SINK. */
463 storage_sink_write (struct case_sink *sink, const struct ccase *c)
465 struct storage_stream_info *info = sink->aux;
467 casefile_append (info->casefile, c);
470 /* Destroys internal data in SINK. */
472 storage_sink_destroy (struct case_sink *sink)
474 destroy_storage_stream_info (sink->aux);
477 /* Closes the sink and returns a storage source to read back the
479 static struct case_source *
480 storage_sink_make_source (struct case_sink *sink)
482 struct case_source *source
483 = create_case_source (&storage_source_class, sink->aux);
489 const struct case_sink_class storage_sink_class =
494 storage_sink_destroy,
495 storage_sink_make_source,
498 /* Storage source. */
500 /* Returns the number of cases that will be read by
501 storage_source_read(). */
503 storage_source_count (const struct case_source *source)
505 struct storage_stream_info *info = source->aux;
507 return casefile_get_case_cnt (info->casefile);
510 /* Reads all cases from the storage source and passes them one by one to
513 storage_source_read (struct case_source *source,
514 struct ccase *output_case,
515 write_case_func *write_case, write_case_data wc_data)
517 struct storage_stream_info *info = source->aux;
518 struct ccase casefile_case;
519 struct casereader *reader;
521 for (reader = casefile_get_reader (info->casefile);
522 casereader_read (reader, &casefile_case);
523 case_destroy (&casefile_case))
525 case_copy (output_case, 0,
527 casefile_get_value_cnt (info->casefile));
528 write_case (wc_data);
530 casereader_destroy (reader);
533 /* Destroys the source's internal data. */
535 storage_source_destroy (struct case_source *source)
537 destroy_storage_stream_info (source->aux);
540 /* Storage source. */
541 const struct case_source_class storage_source_class =
544 storage_source_count,
546 storage_source_destroy,
550 storage_source_get_casefile (struct case_source *source)
552 struct storage_stream_info *info = source->aux;
554 assert (source->class == &storage_source_class);
555 return info->casefile;
559 storage_source_create (struct casefile *cf)
561 struct storage_stream_info *info;
563 info = xmalloc (sizeof *info);
566 return create_case_source (&storage_source_class, info);
569 /* Null sink. Used by a few procedures that keep track of output
570 themselves and would throw away anything that the sink
573 const struct case_sink_class null_sink_class =
582 /* Returns a pointer to the lagged case from N_BEFORE cases before the
583 current one, or NULL if there haven't been that many cases yet. */
585 lagged_case (int n_before)
587 assert (n_before >= 1 );
588 assert (n_before <= n_lag);
590 if (n_before <= lag_count)
592 int index = lag_head - n_before;
595 return &lag_queue[index];
601 /* Appends TRNS to t_trns[], the list of all transformations to be
602 performed on data as it is read from the active file. */
604 add_transformation (struct trns_header * trns)
606 if (n_trns >= m_trns)
609 t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
611 t_trns[n_trns] = trns;
612 trns->index = n_trns++;
615 /* Cancels all active transformations, including any transformations
616 created by the input program. */
618 cancel_transformations (void)
621 for (i = 0; i < n_trns; i++)
624 t_trns[i]->free (t_trns[i]);
633 /* Creates a case source with class CLASS and auxiliary data
634 AUX. */
636 create_case_source (const struct case_source_class *class,
639 struct case_source *source = xmalloc (sizeof *source);
640 source->class = class;
645 /* Destroys case source SOURCE, calling the source's destroy
646 function, if any. */
648 free_case_source (struct case_source *source)
652 if (source->class->destroy != NULL)
653 source->class->destroy (source);
658 /* Returns nonzero if a case source is "complex". */
660 case_source_is_complex (const struct case_source *source)
662 return source != NULL && (source->class == &input_program_source_class
663 || source->class == &file_type_source_class);
666 /* Returns nonzero if CLASS is the class of SOURCE. */
668 case_source_is_class (const struct case_source *source,
669 const struct case_source_class *class)
671 return source != NULL && source->class == class;
674 /* Creates a case sink to accept cases from the given DICT with
675 class CLASS and auxiliary data AUX. */
677 create_case_sink (const struct case_sink_class *class,
678 const struct dictionary *dict,
681 struct case_sink *sink = xmalloc (sizeof *sink);
683 sink->value_cnt = dict_get_compacted_value_cnt (dict);
688 /* Destroys case sink SINK. */
690 free_case_sink (struct case_sink *sink)
694 if (sink->class->destroy != NULL)
695 sink->class->destroy (sink);
700 /* Represents auxiliary data for handling SPLIT FILE. */
701 struct split_aux_data
703 size_t case_count; /* Number of cases so far. */
704 struct ccase prev_case; /* Data in previous case. */
706 /* Functions to call... */
707 void (*begin_func) (void *); /* ...before data. */
708 int (*proc_func) (struct ccase *, void *); /* ...with data. */
709 void (*end_func) (void *); /* ...after data. */
710 void *func_aux; /* Auxiliary data. */
713 static int equal_splits (const struct ccase *, const struct ccase *);
714 static int procedure_with_splits_callback (struct ccase *, void *);
715 static void dump_splits (struct ccase *);
717 /* Like procedure(), but it automatically breaks the case stream
718 into SPLIT FILE break groups. Before each group of cases with
719 identical SPLIT FILE variable values, BEGIN_FUNC is called.
720 Then PROC_FUNC is called with each case in the group.
721 END_FUNC is called when the group is finished. FUNC_AUX is
722 passed to each of the functions as auxiliary data.
724 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
725 and END_FUNC will be called at all.
727 If SPLIT FILE is not in effect, then there is one break group
728 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
729 will be called once. */
731 procedure_with_splits (void (*begin_func) (void *aux),
732 int (*proc_func) (struct ccase *, void *aux),
733 void (*end_func) (void *aux),
736 struct split_aux_data split_aux;
738 split_aux.case_count = 0;
739 case_nullify (&split_aux.prev_case);
740 split_aux.begin_func = begin_func;
741 split_aux.proc_func = proc_func;
742 split_aux.end_func = end_func;
743 split_aux.func_aux = func_aux;
746 internal_procedure (procedure_with_splits_callback, &split_aux);
747 if (split_aux.case_count > 0 && end_func != NULL)
749 close_active_file ();
751 case_destroy (&split_aux.prev_case);
754 /* procedure() callback used by procedure_with_splits(). */
756 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
758 struct split_aux_data *split_aux = split_aux_;
760 /* Start a new series if needed. */
761 if (split_aux->case_count == 0
762 || !equal_splits (c, &split_aux->prev_case))
764 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
765 split_aux->end_func (split_aux->func_aux);
768 case_destroy (&split_aux->prev_case);
769 case_clone (&split_aux->prev_case, c);
771 if (split_aux->begin_func != NULL)
772 split_aux->begin_func (split_aux->func_aux);
775 split_aux->case_count++;
776 if (split_aux->proc_func != NULL)
777 return split_aux->proc_func (c, split_aux->func_aux);
782 /* Compares the SPLIT FILE variables in cases A and B and returns
783 nonzero only if they are equal. */
785 equal_splits (const struct ccase *a, const struct ccase *b)
787 return case_compare (a, b,
788 dict_get_split_vars (default_dict),
789 dict_get_split_cnt (default_dict)) == 0;
792 /* Dumps out the values of all the split variables for the case C. */
794 dump_splits (struct ccase *c)
796 struct variable *const *split;
801 split_cnt = dict_get_split_cnt (default_dict);
805 t = tab_create (3, split_cnt + 1, 0);
806 tab_dim (t, tab_natural_dimensions);
807 tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
808 tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
809 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
810 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
811 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
812 split = dict_get_split_vars (default_dict);
813 for (i = 0; i < split_cnt; i++)
815 struct variable *v = split[i];
819 assert (v->type == NUMERIC || v->type == ALPHA);
820 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
822 data_out (temp_buf, &v->print, case_data (c, v->fv));
824 temp_buf[v->print.w] = 0;
825 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
827 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
829 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
831 tab_flags (t, SOMF_NO_TITLE);
835 /* Represents auxiliary data for handling SPLIT FILE in a
836 multipass procedure. */
837 struct multipass_split_aux_data
839 struct ccase prev_case; /* Data in previous case. */
840 struct casefile *casefile; /* Accumulates data for a split. */
842 /* Function to call with the accumulated data. */
843 void (*split_func) (const struct casefile *, void *);
844 void *func_aux; /* Auxiliary data. */
847 static int multipass_split_callback (struct ccase *c, void *aux_);
848 static void multipass_split_output (struct multipass_split_aux_data *);
851 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
855 struct multipass_split_aux_data aux;
857 assert (split_func != NULL);
861 case_nullify (&aux.prev_case);
863 aux.split_func = split_func;
864 aux.func_aux = func_aux;
866 internal_procedure (multipass_split_callback, &aux);
867 if (aux.casefile != NULL)
868 multipass_split_output (&aux);
869 case_destroy (&aux.prev_case);
871 close_active_file ();
874 /* procedure() callback used by multipass_procedure_with_splits(). */
876 multipass_split_callback (struct ccase *c, void *aux_)
878 struct multipass_split_aux_data *aux = aux_;
880 /* Start a new series if needed. */
881 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
883 /* Pass any cases to split_func. */
884 if (aux->casefile != NULL)
885 multipass_split_output (aux);
887 /* Start a new casefile. */
888 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
890 /* Record split values. */
892 case_destroy (&aux->prev_case);
893 case_clone (&aux->prev_case, c);
896 casefile_append (aux->casefile, c);
902 multipass_split_output (struct multipass_split_aux_data *aux)
904 assert (aux->casefile != NULL);
905 aux->split_func (aux->casefile, aux->func_aux);
906 casefile_destroy (aux->casefile);
907 aux->casefile = NULL;