1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
28 #include <unistd.h> /* Required by SunOS4. */
33 #include "dictionary.h"
36 #include "expressions/public.h"
43 #include "value-labels.h"
46 Virtual File Manager (vfm):
48 vfm is used to process data files. It uses the model that
49 data is read from one stream (the data source), processed,
50 then written to another (the data sink). The data source is
51 then deleted and the data sink becomes the data source for the
54 /* Procedure execution data: per-pass state that internal_procedure()
   sets up and write_case() consults for every case handed over by
   the data source. */
55 struct write_case_data
57 /* Function to call for each case. */
58 int (*proc_func) (struct ccase *, void *); /* Function. */
59 void *aux; /* Auxiliary data. */
61 struct ccase trns_case; /* Case used for transformations. */
62 struct ccase sink_case; /* Case written to sink, if
63 compaction is necessary. */
64 size_t cases_written; /* Cases output so far; also the 1-based
   case number passed to the transformations. */
65 size_t cases_analyzed; /* Cases passed to procedure so far;
   checked against temp_dict's N OF CASES limit. */
68 /* The current active file, from which cases are read. */
69 struct case_source *vfm_source;
71 /* The replacement active file, to which cases are written. */
72 struct case_sink *vfm_sink;
74 /* Nonzero if the case needs to have values deleted before being
75 stored, zero otherwise. Set by open_active_file(). */
76 static int compaction_necessary;
78 /* Time at which vfm was last invoked, as returned by time();
   updated by internal_procedure(). */
79 time_t last_vfm_invocation;
/* Lag queue: the most recent n_lag cases, for lagged_case(). */
82 int n_lag; /* Number of cases to lag. */
83 static int lag_count; /* Number of cases in lag_queue so far. */
84 static int lag_head; /* Index where next case will be added. */
85 static struct ccase *lag_queue; /* Array of n_lag struct ccase elements,
   used as a ring buffer indexed by lag_head. */
/* Forward declarations of this file's static helpers. */
87 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
89 static void create_trns_case (struct ccase *, struct dictionary *);
90 static void open_active_file (void);
91 static int write_case (struct write_case_data *wc_data);
92 static int execute_transformations (struct ccase *c,
93 struct trns_header **trns,
94 int first_idx, int last_idx,
96 static int filter_case (const struct ccase *c, int case_num);
97 static void lag_case (const struct ccase *c);
98 static void compact_case (struct ccase *dest, const struct ccase *src);
99 static void clear_case (struct ccase *c);
100 static void close_active_file (void);
102 /* Public functions. */
104 /* Reads the data from the input program and writes it to a new
105 active file. For each case we read from the input program, we
   do the following:
108 1. Execute permanent transformations. If these drop the case,
109 start the next case from step 1.
111 2. N OF CASES. If we have already written N cases, start the
112 next case from step 1.
114 3. Write case to replacement active file.
116 4. Execute temporary transformations. If these drop the case,
117 start the next case from step 1.
119 5. FILTER, PROCESS IF. If these drop the case, start the next
   case from step 1.
122 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
123 cases, start the next case from step 1.
125 7. Pass case to PROC_FUNC, passing AUX as auxiliary data.
   PROC_FUNC may be null, in which case step 7 is skipped. */
127 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
/* Possible shortcut: no per-case callback and the active file is
   already held in storage. */
129 if (proc_func == NULL
130 && case_source_is_class (vfm_source, &storage_source_class)
140 internal_procedure (proc_func, aux);
141 close_active_file ();
144 /* Executes a procedure, as procedure(), except that the caller
145 is responsible for calling open_active_file() and
146 close_active_file(). Cases are read from vfm_source, if any,
   and each one is handed to write_case(); the transformation and
   sink cases are created here and destroyed before returning. */
148 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux)
150 static int recursive_call;
152 struct write_case_data wc_data;
/* Reentrancy guard. The counter is only touched inside assert(),
   so the whole check disappears when NDEBUG is defined. */
154 assert (++recursive_call == 1);
156 wc_data.proc_func = proc_func;
158 create_trns_case (&wc_data.trns_case, default_dict);
159 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
160 wc_data.cases_written = 0;
/* NOTE(review): write_case() also reads wc_data.aux and
   wc_data.cases_analyzed; confirm both are initialized before the
   source starts delivering cases. */
162 last_vfm_invocation = time (NULL);
164 if (vfm_source != NULL)
165 vfm_source->class->read (vfm_source,
167 write_case, &wc_data);
169 case_destroy (&wc_data.sink_case);
170 case_destroy (&wc_data.trns_case);
172 assert (--recursive_call == 0);
175 /* Creates TRNS_CASE with room for dict_get_next_value_idx (DICT)
176 values and gives every variable in DICT its starting value:
177 string variables are filled with spaces, and numeric variables
   get 0.0 or SYSMIS according to their reinit flag. */
179 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
181 size_t var_cnt = dict_get_var_cnt (dict);
184 case_create (trns_case, dict_get_next_value_idx (dict));
185 for (i = 0; i < var_cnt; i++)
187 struct variable *v = dict_get_var (dict, i);
188 union value *value = case_data_rw (trns_case, v->fv);
/* NOTE(review): this gives reinit variables 0.0 and all others
   SYSMIS, while clear_case() resets numeric variables with init and
   reinit set to SYSMIS between cases. So reinit variables start at
   0.0 for the first case only, which looks inverted -- confirm. */
190 if (v->type == NUMERIC)
191 value->f = v->reinit ? 0.0 : SYSMIS;
193 memset (value->s, ' ', v->width);
197 /* Makes all preparations for reading from the data source and writing
   to the data sink.
200 open_active_file (void)
202 /* Make temp_dict refer to the dictionary right before data
207 temp_dict = default_dict;
210 /* Figure out compaction: it is needed whenever the compacted
   value count differs from the full value count. */
211 compaction_necessary = (dict_get_next_value_idx (temp_dict)
212 != dict_get_compacted_value_cnt (temp_dict));
/* Unless a sink was already chosen, write the new active file to
   storage. */
215 if (vfm_sink == NULL)
216 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
217 if (vfm_sink->class->open != NULL)
218 vfm_sink->class->open (vfm_sink);
220 /* Allocate memory for lag queue. Entries start out nullified,
   presumably so that lag_case() can case_destroy() a slot before
   its first use. */
227 lag_queue = xmalloc (n_lag * sizeof *lag_queue);
228 for (i = 0; i < n_lag; i++)
229 case_nullify (&lag_queue[i]);
232 /* Close any unclosed DO IF or LOOP constructs. */
233 discard_ctl_stack ();
236 /* Transforms trns_case and writes it to the replacement active
237 file if advisable. Returns nonzero if more cases can be
238 accepted, zero otherwise. Do not call this function again
239 after it has returned zero once. The case reaches the sink
   after the permanent transformations but before the temporary
   ones, so TEMPORARY changes never reach the replacement active
   file. */
241 write_case (struct write_case_data *wc_data)
243 /* Execute permanent transformations. */
244 if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
245 wc_data->cases_written + 1))
/* N OF CASES (permanent case limit). */
249 if (dict_get_case_limit (default_dict)
250 && wc_data->cases_written >= dict_get_case_limit (default_dict))
252 wc_data->cases_written++;
254 /* Write case to LAG queue. */
256 lag_case (&wc_data->trns_case);
258 /* Write case to replacement active file. */
259 if (vfm_sink->class->write != NULL)
261 if (compaction_necessary)
263 compact_case (&wc_data->sink_case, &wc_data->trns_case);
264 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
267 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
270 /* Execute temporary transformations. cases_written was
   incremented above, so they see the same 1-based case number as
   the permanent transformations did. */
271 if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
272 wc_data->cases_written))
275 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
276 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
277 || (dict_get_case_limit (temp_dict)
278 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
280 wc_data->cases_analyzed++;
282 /* Pass case to procedure. */
283 if (wc_data->proc_func != NULL)
284 wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
287 clear_case (&wc_data->trns_case);
291 /* Transforms case C using the transformations in TRNS[] with
292 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
293 become case CASE_NUM (1-based) in the output file. Returns
294 zero if the case was filtered out by one of the
295 transformations, nonzero otherwise. IDX is advanced inside the
   loop body (the for statement has no increment expression),
   presumably according to each transformation's RETVAL. */
297 execute_transformations (struct ccase *c,
298 struct trns_header **trns,
299 int first_idx, int last_idx,
304 for (idx = first_idx; idx != last_idx; )
306 int retval = trns[idx]->proc (trns[idx], c, case_num);
325 /* Returns nonzero if case C with case number CASE_IDX should be
326 excluded as specified on FILTER or PROCESS IF, otherwise zero.
   FILTER excludes cases whose filter variable is 0, system-missing
   or user-missing; PROCESS IF excludes cases for which the
   expression does not evaluate to exactly 1.
329 filter_case (const struct ccase *c, int case_idx)
332 struct variable *filter_var = dict_get_filter (default_dict);
333 if (filter_var != NULL)
335 double f = case_num (c, filter_var->fv);
336 if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
341 if (process_if_expr != NULL
342 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
348 /* Adds a copy of C to the lag queue, overwriting the slot at
   lag_head (the oldest case once the queue is full) and then
   advancing lag_head. */
350 lag_case (const struct ccase *c)
352 if (lag_count < n_lag)
354 case_destroy (&lag_queue[lag_head]);
355 case_clone (&lag_queue[lag_head], c);
356 if (++lag_head >= n_lag)
360 /* Copies case SRC to case DEST, compacting it in the process:
   scratch variables are skipped, and each remaining variable's
   values (one for a numeric variable, DIV_RND_UP (width,
   sizeof (union value)) for a string) are stored at DEST index
   nval. Only valid when compaction_necessary is set. */
362 compact_case (struct ccase *dest, const struct ccase *src)
368 assert (compaction_necessary);
370 /* Copy all the variables except scratch variables from SRC to
372 /* FIXME: this should be temp_dict not default_dict I guess. */
373 var_cnt = dict_get_var_cnt (default_dict);
374 for (i = 0; i < var_cnt; i++)
376 struct variable *v = dict_get_var (default_dict, i);
378 if (dict_class_from_id (v->name) == DC_SCRATCH)
381 if (v->type == NUMERIC)
383 case_data_rw (dest, nval)->f = case_num (src, v->fv);
388 int w = DIV_RND_UP (v->width, sizeof (union value));
390 memcpy (case_data_rw (dest, nval), case_str (src, v->fv),
391 w * sizeof (union value));
397 /* Clears the variables in C that need to be cleared between
   cases, namely the variables of default_dict with both init and
   reinit set: numeric values become SYSMIS and string values
   become all spaces.
400 clear_case (struct ccase *c)
402 size_t var_cnt = dict_get_var_cnt (default_dict);
405 for (i = 0; i < var_cnt; i++)
407 struct variable *v = dict_get_var (default_dict, i);
408 if (v->init && v->reinit)
410 if (v->type == NUMERIC)
411 case_data_rw (c, v->fv)->f = SYSMIS;
413 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
418 /* Closes the active file: frees the lag queue, makes the
   dictionary from before TEMPORARY permanent, finishes
   compaction, turns the data sink into the new data source, and
   cancels TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors and
   all transformations. */
420 close_active_file (void)
422 /* Free memory for lag queue, and turn off lagging. */
427 for (i = 0; i < n_lag; i++)
428 case_destroy (&lag_queue[i]);
433 /* Dictionary from before TEMPORARY becomes permanent. */
436 dict_destroy (default_dict);
437 default_dict = temp_dict;
441 /* Finish compaction. */
442 if (compaction_necessary)
443 dict_compact_values (default_dict);
445 /* Free data source. */
446 if (vfm_source != NULL)
448 free_case_source (vfm_source);
452 /* Old data sink becomes new data source. */
453 if (vfm_sink->class->make_source != NULL)
454 vfm_source = vfm_sink->class->make_source (vfm_sink);
455 free_case_sink (vfm_sink);
458 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
459 and get rid of all the transformations. A FILTER that was
   set before TEMPORARY is kept. */
461 expr_free (process_if_expr);
462 process_if_expr = NULL;
463 if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
464 dict_set_filter (default_dict, NULL);
465 dict_set_case_limit (default_dict, 0);
466 dict_clear_vectors (default_dict);
467 cancel_transformations ();
470 /* Storage case stream. */
472 /* Information about storage sink or source: the casefile that
   holds the cases. A source made from a sink is given the sink's
   info (sink->aux). */
473 struct storage_stream_info
475 struct casefile *casefile; /* Storage. */
478 /* Initializes a storage sink: allocates SINK->aux and a new
   casefile with SINK->value_cnt values per case. */
480 storage_sink_open (struct case_sink *sink)
482 struct storage_stream_info *info;
484 sink->aux = info = xmalloc (sizeof *info);
485 info->casefile = casefile_create (sink->value_cnt);
488 /* Destroys storage stream represented by INFO, including its
   casefile. */
490 destroy_storage_stream_info (struct storage_stream_info *info)
494 casefile_destroy (info->casefile);
499 /* Writes case C to the storage sink SINK. */
501 storage_sink_write (struct case_sink *sink, const struct ccase *c)
503 struct storage_stream_info *info = sink->aux;
505 casefile_append (info->casefile, c);
508 /* Destroys internal data in SINK. */
510 storage_sink_destroy (struct case_sink *sink)
512 destroy_storage_stream_info (sink->aux);
515 /* Closes the sink and returns a storage source to read back the
   written cases; the source is created from SINK's dictionary and
   reuses SINK->aux.
517 static struct case_source *
518 storage_sink_make_source (struct case_sink *sink)
520 struct case_source *source
521 = create_case_source (&storage_source_class, sink->dict, sink->aux);
527 const struct case_sink_class storage_sink_class =
532 storage_sink_destroy,
533 storage_sink_make_source,
536 /* Storage source. */
538 /* Returns the number of cases that will be read by
539 storage_source_read(), that is, the number of cases in
   SOURCE's casefile. */
541 storage_source_count (const struct case_source *source)
543 struct storage_stream_info *info = source->aux;
545 return casefile_get_case_cnt (info->casefile);
548 /* Reads all cases from the storage source and passes them one by one to
   WRITE_CASE: each case is copied into OUTPUT_CASE before
   WRITE_CASE (WC_DATA) is called.
   NOTE(review): WRITE_CASE's return value is ignored, although
   write_case() documents that it must not be called again after it
   returns zero; confirm whether the loop should stop early.
551 storage_source_read (struct case_source *source,
552 struct ccase *output_case,
553 write_case_func *write_case, write_case_data wc_data)
555 struct storage_stream_info *info = source->aux;
556 struct ccase casefile_case;
557 struct casereader *reader;
559 for (reader = casefile_get_reader (info->casefile);
560 casereader_read (reader, &casefile_case);
561 case_destroy (&casefile_case))
563 case_copy (output_case, 0,
565 casefile_get_value_cnt (info->casefile));
566 write_case (wc_data);
568 casereader_destroy (reader);
571 /* Destroys the source's internal data. */
573 storage_source_destroy (struct case_source *source)
575 destroy_storage_stream_info (source->aux);
578 /* Storage source. */
579 const struct case_source_class storage_source_class =
582 storage_source_count,
584 storage_source_destroy,
/* Returns the casefile behind SOURCE, which must be a storage
   source. */
588 storage_source_get_casefile (struct case_source *source)
590 struct storage_stream_info *info = source->aux;
592 assert (source->class == &storage_source_class);
593 return info->casefile;
/* Creates and returns a storage source based on DICT; presumably it
   reads from (and takes ownership of) CF -- confirm. */
597 storage_source_create (struct casefile *cf, const struct dictionary *dict)
599 struct storage_stream_info *info;
601 info = xmalloc (sizeof *info);
604 return create_case_source (&storage_source_class, dict, info);
607 /* Null sink. Used by a few procedures that keep track of output
608 themselves and would throw away anything that the sink
   received.
611 const struct case_sink_class null_sink_class =
620 /* Returns a pointer to the lagged case from N_BEFORE cases before the
621 current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be between 1 and n_lag. */
623 lagged_case (int n_before)
625 assert (n_before >= 1 && n_before <= n_lag);
626 if (n_before <= lag_count)
628 int index = lag_head - n_before;
631 return &lag_queue[index];
637 /* Appends TRNS to t_trns[], the list of all transformations to be
638 performed on data as it is read from the active file. t_trns[]
   is grown as needed, and TRNS->index records TRNS's position in
   it. */
640 add_transformation (struct trns_header * trns)
642 if (n_trns >= m_trns)
645 t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
647 t_trns[n_trns] = trns;
648 trns->index = n_trns++;
651 /* Cancels all active transformations, including any transformations
652 created by the input program, invoking each transformation's
   free function. */
654 cancel_transformations (void)
657 for (i = 0; i < n_trns; i++)
660 t_trns[i]->free (t_trns[i]);
669 /* Creates a case source with class CLASS and auxiliary data AUX
670 and based on dictionary DICT. The source's value_cnt is
   dict_get_next_value_idx (DICT). */
672 create_case_source (const struct case_source_class *class,
673 const struct dictionary *dict,
676 struct case_source *source = xmalloc (sizeof *source);
677 source->class = class;
678 source->value_cnt = dict_get_next_value_idx (dict);
683 /* Destroys case source SOURCE, first calling the source class's
684 destroy function, if any. */
686 free_case_source (struct case_source *source)
690 if (source->class->destroy != NULL)
691 source->class->destroy (source);
696 /* Returns nonzero if a case source is "complex", that is, an
   input program or FILE TYPE source; zero for a null SOURCE. */
698 case_source_is_complex (const struct case_source *source)
700 return source != NULL && (source->class == &input_program_source_class
701 || source->class == &file_type_source_class);
704 /* Returns nonzero if CLASS is the class of SOURCE; zero if SOURCE
   is null. */
706 case_source_is_class (const struct case_source *source,
707 const struct case_source_class *class)
709 return source != NULL && source->class == class;
712 /* Creates a case sink with class CLASS and auxiliary data
   AUX, based on DICT: the sink records DICT's compacted
   index-to-fv map and compacted value count.
715 create_case_sink (const struct case_sink_class *class,
716 const struct dictionary *dict,
719 struct case_sink *sink = xmalloc (sizeof *sink);
722 sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
723 sink->value_cnt = dict_get_compacted_value_cnt (dict);
728 /* Destroys case sink SINK, calling its class's destroy function,
   if any, and freeing its idx_to_fv map. */
730 free_case_sink (struct case_sink *sink)
734 if (sink->class->destroy != NULL)
735 sink->class->destroy (sink);
736 free (sink->idx_to_fv);
741 /* Represents auxiliary data for handling SPLIT FILE. */
742 struct split_aux_data
744 size_t case_count; /* Number of cases so far. */
745 struct ccase prev_case; /* First case of the current split group;
   holds the group's SPLIT FILE values. */
747 /* Functions to call... */
748 void (*begin_func) (void *); /* ...before data. */
749 int (*proc_func) (struct ccase *, void *); /* ...with data. */
750 void (*end_func) (void *); /* ...after data. */
751 void *func_aux; /* Auxiliary data. */
754 static int equal_splits (const struct ccase *, const struct ccase *);
755 static int procedure_with_splits_callback (struct ccase *, void *);
756 static void dump_splits (struct ccase *);
758 /* Like procedure(), but it automatically breaks the case stream
759 into SPLIT FILE break groups. Before each group of cases with
760 identical SPLIT FILE variable values, BEGIN_FUNC is called.
761 Then PROC_FUNC is called with each case in the group.
762 END_FUNC is called when the group is finished. FUNC_AUX is
763 passed to each of the functions as auxiliary data.
765 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
766 and END_FUNC will be called at all.
768 If SPLIT FILE is not in effect, then there is one break group
769 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
770 will be called once. Any of the three functions may be null. */
772 procedure_with_splits (void (*begin_func) (void *aux),
773 int (*proc_func) (struct ccase *, void *aux),
774 void (*end_func) (void *aux),
777 struct split_aux_data split_aux;
779 split_aux.case_count = 0;
780 case_nullify (&split_aux.prev_case);
781 split_aux.begin_func = begin_func;
782 split_aux.proc_func = proc_func;
783 split_aux.end_func = end_func;
784 split_aux.func_aux = func_aux;
786 procedure (procedure_with_splits_callback, &split_aux);
/* End the last split group, if any cases were seen. */
788 if (split_aux.case_count > 0 && end_func != NULL)
790 case_destroy (&split_aux.prev_case);
793 /* procedure() callback used by procedure_with_splits(). On the
   first case, and whenever C's split values differ from the
   current group's, it ends the previous group (if any), remembers
   C as the new group's first case and begins the new group. It
   then counts C and passes it to proc_func, if set. */
795 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
797 struct split_aux_data *split_aux = split_aux_;
799 /* Start a new series if needed. */
800 if (split_aux->case_count == 0
801 || !equal_splits (c, &split_aux->prev_case))
803 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
804 split_aux->end_func (split_aux->func_aux);
807 case_destroy (&split_aux->prev_case);
808 case_clone (&split_aux->prev_case, c);
810 if (split_aux->begin_func != NULL)
811 split_aux->begin_func (split_aux->func_aux);
814 split_aux->case_count++;
815 if (split_aux->proc_func != NULL)
816 return split_aux->proc_func (c, split_aux->func_aux);
821 /* Compares the SPLIT FILE variables in cases A and B and returns
822 nonzero only if they are all equal, zero if any of them differ.
   Numeric values are compared with !=, string values byte-wise
   over the variable's width. */
824 equal_splits (const struct ccase *a, const struct ccase *b)
826 struct variable *const *split;
830 split = dict_get_split_vars (default_dict);
831 split_cnt = dict_get_split_cnt (default_dict);
832 for (i = 0; i < split_cnt; i++)
834 struct variable *v = split[i];
839 if (case_num (a, v->fv) != case_num (b, v->fv))
843 if (memcmp (case_str (a, v->fv), case_str (b, v->fv), v->width))
854 /* Dumps out the values of all the split variables for the case C,
   as a table with one row per split variable and the columns
   Variable, Value and Label. */
856 dump_splits (struct ccase *c)
858 struct variable *const *split;
863 split_cnt = dict_get_split_cnt (default_dict);
867 t = tab_create (3, split_cnt + 1, 0);
868 tab_dim (t, tab_natural_dimensions);
869 tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
870 tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
871 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
872 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
873 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
874 split = dict_get_split_vars (default_dict);
875 for (i = 0; i < split_cnt; i++)
877 struct variable *v = split[i];
881 assert (v->type == NUMERIC || v->type == ALPHA);
882 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
/* NOTE(review): temp_buf is NUL-terminated at index v->print.w
   below, so it must hold at least v->print.w + 1 bytes; confirm its
   declared size covers the widest print format. */
884 data_out (temp_buf, &v->print, case_data (c, v->fv));
886 temp_buf[v->print.w] = 0;
887 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
889 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
891 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
893 tab_flags (t, SOMF_NO_TITLE);
897 /* Represents auxiliary data for handling SPLIT FILE in a
898 multipass procedure. */
899 struct multipass_split_aux_data
901 struct ccase prev_case; /* First case of the current split group;
   holds the group's SPLIT FILE values. */
902 struct casefile *casefile; /* Accumulates data for a split; null
   when no split group is pending. */
904 /* Function to call with the accumulated data. */
905 void (*split_func) (const struct casefile *, void *);
906 void *func_aux; /* Auxiliary data. */
909 static int multipass_split_callback (struct ccase *c, void *aux_);
910 static void multipass_split_output (struct multipass_split_aux_data *);
/* Like procedure_with_splits(), but collects each split group's
   cases into a casefile and calls SPLIT_FUNC once per group with
   that casefile and FUNC_AUX. SPLIT_FUNC must not be null. */
913 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
917 struct multipass_split_aux_data aux;
919 assert (split_func != NULL);
923 case_nullify (&aux.prev_case);
925 aux.split_func = split_func;
926 aux.func_aux = func_aux;
928 internal_procedure (multipass_split_callback, &aux);
929 if (aux.casefile != NULL)
930 multipass_split_output (&aux);
931 case_destroy (&aux.prev_case);
933 close_active_file ();
936 /* internal_procedure() callback used by
   multipass_procedure_with_splits(). Appends C to the current split
   group's casefile; whenever C's split values differ from the
   group's (or no group is pending), the previous group is first
   passed to split_func and a new casefile is started. */
938 multipass_split_callback (struct ccase *c, void *aux_)
940 struct multipass_split_aux_data *aux = aux_;
942 /* Start a new series if needed. */
943 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
945 /* Pass any cases to split_func. */
946 if (aux->casefile != NULL)
947 multipass_split_output (aux);
949 /* Start a new casefile. */
950 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
952 /* Record split values. */
954 case_destroy (&aux->prev_case);
955 case_clone (&aux->prev_case, c);
958 casefile_append (aux->casefile, c);
/* Passes AUX's accumulated casefile to split_func, then destroys
   it and resets AUX->casefile to null. */
964 multipass_split_output (struct multipass_split_aux_data *aux)
966 assert (aux->casefile != NULL);
967 aux->split_func (aux->casefile, aux->func_aux);
968 casefile_destroy (aux->casefile);
969 aux->casefile = NULL;