1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
28 #include <unistd.h> /* Required by SunOS4. */
33 #include "dictionary.h"
36 #include "expressions/public.h"
43 #include "value-labels.h"
46 #define _(msgid) gettext (msgid)
49 Virtual File Manager (vfm):
51 vfm is used to process data files. It uses the model that
52 data is read from one stream (the data source), processed,
53 then written to another (the data sink). The data source is
54 then deleted and the data sink becomes the data source for the
57 /* Procedure execution data. */
58 struct write_case_data
60 /* Function to call for each case. */
61 int (*proc_func) (struct ccase *, void *); /* Function. */
62 void *aux; /* Auxiliary data. */
64 struct ccase trns_case; /* Case used for transformations. */
65 struct ccase sink_case; /* Case written to sink, if
66 compaction is necessary. */
67 size_t cases_written; /* Cases output so far. */
68 size_t cases_analyzed; /* Cases passed to procedure so far. */
71 /* The current active file, from which cases are read. */
72 struct case_source *vfm_source;
74 /* The replacement active file, to which cases are written. */
75 struct case_sink *vfm_sink;
77 /* Nonzero if the case needs to have values deleted before being
78 stored, zero otherwise. */
79 static int compaction_necessary;
81 /* Time at which vfm was last invoked. */
82 time_t last_vfm_invocation;
85 int n_lag; /* Number of cases to lag. */
86 static int lag_count; /* Number of cases in lag_queue so far. */
87 static int lag_head; /* Index where next case will be added. */
88 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
90 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
92 static void create_trns_case (struct ccase *, struct dictionary *);
93 static void open_active_file (void);
94 static int write_case (struct write_case_data *wc_data);
95 static int execute_transformations (struct ccase *c,
96 struct trns_header **trns,
97 int first_idx, int last_idx,
99 static int filter_case (const struct ccase *c, int case_num);
100 static void lag_case (const struct ccase *c);
101 static void clear_case (struct ccase *c);
102 static void close_active_file (void);
104 /* Public functions. */
106 /* Reads the data from the input program and writes it to a new
107 active file. For each case we read from the input program, we
110 1. Execute permanent transformations. If these drop the case,
111 start the next case from step 1.
113 2. N OF CASES. If we have already written N cases, start the
114 next case from step 1.
116 3. Write case to replacement active file.
118 4. Execute temporary transformations. If these drop the case,
119 start the next case from step 1.
121 5. FILTER, PROCESS IF. If these drop the case, start the next
124 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
125 cases, start the next case from step 1.
127 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
129 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
131 if (proc_func == NULL
132 && case_source_is_class (vfm_source, &storage_source_class)
142 internal_procedure (proc_func, aux);
143 close_active_file ();
146 /* Executes a procedure, as procedure(), except that the caller
147 is responsible for calling open_active_file() and
148 close_active_file(). */
150 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux)
152 static int recursive_call;
154 struct write_case_data wc_data;
156 assert (++recursive_call == 1);
158 wc_data.proc_func = proc_func;
160 create_trns_case (&wc_data.trns_case, default_dict);
161 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
162 wc_data.cases_written = 0;
164 last_vfm_invocation = time (NULL);
166 if (vfm_source != NULL)
167 vfm_source->class->read (vfm_source,
169 write_case, &wc_data);
171 case_destroy (&wc_data.sink_case);
172 case_destroy (&wc_data.trns_case);
174 assert (--recursive_call == 0);
177 /* Creates case TRNS_CASE for DICT, initializing it from the vectors
178 that say which `value's need to be initialized just once, and
179 which ones need to be re-initialized before every case. */
181 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
183 size_t var_cnt = dict_get_var_cnt (dict);
186 case_create (trns_case, dict_get_next_value_idx (dict));
187 for (i = 0; i < var_cnt; i++)
189 struct variable *v = dict_get_var (dict, i);
190 union value *value = case_data_rw (trns_case, v->fv);
192 if (v->type == NUMERIC)
193 value->f = v->reinit ? 0.0 : SYSMIS;
195 memset (value->s, ' ', v->width);
199 /* Makes all preparations for reading from the data source and writing
202 open_active_file (void)
204 /* Make temp_dict refer to the dictionary right before data
209 temp_dict = default_dict;
212 /* Figure out compaction. */
213 compaction_necessary = (dict_get_next_value_idx (temp_dict)
214 != dict_get_compacted_value_cnt (temp_dict));
217 if (vfm_sink == NULL)
218 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
219 if (vfm_sink->class->open != NULL)
220 vfm_sink->class->open (vfm_sink);
222 /* Allocate memory for lag queue. */
229 lag_queue = xmalloc (n_lag * sizeof *lag_queue);
230 for (i = 0; i < n_lag; i++)
231 case_nullify (&lag_queue[i]);
234 /* Close any unclosed DO IF or LOOP constructs. */
235 discard_ctl_stack ();
238 /* Transforms trns_case and writes it to the replacement active
239 file if advisable. Returns nonzero if more cases can be
240 accepted, zero otherwise. Do not call this function again
241 after it has returned zero once. */
243 write_case (struct write_case_data *wc_data)
245 /* Execute permanent transformations. */
246 if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
247 wc_data->cases_written + 1))
251 if (dict_get_case_limit (default_dict)
252 && wc_data->cases_written >= dict_get_case_limit (default_dict))
254 wc_data->cases_written++;
256 /* Write case to LAG queue. */
258 lag_case (&wc_data->trns_case);
260 /* Write case to replacement active file. */
261 if (vfm_sink->class->write != NULL)
263 if (compaction_necessary)
265 dict_compact_case (temp_dict, &wc_data->sink_case,
266 &wc_data->trns_case);
267 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
270 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
273 /* Execute temporary transformations. */
274 if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
275 wc_data->cases_written))
278 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
279 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
280 || (dict_get_case_limit (temp_dict)
281 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
283 wc_data->cases_analyzed++;
285 /* Pass case to procedure. */
286 if (wc_data->proc_func != NULL)
287 wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
290 clear_case (&wc_data->trns_case);
294 /* Transforms case C using the transformations in TRNS[] with
295 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
296 become case CASE_NUM (1-based) in the output file. Returns
297 zero if the case was filtered out by one of the
298 transformations, nonzero otherwise. */
300 execute_transformations (struct ccase *c,
301 struct trns_header **trns,
302 int first_idx, int last_idx,
307 for (idx = first_idx; idx != last_idx; )
309 int retval = trns[idx]->proc (trns[idx], c, case_num);
328 /* Returns nonzero if case C with case number CASE_NUM should be
329 excluded as specified on FILTER or PROCESS IF, otherwise
332 filter_case (const struct ccase *c, int case_idx)
335 struct variable *filter_var = dict_get_filter (default_dict);
336 if (filter_var != NULL)
338 double f = case_num (c, filter_var->fv);
339 if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
344 if (process_if_expr != NULL
345 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
351 /* Add C to the lag queue. */
353 lag_case (const struct ccase *c)
355 if (lag_count < n_lag)
357 case_destroy (&lag_queue[lag_head]);
358 case_clone (&lag_queue[lag_head], c);
359 if (++lag_head >= n_lag)
363 /* Clears the variables in C that need to be cleared between
366 clear_case (struct ccase *c)
368 size_t var_cnt = dict_get_var_cnt (default_dict);
371 for (i = 0; i < var_cnt; i++)
373 struct variable *v = dict_get_var (default_dict, i);
374 if (v->init && v->reinit)
376 if (v->type == NUMERIC)
377 case_data_rw (c, v->fv)->f = SYSMIS;
379 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
384 /* Closes the active file. */
386 close_active_file (void)
388 /* Free memory for lag queue, and turn off lagging. */
393 for (i = 0; i < n_lag; i++)
394 case_destroy (&lag_queue[i]);
399 /* Dictionary from before TEMPORARY becomes permanent. */
402 dict_destroy (default_dict);
403 default_dict = temp_dict;
407 /* Finish compaction. */
408 if (compaction_necessary)
409 dict_compact_values (default_dict);
411 /* Free data source. */
412 free_case_source (vfm_source);
415 /* Old data sink becomes new data source. */
416 if (vfm_sink->class->make_source != NULL)
417 vfm_source = vfm_sink->class->make_source (vfm_sink);
418 free_case_sink (vfm_sink);
421 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
422 and get rid of all the transformations. */
424 expr_free (process_if_expr);
425 process_if_expr = NULL;
426 if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
427 dict_set_filter (default_dict, NULL);
428 dict_set_case_limit (default_dict, 0);
429 dict_clear_vectors (default_dict);
430 cancel_transformations ();
433 /* Storage case stream. */
435 /* Information about storage sink or source. */
436 struct storage_stream_info
438 struct casefile *casefile; /* Storage. */
441 /* Initializes a storage sink. */
443 storage_sink_open (struct case_sink *sink)
445 struct storage_stream_info *info;
447 sink->aux = info = xmalloc (sizeof *info);
448 info->casefile = casefile_create (sink->value_cnt);
451 /* Destroys storage stream represented by INFO. */
453 destroy_storage_stream_info (struct storage_stream_info *info)
457 casefile_destroy (info->casefile);
462 /* Writes case C to the storage sink SINK. */
464 storage_sink_write (struct case_sink *sink, const struct ccase *c)
466 struct storage_stream_info *info = sink->aux;
468 casefile_append (info->casefile, c);
471 /* Destroys internal data in SINK. */
473 storage_sink_destroy (struct case_sink *sink)
475 destroy_storage_stream_info (sink->aux);
478 /* Closes the sink and returns a storage source to read back the
480 static struct case_source *
481 storage_sink_make_source (struct case_sink *sink)
483 struct case_source *source
484 = create_case_source (&storage_source_class, sink->aux);
490 const struct case_sink_class storage_sink_class =
495 storage_sink_destroy,
496 storage_sink_make_source,
499 /* Storage source. */
501 /* Returns the number of cases that will be read by
502 storage_source_read(). */
504 storage_source_count (const struct case_source *source)
506 struct storage_stream_info *info = source->aux;
508 return casefile_get_case_cnt (info->casefile);
511 /* Reads all cases from the storage source and passes them one by one to
514 storage_source_read (struct case_source *source,
515 struct ccase *output_case,
516 write_case_func *write_case, write_case_data wc_data)
518 struct storage_stream_info *info = source->aux;
519 struct ccase casefile_case;
520 struct casereader *reader;
522 for (reader = casefile_get_reader (info->casefile);
523 casereader_read (reader, &casefile_case);
524 case_destroy (&casefile_case))
526 case_copy (output_case, 0,
528 casefile_get_value_cnt (info->casefile));
529 write_case (wc_data);
531 casereader_destroy (reader);
534 /* Destroys the source's internal data. */
536 storage_source_destroy (struct case_source *source)
538 destroy_storage_stream_info (source->aux);
541 /* Storage source. */
542 const struct case_source_class storage_source_class =
545 storage_source_count,
547 storage_source_destroy,
551 storage_source_get_casefile (struct case_source *source)
553 struct storage_stream_info *info = source->aux;
555 assert (source->class == &storage_source_class);
556 return info->casefile;
560 storage_source_create (struct casefile *cf)
562 struct storage_stream_info *info;
564 info = xmalloc (sizeof *info);
567 return create_case_source (&storage_source_class, info);
570 /* Null sink. Used by a few procedures that keep track of output
571 themselves and would throw away anything that the sink
574 const struct case_sink_class null_sink_class =
583 /* Returns a pointer to the lagged case from N_BEFORE cases before the
584 current one, or NULL if there haven't been that many cases yet. */
586 lagged_case (int n_before)
588 assert (n_before >= 1 );
589 assert (n_before <= n_lag);
591 if (n_before <= lag_count)
593 int index = lag_head - n_before;
596 return &lag_queue[index];
602 /* Appends TRNS to t_trns[], the list of all transformations to be
603 performed on data as it is read from the active file. */
605 add_transformation (struct trns_header * trns)
607 if (n_trns >= m_trns)
610 t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
612 t_trns[n_trns] = trns;
613 trns->index = n_trns++;
616 /* Cancels all active transformations, including any transformations
617 created by the input program. */
619 cancel_transformations (void)
622 for (i = 0; i < n_trns; i++)
625 t_trns[i]->free (t_trns[i]);
634 /* Creates a case source with class CLASS and auxiliary data
635 AUX. */
637 create_case_source (const struct case_source_class *class,
640 struct case_source *source = xmalloc (sizeof *source);
641 source->class = class;
646 /* Destroys case source SOURCE, first calling the source's
647 destroy function, if any. */
649 free_case_source (struct case_source *source)
653 if (source->class->destroy != NULL)
654 source->class->destroy (source);
659 /* Returns nonzero if a case source is "complex". */
661 case_source_is_complex (const struct case_source *source)
663 return source != NULL && (source->class == &input_program_source_class
664 || source->class == &file_type_source_class);
667 /* Returns nonzero if CLASS is the class of SOURCE. */
669 case_source_is_class (const struct case_source *source,
670 const struct case_source_class *class)
672 return source != NULL && source->class == class;
675 /* Creates a case sink to accept cases from the given DICT with
676 class CLASS and auxiliary data AUX. */
678 create_case_sink (const struct case_sink_class *class,
679 const struct dictionary *dict,
682 struct case_sink *sink = xmalloc (sizeof *sink);
684 sink->value_cnt = dict_get_compacted_value_cnt (dict);
689 /* Destroys case sink SINK. */
691 free_case_sink (struct case_sink *sink)
695 if (sink->class->destroy != NULL)
696 sink->class->destroy (sink);
701 /* Represents auxiliary data for handling SPLIT FILE. */
702 struct split_aux_data
704 size_t case_count; /* Number of cases so far. */
705 struct ccase prev_case; /* Data in previous case. */
707 /* Functions to call... */
708 void (*begin_func) (void *); /* ...before data. */
709 int (*proc_func) (struct ccase *, void *); /* ...with data. */
710 void (*end_func) (void *); /* ...after data. */
711 void *func_aux; /* Auxiliary data. */
714 static int equal_splits (const struct ccase *, const struct ccase *);
715 static int procedure_with_splits_callback (struct ccase *, void *);
716 static void dump_splits (struct ccase *);
718 /* Like procedure(), but it automatically breaks the case stream
719 into SPLIT FILE break groups. Before each group of cases with
720 identical SPLIT FILE variable values, BEGIN_FUNC is called.
721 Then PROC_FUNC is called with each case in the group.
722 END_FUNC is called when the group is finished. FUNC_AUX is
723 passed to each of the functions as auxiliary data.
725 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
726 and END_FUNC will be called at all.
728 If SPLIT FILE is not in effect, then there is one break group
729 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
730 will be called once. */
732 procedure_with_splits (void (*begin_func) (void *aux),
733 int (*proc_func) (struct ccase *, void *aux),
734 void (*end_func) (void *aux),
737 struct split_aux_data split_aux;
739 split_aux.case_count = 0;
740 case_nullify (&split_aux.prev_case);
741 split_aux.begin_func = begin_func;
742 split_aux.proc_func = proc_func;
743 split_aux.end_func = end_func;
744 split_aux.func_aux = func_aux;
747 internal_procedure (procedure_with_splits_callback, &split_aux);
748 if (split_aux.case_count > 0 && end_func != NULL)
750 close_active_file ();
752 case_destroy (&split_aux.prev_case);
755 /* procedure() callback used by procedure_with_splits(). */
757 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
759 struct split_aux_data *split_aux = split_aux_;
761 /* Start a new series if needed. */
762 if (split_aux->case_count == 0
763 || !equal_splits (c, &split_aux->prev_case))
765 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
766 split_aux->end_func (split_aux->func_aux);
769 case_destroy (&split_aux->prev_case);
770 case_clone (&split_aux->prev_case, c);
772 if (split_aux->begin_func != NULL)
773 split_aux->begin_func (split_aux->func_aux);
776 split_aux->case_count++;
777 if (split_aux->proc_func != NULL)
778 return split_aux->proc_func (c, split_aux->func_aux);
783 /* Compares the SPLIT FILE variables in cases A and B and returns
784 nonzero only if they are equal. */
786 equal_splits (const struct ccase *a, const struct ccase *b)
788 return case_compare (a, b,
789 dict_get_split_vars (default_dict),
790 dict_get_split_cnt (default_dict)) == 0;
793 /* Dumps out the values of all the split variables for the case C. */
795 dump_splits (struct ccase *c)
797 struct variable *const *split;
802 split_cnt = dict_get_split_cnt (default_dict);
806 t = tab_create (3, split_cnt + 1, 0);
807 tab_dim (t, tab_natural_dimensions);
808 tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
809 tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
810 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
811 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
812 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
813 split = dict_get_split_vars (default_dict);
814 for (i = 0; i < split_cnt; i++)
816 struct variable *v = split[i];
820 assert (v->type == NUMERIC || v->type == ALPHA);
821 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
823 data_out (temp_buf, &v->print, case_data (c, v->fv));
825 temp_buf[v->print.w] = 0;
826 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
828 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
830 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
832 tab_flags (t, SOMF_NO_TITLE);
836 /* Represents auxiliary data for handling SPLIT FILE in a
837 multipass procedure. */
838 struct multipass_split_aux_data
840 struct ccase prev_case; /* Data in previous case. */
841 struct casefile *casefile; /* Accumulates data for a split. */
843 /* Function to call with the accumulated data. */
844 void (*split_func) (const struct casefile *, void *);
845 void *func_aux; /* Auxiliary data. */
848 static int multipass_split_callback (struct ccase *c, void *aux_);
849 static void multipass_split_output (struct multipass_split_aux_data *);
852 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
856 struct multipass_split_aux_data aux;
858 assert (split_func != NULL);
862 case_nullify (&aux.prev_case);
864 aux.split_func = split_func;
865 aux.func_aux = func_aux;
867 internal_procedure (multipass_split_callback, &aux);
868 if (aux.casefile != NULL)
869 multipass_split_output (&aux);
870 case_destroy (&aux.prev_case);
872 close_active_file ();
875 /* procedure() callback used by multipass_procedure_with_splits(). */
877 multipass_split_callback (struct ccase *c, void *aux_)
879 struct multipass_split_aux_data *aux = aux_;
881 /* Start a new series if needed. */
882 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
884 /* Pass any cases to split_func. */
885 if (aux->casefile != NULL)
886 multipass_split_output (aux);
888 /* Start a new casefile. */
889 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
891 /* Record split values. */
893 case_destroy (&aux->prev_case);
894 case_clone (&aux->prev_case, c);
897 casefile_append (aux->casefile, c);
903 multipass_split_output (struct multipass_split_aux_data *aux)
905 assert (aux->casefile != NULL);
906 aux->split_func (aux->casefile, aux->func_aux);
907 casefile_destroy (aux->casefile);
908 aux->casefile = NULL;