1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
28 #include <unistd.h> /* Required by SunOS4. */
34 #include "dictionary.h"
35 #include "ctl-stack.h"
37 #include "expressions/public.h"
44 #include "value-labels.h"
47 #define _(msgid) gettext (msgid)
50 Virtual File Manager (vfm):
52 vfm is used to process data files. It uses the model that
53 data is read from one stream (the data source), processed,
54 then written to another (the data sink). The data source is
55 then deleted and the data sink becomes the data source for the
58 /* Procedure execution data. */
59 struct write_case_data
61 /* Function to call for each case. */
62 int (*proc_func) (struct ccase *, void *); /* Function. */
63 void *aux; /* Auxiliary data. */
65 struct ccase trns_case; /* Case used for transformations. */
66 struct ccase sink_case; /* Case written to sink, if
67 compaction is necessary. */
68 size_t cases_written; /* Cases output so far. */
69 size_t cases_analyzed; /* Cases passed to procedure so far. */
72 /* The current active file, from which cases are read. */
73 struct case_source *vfm_source;
75 /* The replacement active file, to which cases are written. */
76 struct case_sink *vfm_sink;
78 /* Nonzero if the case needs to have values deleted before being
79 stored, zero otherwise. */
80 static int compaction_necessary;
82 /* Time at which vfm was last invoked. */
83 static time_t last_vfm_invocation;
86 int n_lag; /* Number of cases to lag. */
87 static int lag_count; /* Number of cases in lag_queue so far. */
88 static int lag_head; /* Index in lag_queue where next case will be added. */
89 static struct ccase *lag_queue; /* Array of n_lag struct ccase elements. */
91 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
93 static void update_last_vfm_invocation (void);
94 static void create_trns_case (struct ccase *, struct dictionary *);
95 static void open_active_file (void);
96 static int write_case (struct write_case_data *wc_data);
97 static int execute_transformations (struct ccase *c,
98 struct transformation *trns,
99 int first_idx, int last_idx,
101 static int filter_case (const struct ccase *c, int case_num);
102 static void lag_case (const struct ccase *c);
103 static void clear_case (struct ccase *c);
104 static void close_active_file (void);
106 /* Public functions. */
108 /* Returns the last time the data was read. */
110 vfm_last_invocation (void)
112 if (last_vfm_invocation == 0)
113 update_last_vfm_invocation ();
114 return last_vfm_invocation;
117 /* Reads the data from the input program and writes it to a new
118 active file. For each case we read from the input program, we
121 1. Execute permanent transformations. If these drop the case,
122 start the next case from step 1.
124 2. N OF CASES. If we have already written N cases, start the
125 next case from step 1.
127 3. Write case to replacement active file.
129 4. Execute temporary transformations. If these drop the case,
130 start the next case from step 1.
132 5. FILTER, PROCESS IF. If these drop the case, start the next
135 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
136 cases, start the next case from step 1.
138 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
140 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
142 if (proc_func == NULL
143 && case_source_is_class (vfm_source, &storage_source_class)
149 update_last_vfm_invocation ();
154 internal_procedure (proc_func, aux);
155 close_active_file ();
158 /* Executes a procedure, as procedure(), except that the caller
159 is responsible for calling open_active_file() and
160 close_active_file(). */
162 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux)
/* Counts active invocations; the asserts below require it to be 1
   while this function runs and 0 again when it leaves. */
164 static int recursive_call;
166 struct write_case_data wc_data;
/* NOTE(review): the increment is a side effect inside assert(), so it
   is compiled out when NDEBUG is defined.  The matching decrement at
   the end disappears too, so the counter stays consistent, but the
   re-entry guard only exists in assert-enabled builds. */
168 assert (++recursive_call == 1);
170 wc_data.proc_func = proc_func;
/* Set up the two scratch cases that write_case() works on. */
172 create_trns_case (&wc_data.trns_case, default_dict);
173 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
174 wc_data.cases_written = 0;
/* NOTE(review): write_case() also reads wc_data.cases_analyzed and
   wc_data.aux, but no assignment to either is visible in this
   excerpt -- TODO confirm both are initialized before the read
   call below. */
176 update_last_vfm_invocation ();
/* Hand write_case and its data to the source class's read function;
   nothing is read when there is no active source. */
178 if (vfm_source != NULL)
179 vfm_source->class->read (vfm_source,
181 write_case, &wc_data);
/* Release the scratch cases. */
183 case_destroy (&wc_data.sink_case);
184 case_destroy (&wc_data.trns_case);
186 assert (--recursive_call == 0);
189 /* Updates last_vfm_invocation. */
191 update_last_vfm_invocation (void)
193 last_vfm_invocation = time (NULL);
196 /* Creates TRNS_CASE with room for every value in DICT and sets
197 each variable's initial value: 0.0 or SYSMIS for numeric
198 variables, chosen by the variable's reinit flag. */
200 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
202 size_t var_cnt = dict_get_var_cnt (dict);
205 case_create (trns_case, dict_get_next_value_idx (dict));
206 for (i = 0; i < var_cnt; i++)
208 struct variable *v = dict_get_var (dict, i);
/* V->fv is the index of V's value within the case. */
209 union value *value = case_data_rw (trns_case, v->fv);
211 if (v->type == NUMERIC)
/* NOTE(review): clear_case() resets variables that have reinit set
   to SYSMIS between cases, whereas here those variables start at
   0.0 and the others start at SYSMIS -- confirm the two arms of
   this conditional are not swapped. */
212 value->f = v->reinit ? 0.0 : SYSMIS;
/* Pad the string value with V->width spaces. */
214 memset (value->s, ' ', v->width);
218 /* Makes all preparations for reading from the data source and writing
221 open_active_file (void)
223 /* Make temp_dict refer to the dictionary right before data
228 temp_dict = default_dict;
231 /* Figure out compaction. */
232 compaction_necessary = (dict_get_next_value_idx (temp_dict)
233 != dict_get_compacted_value_cnt (temp_dict));
236 if (vfm_sink == NULL)
237 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
238 if (vfm_sink->class->open != NULL)
239 vfm_sink->class->open (vfm_sink);
241 /* Allocate memory for lag queue. */
248 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
249 for (i = 0; i < n_lag; i++)
250 case_nullify (&lag_queue[i]);
253 /* Close any unclosed DO IF or LOOP constructs. */
257 /* Transforms trns_case and writes it to the replacement active
258 file if advisable. Returns nonzero if more cases can be
259 accepted, zero otherwise. Do not call this function again
260 after it has returned zero once. */
262 write_case (struct write_case_data *wc_data)
264 /* Execute permanent transformations. */
265 if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
266 wc_data->cases_written + 1))
270 if (dict_get_case_limit (default_dict)
271 && wc_data->cases_written >= dict_get_case_limit (default_dict))
273 wc_data->cases_written++;
275 /* Write case to LAG queue. */
277 lag_case (&wc_data->trns_case);
279 /* Write case to replacement active file. */
280 if (vfm_sink->class->write != NULL)
282 if (compaction_necessary)
284 dict_compact_case (temp_dict, &wc_data->sink_case,
285 &wc_data->trns_case);
286 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
289 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
292 /* Execute temporary transformations. */
293 if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
294 wc_data->cases_written))
297 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
298 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
299 || (dict_get_case_limit (temp_dict)
300 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
302 wc_data->cases_analyzed++;
304 /* Pass case to procedure. */
305 if (wc_data->proc_func != NULL)
306 wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
309 clear_case (&wc_data->trns_case);
313 /* Transforms case C using the transformations in TRNS[] with
314 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
315 become case CASE_NUM (1-based) in the output file. Returns
316 zero if the case was filtered out by one of the
317 transformations, nonzero otherwise. */
319 execute_transformations (struct ccase *c,
320 struct transformation *trns,
321 int first_idx, int last_idx,
326 for (idx = first_idx; idx != last_idx; )
328 struct transformation *t = &trns[idx];
329 int retval = t->proc (t->private, c, case_num);
348 /* Returns nonzero if case C with case number CASE_IDX should be
349 excluded as specified on FILTER or PROCESS IF, otherwise
352 filter_case (const struct ccase *c, int case_idx)
355 struct variable *filter_var = dict_get_filter (default_dict);
356 if (filter_var != NULL)
358 double f = case_num (c, filter_var->fv);
359 if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
364 if (process_if_expr != NULL
365 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
371 /* Add C to the lag queue. */
373 lag_case (const struct ccase *c)
375 if (lag_count < n_lag)
377 case_destroy (&lag_queue[lag_head]);
378 case_clone (&lag_queue[lag_head], c);
379 if (++lag_head >= n_lag)
383 /* Clears the variables in C that need to be cleared between
386 clear_case (struct ccase *c)
388 size_t var_cnt = dict_get_var_cnt (default_dict);
391 for (i = 0; i < var_cnt; i++)
393 struct variable *v = dict_get_var (default_dict, i);
394 if (v->init && v->reinit)
396 if (v->type == NUMERIC)
397 case_data_rw (c, v->fv)->f = SYSMIS;
399 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
404 /* Closes the active file. */
406 close_active_file (void)
408 /* Free memory for lag queue, and turn off lagging. */
413 for (i = 0; i < n_lag; i++)
414 case_destroy (&lag_queue[i]);
419 /* Dictionary from before TEMPORARY becomes permanent. */
422 dict_destroy (default_dict);
423 default_dict = temp_dict;
427 /* Finish compaction. */
428 if (compaction_necessary)
429 dict_compact_values (default_dict);
431 /* Free data source. */
432 free_case_source (vfm_source);
435 /* Old data sink becomes new data source. */
436 if (vfm_sink->class->make_source != NULL)
437 vfm_source = vfm_sink->class->make_source (vfm_sink);
438 free_case_sink (vfm_sink);
441 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
442 and get rid of all the transformations. */
444 expr_free (process_if_expr);
445 process_if_expr = NULL;
446 if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
447 dict_set_filter (default_dict, NULL);
448 dict_set_case_limit (default_dict, 0);
449 dict_clear_vectors (default_dict);
450 cancel_transformations ();
453 /* Storage case stream. */
455 /* Information about storage sink or source. */
456 struct storage_stream_info
458 struct casefile *casefile; /* Storage. */
461 /* Initializes a storage sink. */
463 storage_sink_open (struct case_sink *sink)
465 struct storage_stream_info *info;
467 sink->aux = info = xmalloc (sizeof *info);
468 info->casefile = casefile_create (sink->value_cnt);
471 /* Destroys storage stream represented by INFO. */
473 destroy_storage_stream_info (struct storage_stream_info *info)
477 casefile_destroy (info->casefile);
482 /* Writes case C to the storage sink SINK. */
484 storage_sink_write (struct case_sink *sink, const struct ccase *c)
486 struct storage_stream_info *info = sink->aux;
488 casefile_append (info->casefile, c);
491 /* Destroys internal data in SINK. */
493 storage_sink_destroy (struct case_sink *sink)
495 destroy_storage_stream_info (sink->aux);
498 /* Closes the sink and returns a storage source to read back the
500 static struct case_source *
501 storage_sink_make_source (struct case_sink *sink)
503 struct case_source *source
504 = create_case_source (&storage_source_class, sink->aux);
510 const struct case_sink_class storage_sink_class =
515 storage_sink_destroy,
516 storage_sink_make_source,
519 /* Storage source. */
521 /* Returns the number of cases that will be read by
522 storage_source_read(). */
524 storage_source_count (const struct case_source *source)
526 struct storage_stream_info *info = source->aux;
528 return casefile_get_case_cnt (info->casefile);
531 /* Reads all cases from the storage source and passes them one by one to
534 storage_source_read (struct case_source *source,
535 struct ccase *output_case,
536 write_case_func *write_case, write_case_data wc_data)
538 struct storage_stream_info *info = source->aux;
539 struct ccase casefile_case;
540 struct casereader *reader;
542 for (reader = casefile_get_reader (info->casefile);
543 casereader_read (reader, &casefile_case);
544 case_destroy (&casefile_case))
546 case_copy (output_case, 0,
548 casefile_get_value_cnt (info->casefile));
549 write_case (wc_data);
551 casereader_destroy (reader);
554 /* Destroys the source's internal data. */
556 storage_source_destroy (struct case_source *source)
558 destroy_storage_stream_info (source->aux);
561 /* Storage source. */
562 const struct case_source_class storage_source_class =
565 storage_source_count,
567 storage_source_destroy,
571 storage_source_get_casefile (struct case_source *source)
573 struct storage_stream_info *info = source->aux;
575 assert (source->class == &storage_source_class);
576 return info->casefile;
580 storage_source_create (struct casefile *cf)
582 struct storage_stream_info *info;
584 info = xmalloc (sizeof *info);
587 return create_case_source (&storage_source_class, info);
590 /* Null sink. Used by a few procedures that keep track of output
591 themselves and would throw away anything that the sink
594 const struct case_sink_class null_sink_class =
603 /* Returns a pointer to the lagged case from N_BEFORE cases before the
604 current one, or NULL if there haven't been that many cases yet. */
606 lagged_case (int n_before)
608 assert (n_before >= 1 );
609 assert (n_before <= n_lag);
611 if (n_before <= lag_count)
613 int index = lag_head - n_before;
616 return &lag_queue[index];
622 /* Appends TRNS to t_trns[], the list of all transformations to be
623 performed on data as it is read from the active file. */
625 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
627 struct transformation *trns;
628 if (n_trns >= m_trns)
629 t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
630 trns = &t_trns[n_trns++];
633 trns->private = private;
636 /* Returns the index number that the next transformation added by
637 add_transformation() will receive. A trns_proc_func that
638 returns this index causes control flow to jump to it. */
640 next_transformation (void)
645 /* Cancels all active transformations, including any transformations
646 created by the input program. */
648 cancel_transformations (void)
651 for (i = 0; i < n_trns; i++)
653 struct transformation *t = &t_trns[i];
655 t->free (t->private);
663 /* Creates a case source with class CLASS and auxiliary data
664 AUX. */
666 create_case_source (const struct case_source_class *class,
669 struct case_source *source = xmalloc (sizeof *source);
670 source->class = class;
675 /* Destroys case source SOURCE, calling its class's destroy
676 function, if any. */
678 free_case_source (struct case_source *source)
682 if (source->class->destroy != NULL)
683 source->class->destroy (source);
688 /* Returns nonzero if a case source is "complex". */
690 case_source_is_complex (const struct case_source *source)
692 return source != NULL && (source->class == &input_program_source_class
693 || source->class == &file_type_source_class);
696 /* Returns nonzero if CLASS is the class of SOURCE. */
698 case_source_is_class (const struct case_source *source,
699 const struct case_source_class *class)
701 return source != NULL && source->class == class;
704 /* Creates a case sink to accept cases from the given DICT with
705 class CLASS and auxiliary data AUX. */
707 create_case_sink (const struct case_sink_class *class,
708 const struct dictionary *dict,
711 struct case_sink *sink = xmalloc (sizeof *sink);
713 sink->value_cnt = dict_get_compacted_value_cnt (dict);
718 /* Destroys case sink SINK. */
720 free_case_sink (struct case_sink *sink)
724 if (sink->class->destroy != NULL)
725 sink->class->destroy (sink);
730 /* Represents auxiliary data for handling SPLIT FILE. */
731 struct split_aux_data
733 size_t case_count; /* Number of cases so far. */
734 struct ccase prev_case; /* Data in previous case. */
736 /* Functions to call... */
737 void (*begin_func) (void *); /* ...before data. */
738 int (*proc_func) (struct ccase *, void *); /* ...with data. */
739 void (*end_func) (void *); /* ...after data. */
740 void *func_aux; /* Auxiliary data. */
743 static int equal_splits (const struct ccase *, const struct ccase *);
744 static int procedure_with_splits_callback (struct ccase *, void *);
745 static void dump_splits (struct ccase *);
747 /* Like procedure(), but it automatically breaks the case stream
748 into SPLIT FILE break groups. Before each group of cases with
749 identical SPLIT FILE variable values, BEGIN_FUNC is called.
750 Then PROC_FUNC is called with each case in the group.
751 END_FUNC is called when the group is finished. FUNC_AUX is
752 passed to each of the functions as auxiliary data.
754 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
755 and END_FUNC will be called at all.
757 If SPLIT FILE is not in effect, then there is one break group
758 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
759 will be called once. */
761 procedure_with_splits (void (*begin_func) (void *aux),
762 int (*proc_func) (struct ccase *, void *aux),
763 void (*end_func) (void *aux),
766 struct split_aux_data split_aux;
768 split_aux.case_count = 0;
769 case_nullify (&split_aux.prev_case);
770 split_aux.begin_func = begin_func;
771 split_aux.proc_func = proc_func;
772 split_aux.end_func = end_func;
773 split_aux.func_aux = func_aux;
776 internal_procedure (procedure_with_splits_callback, &split_aux);
777 if (split_aux.case_count > 0 && end_func != NULL)
779 close_active_file ();
781 case_destroy (&split_aux.prev_case);
784 /* procedure() callback used by procedure_with_splits(). */
786 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
788 struct split_aux_data *split_aux = split_aux_;
790 /* Start a new series if needed. */
791 if (split_aux->case_count == 0
792 || !equal_splits (c, &split_aux->prev_case))
794 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
795 split_aux->end_func (split_aux->func_aux);
798 case_destroy (&split_aux->prev_case);
799 case_clone (&split_aux->prev_case, c);
801 if (split_aux->begin_func != NULL)
802 split_aux->begin_func (split_aux->func_aux);
805 split_aux->case_count++;
806 if (split_aux->proc_func != NULL)
807 return split_aux->proc_func (c, split_aux->func_aux);
812 /* Compares the SPLIT FILE variables in cases A and B and returns
813 nonzero only if they are equal. */
815 equal_splits (const struct ccase *a, const struct ccase *b)
817 return case_compare (a, b,
818 dict_get_split_vars (default_dict),
819 dict_get_split_cnt (default_dict)) == 0;
822 /* Dumps out the values of all the split variables for the case C. */
824 dump_splits (struct ccase *c)
826 struct variable *const *split;
831 split_cnt = dict_get_split_cnt (default_dict);
835 t = tab_create (3, split_cnt + 1, 0);
836 tab_dim (t, tab_natural_dimensions);
837 tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
838 tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
839 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
840 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
841 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
842 split = dict_get_split_vars (default_dict);
843 for (i = 0; i < split_cnt; i++)
845 struct variable *v = split[i];
849 assert (v->type == NUMERIC || v->type == ALPHA);
850 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
852 data_out (temp_buf, &v->print, case_data (c, v->fv));
854 temp_buf[v->print.w] = 0;
855 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
857 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
859 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
861 tab_flags (t, SOMF_NO_TITLE);
865 /* Represents auxiliary data for handling SPLIT FILE in a
866 multipass procedure. */
867 struct multipass_split_aux_data
869 struct ccase prev_case; /* Data in previous case. */
870 struct casefile *casefile; /* Accumulates data for a split. */
872 /* Function to call with the accumulated data. */
873 void (*split_func) (const struct casefile *, void *);
874 void *func_aux; /* Auxiliary data. */
877 static int multipass_split_callback (struct ccase *c, void *aux_);
878 static void multipass_split_output (struct multipass_split_aux_data *);
881 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
885 struct multipass_split_aux_data aux;
887 assert (split_func != NULL);
891 case_nullify (&aux.prev_case);
893 aux.split_func = split_func;
894 aux.func_aux = func_aux;
896 internal_procedure (multipass_split_callback, &aux);
897 if (aux.casefile != NULL)
898 multipass_split_output (&aux);
899 case_destroy (&aux.prev_case);
901 close_active_file ();
904 /* procedure() callback used by multipass_procedure_with_splits(). */
906 multipass_split_callback (struct ccase *c, void *aux_)
908 struct multipass_split_aux_data *aux = aux_;
910 /* Start a new series if needed. */
911 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
913 /* Pass any cases to split_func. */
914 if (aux->casefile != NULL)
915 multipass_split_output (aux);
917 /* Start a new casefile. */
918 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
920 /* Record split values. */
922 case_destroy (&aux->prev_case);
923 case_clone (&aux->prev_case, c);
926 casefile_append (aux->casefile, c);
932 multipass_split_output (struct multipass_split_aux_data *aux)
934 assert (aux->casefile != NULL);
935 aux->split_func (aux->casefile, aux->func_aux);
936 casefile_destroy (aux->casefile);
937 aux->casefile = NULL;
941 /* Discards all the current state in preparation for a data-input
942 command like DATA LIST or GET. */
944 discard_variables (void)
946 dict_clear (default_dict);
947 default_handle = NULL;
951 if (vfm_source != NULL)
953 free_case_source (vfm_source);
957 cancel_transformations ();
961 expr_free (process_if_expr);
962 process_if_expr = NULL;
966 pgm_state = STATE_INIT;