1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
28 #include <unistd.h> /* Required by SunOS4. */
34 #include "dictionary.h"
35 #include "ctl-stack.h"
37 #include "expressions/public.h"
44 #include "value-labels.h"
47 #define _(msgid) gettext (msgid)
50 Virtual File Manager (vfm):
52 vfm is used to process data files. It uses the model that
53 data is read from one stream (the data source), processed,
54 then written to another (the data sink). The data source is
55 then deleted and the data sink becomes the data source for the
58 /* Procedure execution data. */
59 struct write_case_data
61 /* Function to call for each case. */
62 int (*proc_func) (struct ccase *, void *); /* Function. */
63 void *aux; /* Auxiliary data. */
65 struct ccase trns_case; /* Case used for transformations. */
66 struct ccase sink_case; /* Case written to sink, if
67 compaction is necessary. */
68 size_t cases_written; /* Cases output so far. */
69 size_t cases_analyzed; /* Cases passed to procedure so far. */
72 /* The current active file, from which cases are read. */
73 struct case_source *vfm_source;
75 /* The replacement active file, to which cases are written. */
76 struct case_sink *vfm_sink;
78 /* Nonzero if the case needs to have values deleted before being
79 stored, zero otherwise. */
80 static int compaction_necessary;
82 /* Time at which vfm was last invoked. */
83 time_t last_vfm_invocation;
86 int n_lag; /* Number of cases to lag. */
87 static int lag_count; /* Number of cases in lag_queue so far. */
88 static int lag_head; /* Index where next case will be added. */
89 static struct ccase *lag_queue; /* Array of n_lag ccase elements. */
91 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
93 static void create_trns_case (struct ccase *, struct dictionary *);
94 static void open_active_file (void);
95 static int write_case (struct write_case_data *wc_data);
96 static int execute_transformations (struct ccase *c,
97 struct transformation *trns,
98 int first_idx, int last_idx,
100 static int filter_case (const struct ccase *c, int case_num);
101 static void lag_case (const struct ccase *c);
102 static void clear_case (struct ccase *c);
103 static void close_active_file (void);
105 /* Public functions. */
107 /* Reads the data from the input program and writes it to a new
108 active file. For each case we read from the input program, we
111 1. Execute permanent transformations. If these drop the case,
112 start the next case from step 1.
114 2. N OF CASES. If we have already written N cases, start the
115 next case from step 1.
117 3. Write case to replacement active file.
119 4. Execute temporary transformations. If these drop the case,
120 start the next case from step 1.
122 5. FILTER, PROCESS IF. If these drop the case, start the next
125 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
126 cases, start the next case from step 1.
128 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
130 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
132 if (proc_func == NULL
133 && case_source_is_class (vfm_source, &storage_source_class)
143 internal_procedure (proc_func, aux);
144 close_active_file ();
147 /* Executes a procedure, as procedure(), except that the caller
148 is responsible for calling open_active_file() and
149 close_active_file(). */
151 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux)
153 static int recursive_call;
155 struct write_case_data wc_data;
157 assert (++recursive_call == 1);
159 wc_data.proc_func = proc_func;
161 create_trns_case (&wc_data.trns_case, default_dict);
162 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
163 wc_data.cases_written = 0;
165 last_vfm_invocation = time (NULL);
167 if (vfm_source != NULL)
168 vfm_source->class->read (vfm_source,
170 write_case, &wc_data);
172 case_destroy (&wc_data.sink_case);
173 case_destroy (&wc_data.trns_case);
175 assert (--recursive_call == 0);
178 /* Creates case TRNS_CASE, initializing it from the vectors
179 that say which `value's need to be initialized just once, and
180 which ones need to be re-initialized before every case. */
182 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
184 size_t var_cnt = dict_get_var_cnt (dict);
187 case_create (trns_case, dict_get_next_value_idx (dict));
188 for (i = 0; i < var_cnt; i++)
190 struct variable *v = dict_get_var (dict, i);
191 union value *value = case_data_rw (trns_case, v->fv);
193 if (v->type == NUMERIC)
194 value->f = v->reinit ? 0.0 : SYSMIS;
196 memset (value->s, ' ', v->width);
200 /* Makes all preparations for reading from the data source and writing
203 open_active_file (void)
205 /* Make temp_dict refer to the dictionary right before data
210 temp_dict = default_dict;
213 /* Figure out compaction. */
214 compaction_necessary = (dict_get_next_value_idx (temp_dict)
215 != dict_get_compacted_value_cnt (temp_dict));
218 if (vfm_sink == NULL)
219 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
220 if (vfm_sink->class->open != NULL)
221 vfm_sink->class->open (vfm_sink);
223 /* Allocate memory for lag queue. */
230 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
231 for (i = 0; i < n_lag; i++)
232 case_nullify (&lag_queue[i]);
235 /* Close any unclosed DO IF or LOOP constructs. */
239 /* Transforms trns_case and writes it to the replacement active
240 file if advisable. Returns nonzero if more cases can be
241 accepted, zero otherwise. Do not call this function again
242 after it has returned zero once. */
244 write_case (struct write_case_data *wc_data)
246 /* Execute permanent transformations. */
247 if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
248 wc_data->cases_written + 1))
252 if (dict_get_case_limit (default_dict)
253 && wc_data->cases_written >= dict_get_case_limit (default_dict))
255 wc_data->cases_written++;
257 /* Write case to LAG queue. */
259 lag_case (&wc_data->trns_case);
261 /* Write case to replacement active file. */
262 if (vfm_sink->class->write != NULL)
264 if (compaction_necessary)
266 dict_compact_case (temp_dict, &wc_data->sink_case,
267 &wc_data->trns_case);
268 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
271 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
274 /* Execute temporary transformations. */
275 if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
276 wc_data->cases_written))
279 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
280 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
281 || (dict_get_case_limit (temp_dict)
282 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
284 wc_data->cases_analyzed++;
286 /* Pass case to procedure. */
287 if (wc_data->proc_func != NULL)
288 wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
291 clear_case (&wc_data->trns_case);
295 /* Transforms case C using the transformations in TRNS[] with
296 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
297 become case CASE_NUM (1-based) in the output file. Returns
298 zero if the case was filtered out by one of the
299 transformations, nonzero otherwise. */
301 execute_transformations (struct ccase *c,
302 struct transformation *trns,
303 int first_idx, int last_idx,
308 for (idx = first_idx; idx != last_idx; )
310 struct transformation *t = &trns[idx];
311 int retval = t->proc (t->private, c, case_num);
330 /* Returns nonzero if case C with case number CASE_IDX should be
331 excluded as specified on FILTER or PROCESS IF, otherwise
334 filter_case (const struct ccase *c, int case_idx)
337 struct variable *filter_var = dict_get_filter (default_dict);
338 if (filter_var != NULL)
340 double f = case_num (c, filter_var->fv);
341 if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
346 if (process_if_expr != NULL
347 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
353 /* Add C to the lag queue. */
355 lag_case (const struct ccase *c)
357 if (lag_count < n_lag)
359 case_destroy (&lag_queue[lag_head]);
360 case_clone (&lag_queue[lag_head], c);
361 if (++lag_head >= n_lag)
365 /* Clears the variables in C that need to be cleared between
368 clear_case (struct ccase *c)
370 size_t var_cnt = dict_get_var_cnt (default_dict);
373 for (i = 0; i < var_cnt; i++)
375 struct variable *v = dict_get_var (default_dict, i);
376 if (v->init && v->reinit)
378 if (v->type == NUMERIC)
379 case_data_rw (c, v->fv)->f = SYSMIS;
381 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
386 /* Closes the active file. */
388 close_active_file (void)
390 /* Free memory for lag queue, and turn off lagging. */
395 for (i = 0; i < n_lag; i++)
396 case_destroy (&lag_queue[i]);
401 /* Dictionary from before TEMPORARY becomes permanent. */
404 dict_destroy (default_dict);
405 default_dict = temp_dict;
409 /* Finish compaction. */
410 if (compaction_necessary)
411 dict_compact_values (default_dict);
413 /* Free data source. */
414 free_case_source (vfm_source);
417 /* Old data sink becomes new data source. */
418 if (vfm_sink->class->make_source != NULL)
419 vfm_source = vfm_sink->class->make_source (vfm_sink);
420 free_case_sink (vfm_sink);
423 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
424 and get rid of all the transformations. */
426 expr_free (process_if_expr);
427 process_if_expr = NULL;
428 if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
429 dict_set_filter (default_dict, NULL);
430 dict_set_case_limit (default_dict, 0);
431 dict_clear_vectors (default_dict);
432 cancel_transformations ();
435 /* Storage case stream. */
437 /* Information about storage sink or source. */
438 struct storage_stream_info
440 struct casefile *casefile; /* Storage. */
443 /* Initializes a storage sink. */
445 storage_sink_open (struct case_sink *sink)
447 struct storage_stream_info *info;
449 sink->aux = info = xmalloc (sizeof *info);
450 info->casefile = casefile_create (sink->value_cnt);
453 /* Destroys storage stream represented by INFO. */
455 destroy_storage_stream_info (struct storage_stream_info *info)
459 casefile_destroy (info->casefile);
464 /* Writes case C to the storage sink SINK. */
466 storage_sink_write (struct case_sink *sink, const struct ccase *c)
468 struct storage_stream_info *info = sink->aux;
470 casefile_append (info->casefile, c);
473 /* Destroys internal data in SINK. */
475 storage_sink_destroy (struct case_sink *sink)
477 destroy_storage_stream_info (sink->aux);
480 /* Closes the sink and returns a storage source to read back the
482 static struct case_source *
483 storage_sink_make_source (struct case_sink *sink)
485 struct case_source *source
486 = create_case_source (&storage_source_class, sink->aux);
492 const struct case_sink_class storage_sink_class =
497 storage_sink_destroy,
498 storage_sink_make_source,
501 /* Storage source. */
503 /* Returns the number of cases that will be read by
504 storage_source_read(). */
506 storage_source_count (const struct case_source *source)
508 struct storage_stream_info *info = source->aux;
510 return casefile_get_case_cnt (info->casefile);
513 /* Reads all cases from the storage source and passes them one by one to
516 storage_source_read (struct case_source *source,
517 struct ccase *output_case,
518 write_case_func *write_case, write_case_data wc_data)
520 struct storage_stream_info *info = source->aux;
521 struct ccase casefile_case;
522 struct casereader *reader;
524 for (reader = casefile_get_reader (info->casefile);
525 casereader_read (reader, &casefile_case);
526 case_destroy (&casefile_case))
528 case_copy (output_case, 0,
530 casefile_get_value_cnt (info->casefile));
531 write_case (wc_data);
533 casereader_destroy (reader);
536 /* Destroys the source's internal data. */
538 storage_source_destroy (struct case_source *source)
540 destroy_storage_stream_info (source->aux);
543 /* Storage source. */
544 const struct case_source_class storage_source_class =
547 storage_source_count,
549 storage_source_destroy,
553 storage_source_get_casefile (struct case_source *source)
555 struct storage_stream_info *info = source->aux;
557 assert (source->class == &storage_source_class);
558 return info->casefile;
562 storage_source_create (struct casefile *cf)
564 struct storage_stream_info *info;
566 info = xmalloc (sizeof *info);
569 return create_case_source (&storage_source_class, info);
572 /* Null sink. Used by a few procedures that keep track of output
573 themselves and would throw away anything that the sink
576 const struct case_sink_class null_sink_class =
585 /* Returns a pointer to the lagged case from N_BEFORE cases before the
586 current one, or NULL if there haven't been that many cases yet. */
588 lagged_case (int n_before)
590 assert (n_before >= 1 );
591 assert (n_before <= n_lag);
593 if (n_before <= lag_count)
595 int index = lag_head - n_before;
598 return &lag_queue[index];
604 /* Appends TRNS to t_trns[], the list of all transformations to be
605 performed on data as it is read from the active file. */
607 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
609 struct transformation *trns;
610 if (n_trns >= m_trns)
611 t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
612 trns = &t_trns[n_trns++];
615 trns->private = private;
618 /* Returns the index number that the next transformation added by
619 add_transformation() will receive. A trns_proc_func that
620 returns this index causes control flow to jump to it. */
622 next_transformation (void)
627 /* Cancels all active transformations, including any transformations
628 created by the input program. */
630 cancel_transformations (void)
633 for (i = 0; i < n_trns; i++)
635 struct transformation *t = &t_trns[i];
637 t->free (t->private);
645 /* Creates a case source with class CLASS and auxiliary data
646 AUX. */
648 create_case_source (const struct case_source_class *class,
651 struct case_source *source = xmalloc (sizeof *source);
652 source->class = class;
657 /* Destroys case source SOURCE, first calling the source class's
658 destroy function, if any. */
660 free_case_source (struct case_source *source)
664 if (source->class->destroy != NULL)
665 source->class->destroy (source);
670 /* Returns nonzero if a case source is "complex". */
672 case_source_is_complex (const struct case_source *source)
674 return source != NULL && (source->class == &input_program_source_class
675 || source->class == &file_type_source_class);
678 /* Returns nonzero if CLASS is the class of SOURCE. */
680 case_source_is_class (const struct case_source *source,
681 const struct case_source_class *class)
683 return source != NULL && source->class == class;
686 /* Creates a case sink to accept cases from the given DICT with
687 class CLASS and auxiliary data AUX. */
689 create_case_sink (const struct case_sink_class *class,
690 const struct dictionary *dict,
693 struct case_sink *sink = xmalloc (sizeof *sink);
695 sink->value_cnt = dict_get_compacted_value_cnt (dict);
700 /* Destroys case sink SINK. */
702 free_case_sink (struct case_sink *sink)
706 if (sink->class->destroy != NULL)
707 sink->class->destroy (sink);
712 /* Represents auxiliary data for handling SPLIT FILE. */
713 struct split_aux_data
715 size_t case_count; /* Number of cases so far. */
716 struct ccase prev_case; /* Data in previous case. */
718 /* Functions to call... */
719 void (*begin_func) (void *); /* ...before data. */
720 int (*proc_func) (struct ccase *, void *); /* ...with data. */
721 void (*end_func) (void *); /* ...after data. */
722 void *func_aux; /* Auxiliary data. */
725 static int equal_splits (const struct ccase *, const struct ccase *);
726 static int procedure_with_splits_callback (struct ccase *, void *);
727 static void dump_splits (struct ccase *);
729 /* Like procedure(), but it automatically breaks the case stream
730 into SPLIT FILE break groups. Before each group of cases with
731 identical SPLIT FILE variable values, BEGIN_FUNC is called.
732 Then PROC_FUNC is called with each case in the group.
733 END_FUNC is called when the group is finished. FUNC_AUX is
734 passed to each of the functions as auxiliary data.
736 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
737 and END_FUNC will be called at all.
739 If SPLIT FILE is not in effect, then there is one break group
740 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
741 will be called once. */
743 procedure_with_splits (void (*begin_func) (void *aux),
744 int (*proc_func) (struct ccase *, void *aux),
745 void (*end_func) (void *aux),
748 struct split_aux_data split_aux;
750 split_aux.case_count = 0;
751 case_nullify (&split_aux.prev_case);
752 split_aux.begin_func = begin_func;
753 split_aux.proc_func = proc_func;
754 split_aux.end_func = end_func;
755 split_aux.func_aux = func_aux;
758 internal_procedure (procedure_with_splits_callback, &split_aux);
759 if (split_aux.case_count > 0 && end_func != NULL)
761 close_active_file ();
763 case_destroy (&split_aux.prev_case);
766 /* procedure() callback used by procedure_with_splits(). */
768 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
770 struct split_aux_data *split_aux = split_aux_;
772 /* Start a new series if needed. */
773 if (split_aux->case_count == 0
774 || !equal_splits (c, &split_aux->prev_case))
776 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
777 split_aux->end_func (split_aux->func_aux);
780 case_destroy (&split_aux->prev_case);
781 case_clone (&split_aux->prev_case, c);
783 if (split_aux->begin_func != NULL)
784 split_aux->begin_func (split_aux->func_aux);
787 split_aux->case_count++;
788 if (split_aux->proc_func != NULL)
789 return split_aux->proc_func (c, split_aux->func_aux);
794 /* Compares the SPLIT FILE variables in cases A and B and returns
795 nonzero only if they are equal. */
797 equal_splits (const struct ccase *a, const struct ccase *b)
799 return case_compare (a, b,
800 dict_get_split_vars (default_dict),
801 dict_get_split_cnt (default_dict)) == 0;
804 /* Dumps out the values of all the split variables for the case C. */
806 dump_splits (struct ccase *c)
808 struct variable *const *split;
813 split_cnt = dict_get_split_cnt (default_dict);
817 t = tab_create (3, split_cnt + 1, 0);
818 tab_dim (t, tab_natural_dimensions);
819 tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
820 tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
821 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
822 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
823 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
824 split = dict_get_split_vars (default_dict);
825 for (i = 0; i < split_cnt; i++)
827 struct variable *v = split[i];
831 assert (v->type == NUMERIC || v->type == ALPHA);
832 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
834 data_out (temp_buf, &v->print, case_data (c, v->fv));
836 temp_buf[v->print.w] = 0;
837 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
839 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
841 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
843 tab_flags (t, SOMF_NO_TITLE);
847 /* Represents auxiliary data for handling SPLIT FILE in a
848 multipass procedure. */
849 struct multipass_split_aux_data
851 struct ccase prev_case; /* Data in previous case. */
852 struct casefile *casefile; /* Accumulates data for a split. */
854 /* Function to call with the accumulated data. */
855 void (*split_func) (const struct casefile *, void *);
856 void *func_aux; /* Auxiliary data. */
859 static int multipass_split_callback (struct ccase *c, void *aux_);
860 static void multipass_split_output (struct multipass_split_aux_data *);
863 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
867 struct multipass_split_aux_data aux;
869 assert (split_func != NULL);
873 case_nullify (&aux.prev_case);
875 aux.split_func = split_func;
876 aux.func_aux = func_aux;
878 internal_procedure (multipass_split_callback, &aux);
879 if (aux.casefile != NULL)
880 multipass_split_output (&aux);
881 case_destroy (&aux.prev_case);
883 close_active_file ();
886 /* procedure() callback used by multipass_procedure_with_splits(). */
888 multipass_split_callback (struct ccase *c, void *aux_)
890 struct multipass_split_aux_data *aux = aux_;
892 /* Start a new series if needed. */
893 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
895 /* Pass any cases to split_func. */
896 if (aux->casefile != NULL)
897 multipass_split_output (aux);
899 /* Start a new casefile. */
900 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
902 /* Record split values. */
904 case_destroy (&aux->prev_case);
905 case_clone (&aux->prev_case, c);
908 casefile_append (aux->casefile, c);
914 multipass_split_output (struct multipass_split_aux_data *aux)
916 assert (aux->casefile != NULL);
917 aux->split_func (aux->casefile, aux->func_aux);
918 casefile_destroy (aux->casefile);
919 aux->casefile = NULL;
923 /* Discards all the current state in preparation for a data-input
924 command like DATA LIST or GET. */
926 discard_variables (void)
928 dict_clear (default_dict);
929 default_handle = NULL;
933 if (vfm_source != NULL)
935 free_case_source (vfm_source);
939 cancel_transformations ();
943 expr_free (process_if_expr);
944 process_if_expr = NULL;
948 pgm_state = STATE_INIT;