1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
28 #include <unistd.h> /* Required by SunOS4. */
42 #include "value-labels.h"
45 Virtual File Manager (vfm):
47 vfm is used to process data files. It uses the model that
48 data is read from one stream (the data source), processed,
49 then written to another (the data sink). The data source is
50 then deleted and the data sink becomes the data source for the
53 /* Procedure execution data. */
54 struct write_case_data
56 /* Function to call for each case. */
57 int (*proc_func) (struct ccase *, void *); /* Function. */
58 void *aux; /* Auxiliary data. */
60 struct ccase *trns_case; /* Case used for transformations. */
61 struct ccase *sink_case; /* Case written to sink, if
62 compaction is necessary. */
63 size_t cases_written; /* Cases output so far. */
64 size_t cases_analyzed; /* Cases passed to procedure so far. */
67 /* The current active file, from which cases are read. */
68 struct case_source *vfm_source;
70 /* The replacement active file, to which cases are written. */
71 struct case_sink *vfm_sink;
73 /* Nonzero if the case needs to have values deleted before being
74 stored, zero otherwise. */
75 static int compaction_necessary;
77 /* Time at which vfm was last invoked. */
78 time_t last_vfm_invocation;
81 int n_lag; /* Number of cases to lag. */
82 static int lag_count; /* Number of cases in lag_queue so far. */
83 static int lag_head; /* Index where next case will be added. */
84 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
86 static struct ccase *create_trns_case (struct dictionary *);
87 static void open_active_file (void);
88 static int write_case (struct write_case_data *wc_data);
89 static int execute_transformations (struct ccase *c,
90 struct trns_header **trns,
91 int first_idx, int last_idx,
93 static int filter_case (const struct ccase *c, int case_num);
94 static void lag_case (const struct ccase *c);
95 static void compact_case (struct ccase *dest, const struct ccase *src);
96 static void clear_case (struct ccase *c);
97 static void close_active_file (void);
99 /* Public functions. */
101 /* Reads the data from the input program and writes it to a new
102 active file. For each case we read from the input program, we
105 1. Execute permanent transformations. If these drop the case,
106 start the next case from step 1.
108 2. N OF CASES. If we have already written N cases, start the
109 next case from step 1.
111 3. Write case to replacement active file.
113 4. Execute temporary transformations. If these drop the case,
114 start the next case from step 1.
116 5. FILTER, PROCESS IF. If these drop the case, start the next
119 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
120 cases, start the next case from step 1.
122 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
124 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
126 static int recursive_call;
128 struct write_case_data wc_data;
130 assert (++recursive_call == 1);
132 wc_data.proc_func = proc_func;
134 wc_data.trns_case = create_trns_case (default_dict);
135 wc_data.sink_case = xmalloc (dict_get_case_size (default_dict));
136 wc_data.cases_written = 0;
138 last_vfm_invocation = time (NULL);
141 if (vfm_source != NULL)
142 vfm_source->class->read (vfm_source,
144 write_case, &wc_data);
145 close_active_file ();
147 free (wc_data.sink_case);
148 free (wc_data.trns_case);
150 assert (--recursive_call == 0);
153 /* Creates and returns a case, initializing it from the vectors
154 that say which `value's need to be initialized just once, and
155 which ones need to be re-initialized before every case. */
156 static struct ccase *
157 create_trns_case (struct dictionary *dict)
159 struct ccase *c = xmalloc (dict_get_case_size (dict));
160 size_t var_cnt = dict_get_var_cnt (dict);
163 for (i = 0; i < var_cnt; i++)
165 struct variable *v = dict_get_var (dict, i);
167 if (v->type == NUMERIC)
170 c->data[v->fv].f = 0.0;
172 c->data[v->fv].f = SYSMIS;
175 memset (c->data[v->fv].s, ' ', v->width);
180 /* Makes all preparations for reading from the data source and writing
183 open_active_file (void)
185 /* Make temp_dict refer to the dictionary right before data
190 temp_dict = default_dict;
193 /* Figure out compaction. */
194 compaction_necessary = (dict_get_next_value_idx (temp_dict)
195 != dict_get_compacted_value_cnt (temp_dict));
198 if (vfm_sink == NULL)
199 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
200 if (vfm_sink->class->open != NULL)
201 vfm_sink->class->open (vfm_sink);
203 /* Allocate memory for lag queue. */
210 lag_queue = xmalloc (n_lag * sizeof *lag_queue);
211 for (i = 0; i < n_lag; i++)
212 lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
215 /* Close any unclosed DO IF or LOOP constructs. */
216 discard_ctl_stack ();
219 /* Transforms trns_case and writes it to the replacement active
220 file if advisable. Returns nonzero if more cases can be
221 accepted, zero otherwise. Do not call this function again
222 after it has returned zero once. */
224 write_case (struct write_case_data *wc_data)
226 /* Execute permanent transformations. */
227 if (!execute_transformations (wc_data->trns_case, t_trns, f_trns, temp_trns,
228 wc_data->cases_written + 1))
232 if (dict_get_case_limit (default_dict)
233 && wc_data->cases_written >= dict_get_case_limit (default_dict))
235 wc_data->cases_written++;
237 /* Write case to LAG queue. */
239 lag_case (wc_data->trns_case);
241 /* Write case to replacement active file. */
242 if (vfm_sink->class->write != NULL)
244 if (compaction_necessary)
246 compact_case (wc_data->sink_case, wc_data->trns_case);
247 vfm_sink->class->write (vfm_sink, wc_data->sink_case);
250 vfm_sink->class->write (vfm_sink, wc_data->trns_case);
253 /* Execute temporary transformations. */
254 if (!execute_transformations (wc_data->trns_case, t_trns, temp_trns, n_trns,
255 wc_data->cases_written))
258 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
259 if (filter_case (wc_data->trns_case, wc_data->cases_written)
260 || (dict_get_case_limit (temp_dict)
261 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
263 wc_data->cases_analyzed++;
265 /* Pass case to procedure. */
266 if (wc_data->proc_func != NULL)
267 wc_data->proc_func (wc_data->trns_case, wc_data->aux);
270 clear_case (wc_data->trns_case);
274 /* Transforms case C using the transformations in TRNS[] with
275 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
276 become case CASE_NUM (1-based) in the output file. Returns
277 zero if the case was filtered out by one of the
278 transformations, nonzero otherwise. */
280 execute_transformations (struct ccase *c,
281 struct trns_header **trns,
282 int first_idx, int last_idx,
287 for (idx = first_idx; idx != last_idx; )
289 int retval = trns[idx]->proc (trns[idx], c, case_num);
308 /* Returns nonzero if case C with case number CASE_NUM should be
309 excluded as specified on FILTER or PROCESS IF, otherwise
312 filter_case (const struct ccase *c, int case_num)
315 struct variable *filter_var = dict_get_filter (default_dict);
316 if (filter_var != NULL)
318 double f = c->data[filter_var->fv].f;
319 if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
324 if (process_if_expr != NULL
325 && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
331 /* Add C to the lag queue. */
333 lag_case (const struct ccase *c)
335 if (lag_count < n_lag)
337 memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
338 if (++lag_head >= n_lag)
342 /* Copies case SRC to case DEST, compacting it in the process. */
344 compact_case (struct ccase *dest, const struct ccase *src)
350 assert (compaction_necessary);
352 /* Copy all the variables except scratch variables from SRC to
354 /* FIXME: this should be temp_dict not default_dict I guess. */
355 var_cnt = dict_get_var_cnt (default_dict);
356 for (i = 0; i < var_cnt; i++)
358 struct variable *v = dict_get_var (default_dict, i);
360 if (dict_class_from_id (v->name) == DC_SCRATCH)
363 if (v->type == NUMERIC)
364 dest->data[nval++] = src->data[v->fv];
367 int w = DIV_RND_UP (v->width, sizeof (union value));
369 memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
375 /* Clears the variables in C that need to be cleared between
378 clear_case (struct ccase *c)
380 size_t var_cnt = dict_get_var_cnt (default_dict);
383 for (i = 0; i < var_cnt; i++)
385 struct variable *v = dict_get_var (default_dict, i);
386 if (v->init && v->reinit)
388 if (v->type == NUMERIC)
389 c->data[v->fv].f = SYSMIS;
391 memset (c->data[v->fv].s, ' ', v->width);
396 /* Closes the active file. */
398 close_active_file (void)
400 /* Free memory for lag queue, and turn off lagging. */
405 for (i = 0; i < n_lag; i++)
411 /* Dictionary from before TEMPORARY becomes permanent. */
414 dict_destroy (default_dict);
415 default_dict = temp_dict;
419 /* Finish compaction. */
420 if (compaction_necessary)
421 dict_compact_values (default_dict);
423 /* Free data source. */
424 if (vfm_source != NULL)
426 if (vfm_source->class->destroy != NULL)
427 vfm_source->class->destroy (vfm_source);
431 /* Old data sink becomes new data source. */
432 if (vfm_sink->class->make_source != NULL)
433 vfm_source = vfm_sink->class->make_source (vfm_sink);
436 if (vfm_sink->class->destroy != NULL)
437 vfm_sink->class->destroy (vfm_sink);
440 free_case_sink (vfm_sink);
443 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
444 and get rid of all the transformations. */
446 expr_free (process_if_expr);
447 process_if_expr = NULL;
448 if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
449 dict_set_filter (default_dict, NULL);
450 dict_set_case_limit (default_dict, 0);
451 dict_clear_vectors (default_dict);
452 cancel_transformations ();
455 /* Storage case stream. */
457 /* Information about storage sink or source. */
458 struct storage_stream_info
460 struct casefile *casefile; /* Storage. */
463 /* Initializes a storage sink. */
465 storage_sink_open (struct case_sink *sink)
467 struct storage_stream_info *info;
469 sink->aux = info = xmalloc (sizeof *info);
470 info->casefile = casefile_create (sink->value_cnt * sizeof (union value));
473 /* Destroys storage stream represented by INFO. */
475 destroy_storage_stream_info (struct storage_stream_info *info)
477 casefile_destroy (info->casefile);
481 /* Writes case C to the storage sink SINK. */
483 storage_sink_write (struct case_sink *sink, const struct ccase *c)
485 struct storage_stream_info *info = sink->aux;
487 casefile_append (info->casefile, c);
490 /* Destroys internal data in SINK. */
492 storage_sink_destroy (struct case_sink *sink)
494 destroy_storage_stream_info (sink->aux);
497 /* Creates and returns a storage source that takes over SINK's
498 auxiliary data, so that the written data can be read back. */
499 static struct case_source *
500 storage_sink_make_source (struct case_sink *sink)
502 return create_case_source (&storage_source_class, sink->dict, sink->aux);
506 const struct case_sink_class storage_sink_class =
511 storage_sink_destroy,
512 storage_sink_make_source,
515 /* Storage source. */
517 /* Returns the number of cases that will be read by
518 storage_source_read(). */
520 storage_source_count (const struct case_source *source)
522 struct storage_stream_info *info = source->aux;
524 return casefile_get_case_cnt (info->casefile);
527 /* Reads all cases from the storage source and passes them one by one to
530 storage_source_read (struct case_source *source,
531 struct ccase *output_case,
532 write_case_func *write_case, write_case_data wc_data)
534 struct storage_stream_info *info = source->aux;
535 const struct ccase *casefile_case;
536 struct casereader *reader;
538 reader = casefile_get_reader (info->casefile);
539 while (casereader_read (reader, &casefile_case))
541 memcpy (output_case, casefile_case,
542 casefile_get_case_size (info->casefile));
543 write_case (wc_data);
545 casereader_destroy (reader);
548 /* Destroys the source's internal data. */
550 storage_source_destroy (struct case_source *source)
552 destroy_storage_stream_info (source->aux);
555 /* Storage source. */
556 const struct case_source_class storage_source_class =
559 storage_source_count,
561 storage_source_destroy,
565 storage_source_get_casefile (struct case_source *source)
567 struct storage_stream_info *info = source->aux;
569 assert (source->class == &storage_source_class);
570 return info->casefile;
573 /* Null sink. Used by a few procedures that keep track of output
574 themselves and would throw away anything that the sink
577 const struct case_sink_class null_sink_class =
586 /* Returns a pointer to the lagged case from N_BEFORE cases before the
587 current one, or NULL if there haven't been that many cases yet. */
589 lagged_case (int n_before)
591 assert (n_before <= n_lag);
592 if (n_before > lag_count)
596 int index = lag_head - n_before;
599 return lag_queue[index];
603 /* Appends TRNS to t_trns[], the list of all transformations to be
604 performed on data as it is read from the active file. */
606 add_transformation (struct trns_header * trns)
608 if (n_trns >= m_trns)
611 t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
613 t_trns[n_trns] = trns;
614 trns->index = n_trns++;
617 /* Cancels all active transformations, including any transformations
618 created by the input program. */
620 cancel_transformations (void)
623 for (i = 0; i < n_trns; i++)
626 t_trns[i]->free (t_trns[i]);
637 /* Creates a case source with class CLASS and auxiliary data AUX
638 and based on dictionary DICT. */
640 create_case_source (const struct case_source_class *class,
641 const struct dictionary *dict,
644 struct case_source *source = xmalloc (sizeof *source);
645 source->class = class;
646 source->value_cnt = dict_get_next_value_idx (dict);
651 /* Returns nonzero if a case source is "complex". */
653 case_source_is_complex (const struct case_source *source)
655 return source != NULL && (source->class == &input_program_source_class
656 || source->class == &file_type_source_class);
659 /* Returns nonzero if CLASS is the class of SOURCE. */
661 case_source_is_class (const struct case_source *source,
662 const struct case_source_class *class)
664 return source != NULL && source->class == class;
667 /* Creates a case sink with class CLASS and auxiliary data
670 create_case_sink (const struct case_sink_class *class,
671 const struct dictionary *dict,
674 struct case_sink *sink = xmalloc (sizeof *sink);
677 sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
678 sink->value_cnt = dict_get_compacted_value_cnt (dict);
683 /* Destroys case sink SINK. It is the caller's responsibility to
684 call the sink's destroy function, if any. */
686 free_case_sink (struct case_sink *sink)
688 free (sink->idx_to_fv);
692 /* Represents auxiliary data for handling SPLIT FILE. */
693 struct split_aux_data
695 size_t case_count; /* Number of cases so far. */
696 struct ccase *prev_case; /* Data in previous case. */
698 /* Functions to call... */
699 void (*begin_func) (void *); /* ...before data. */
700 int (*proc_func) (struct ccase *, void *); /* ...with data. */
701 void (*end_func) (void *); /* ...after data. */
702 void *func_aux; /* Auxiliary data. */
705 static int equal_splits (const struct ccase *, const struct ccase *);
706 static int procedure_with_splits_callback (struct ccase *, void *);
707 static void dump_splits (struct ccase *);
709 /* Like procedure(), but it automatically breaks the case stream
710 into SPLIT FILE break groups. Before each group of cases with
711 identical SPLIT FILE variable values, BEGIN_FUNC is called.
712 Then PROC_FUNC is called with each case in the group.
713 END_FUNC is called when the group is finished. FUNC_AUX is
714 passed to each of the functions as auxiliary data.
716 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
717 and END_FUNC will be called at all.
719 If SPLIT FILE is not in effect, then there is one break group
720 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
721 will be called once. */
723 procedure_with_splits (void (*begin_func) (void *aux),
724 int (*proc_func) (struct ccase *, void *aux),
725 void (*end_func) (void *aux),
728 struct split_aux_data split_aux;
730 split_aux.case_count = 0;
731 split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
732 split_aux.begin_func = begin_func;
733 split_aux.proc_func = proc_func;
734 split_aux.end_func = end_func;
735 split_aux.func_aux = func_aux;
737 procedure (procedure_with_splits_callback, &split_aux);
739 if (split_aux.case_count > 0 && end_func != NULL)
741 free (split_aux.prev_case);
744 /* procedure() callback used by procedure_with_splits(). */
746 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
748 struct split_aux_data *split_aux = split_aux_;
750 /* Start a new series if needed. */
751 if (split_aux->case_count == 0
752 || !equal_splits (c, split_aux->prev_case))
754 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
755 split_aux->end_func (split_aux->func_aux);
758 memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
760 if (split_aux->begin_func != NULL)
761 split_aux->begin_func (split_aux->func_aux);
764 split_aux->case_count++;
765 if (split_aux->proc_func != NULL)
766 return split_aux->proc_func (c, split_aux->func_aux);
771 /* Compares the SPLIT FILE variables in cases A and B and returns
772 nonzero only if they are all equal. */
774 equal_splits (const struct ccase *a, const struct ccase *b)
776 struct variable *const *split;
780 split = dict_get_split_vars (default_dict);
781 split_cnt = dict_get_split_cnt (default_dict);
782 for (i = 0; i < split_cnt; i++)
784 struct variable *v = split[i];
789 if (a->data[v->fv].f != b->data[v->fv].f)
793 if (memcmp (a->data[v->fv].s, b->data[v->fv].s, v->width))
804 /* Dumps out the values of all the split variables for the case C. */
806 dump_splits (struct ccase *c)
808 struct variable *const *split;
813 split_cnt = dict_get_split_cnt (default_dict);
817 t = tab_create (3, split_cnt + 1, 0);
818 tab_dim (t, tab_natural_dimensions);
819 tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
820 tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
821 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
822 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
823 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
824 split = dict_get_split_vars (default_dict);
825 for (i = 0; i < split_cnt; i++)
827 struct variable *v = split[i];
831 assert (v->type == NUMERIC || v->type == ALPHA);
832 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
834 data_out (temp_buf, &v->print, &c->data[v->fv]);
836 temp_buf[v->print.w] = 0;
837 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
839 val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
841 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
843 tab_flags (t, SOMF_NO_TITLE);
847 /* Represents auxiliary data for handling SPLIT FILE in a
848 multipass procedure. */
849 struct multipass_split_aux_data
851 struct ccase *prev_case; /* Data in previous case. */
852 struct casefile *casefile; /* Accumulates data for a split. */
854 /* Function to call with the accumulated data. */
855 void (*split_func) (const struct casefile *, void *);
856 void *func_aux; /* Auxiliary data. */
859 static int multipass_split_callback (struct ccase *c, void *aux_);
860 static void multipass_split_output (struct multipass_split_aux_data *);
863 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
867 struct multipass_split_aux_data aux;
869 assert (split_func != NULL);
871 aux.prev_case = xmalloc (dict_get_case_size (default_dict));
873 aux.split_func = split_func;
874 aux.func_aux = func_aux;
876 procedure (multipass_split_callback, &aux);
878 if (aux.casefile != NULL)
879 multipass_split_output (&aux);
880 free (aux.prev_case);
883 /* procedure() callback used by multipass_procedure_with_splits(). */
885 multipass_split_callback (struct ccase *c, void *aux_)
887 struct multipass_split_aux_data *aux = aux_;
889 /* Start a new series if needed. */
890 if (aux->casefile == NULL || !equal_splits (c, aux->prev_case))
892 /* Pass any cases to split_func. */
893 if (aux->casefile != NULL)
894 multipass_split_output (aux);
896 /* Start a new casefile. */
897 aux->casefile = casefile_create (dict_get_case_size (default_dict));
899 /* Record split values. */
901 memcpy (aux->prev_case, c, dict_get_case_size (default_dict));
904 casefile_append (aux->casefile, c);
910 multipass_split_output (struct multipass_split_aux_data *aux)
912 assert (aux->casefile != NULL);
913 aux->split_func (aux->casefile, aux->func_aux);
914 casefile_destroy (aux->casefile);
915 aux->casefile = NULL;