1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
28 #include <unistd.h> /* Required by SunOS4. */
33 #include "dictionary.h"
44 #include "value-labels.h"
47 Virtual File Manager (vfm):
49 vfm is used to process data files. It uses the model that
50 data is read from one stream (the data source), processed,
51 then written to another (the data sink). The data source is
52 then deleted and the data sink becomes the data source for the
55 /* Procedure execution data. */
56 struct write_case_data
58 /* Function to call for each case. */
59 int (*proc_func) (struct ccase *, void *); /* Function. */
60 void *aux; /* Auxiliary data. */
62 struct ccase trns_case; /* Case used for transformations. */
63 struct ccase sink_case; /* Case written to sink, if
64 compaction is necessary. */
65 size_t cases_written; /* Cases output so far. */
66 size_t cases_analyzed; /* Cases passed to procedure so far. */
69 /* The current active file, from which cases are read. */
70 struct case_source *vfm_source;
72 /* The replacement active file, to which cases are written. */
73 struct case_sink *vfm_sink;
75 /* Nonzero if the case needs to have values deleted before being
76 stored, zero otherwise. */
77 static int compaction_necessary;
79 /* Time at which vfm was last invoked. */
80 time_t last_vfm_invocation;
83 int n_lag; /* Number of cases to lag. */
84 static int lag_count; /* Number of cases in lag_queue so far. */
85 static int lag_head; /* Index where next case will be added. */
86 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
88 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
90 static void create_trns_case (struct ccase *, struct dictionary *);
91 static void open_active_file (void);
92 static int write_case (struct write_case_data *wc_data);
93 static int execute_transformations (struct ccase *c,
94 struct trns_header **trns,
95 int first_idx, int last_idx,
97 static int filter_case (const struct ccase *c, int case_num);
98 static void lag_case (const struct ccase *c);
99 static void compact_case (struct ccase *dest, const struct ccase *src);
100 static void clear_case (struct ccase *c);
101 static void close_active_file (void);
103 /* Public functions. */
105 /* Reads the data from the input program and writes it to a new
106 active file. For each case we read from the input program, we
109 1. Execute permanent transformations. If these drop the case,
110 start the next case from step 1.
112 2. N OF CASES. If we have already written N cases, start the
113 next case from step 1.
115 3. Write case to replacement active file.
117 4. Execute temporary transformations. If these drop the case,
118 start the next case from step 1.
120 5. FILTER, PROCESS IF. If these drop the case, start the next
123 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
124 cases, start the next case from step 1.
126 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
128 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
130 if (proc_func == NULL
131 && case_source_is_class (vfm_source, &storage_source_class)
141 internal_procedure (proc_func, aux);
142 close_active_file ();
145 /* Executes a procedure, as procedure(), except that the caller
146 is responsible for calling open_active_file() and
147 close_active_file(). */
149 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux)
151 static int recursive_call;
153 struct write_case_data wc_data;
155 assert (++recursive_call == 1);
157 wc_data.proc_func = proc_func;
159 create_trns_case (&wc_data.trns_case, default_dict);
160 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
161 wc_data.cases_written = 0;
163 last_vfm_invocation = time (NULL);
165 if (vfm_source != NULL)
166 vfm_source->class->read (vfm_source,
168 write_case, &wc_data);
170 case_destroy (&wc_data.sink_case);
171 case_destroy (&wc_data.trns_case);
173 assert (--recursive_call == 0);
176 /* Creates and returns a case, initializing it from the vectors
177 that say which `value's need to be initialized just once, and
178 which ones need to be re-initialized before every case. */
180 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
182 size_t var_cnt = dict_get_var_cnt (dict);
185 case_create (trns_case, dict_get_next_value_idx (dict));
186 for (i = 0; i < var_cnt; i++)
188 struct variable *v = dict_get_var (dict, i);
189 union value *value = case_data_rw (trns_case, v->fv);
191 if (v->type == NUMERIC)
192 value->f = v->reinit ? 0.0 : SYSMIS;
194 memset (value->s, ' ', v->width);
198 /* Makes all preparations for reading from the data source and writing
201 open_active_file (void)
203 /* Make temp_dict refer to the dictionary right before data
208 temp_dict = default_dict;
211 /* Figure out compaction. */
212 compaction_necessary = (dict_get_next_value_idx (temp_dict)
213 != dict_get_compacted_value_cnt (temp_dict));
216 if (vfm_sink == NULL)
217 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
218 if (vfm_sink->class->open != NULL)
219 vfm_sink->class->open (vfm_sink);
221 /* Allocate memory for lag queue. */
228 lag_queue = xmalloc (n_lag * sizeof *lag_queue);
229 for (i = 0; i < n_lag; i++)
230 case_nullify (&lag_queue[i]);
233 /* Close any unclosed DO IF or LOOP constructs. */
234 discard_ctl_stack ();
237 /* Transforms trns_case and writes it to the replacement active
238 file if advisable. Returns nonzero if more cases can be
239 accepted, zero otherwise. Do not call this function again
240 after it has returned zero once. */
242 write_case (struct write_case_data *wc_data)
244 /* Execute permanent transformations. */
245 if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
246 wc_data->cases_written + 1))
250 if (dict_get_case_limit (default_dict)
251 && wc_data->cases_written >= dict_get_case_limit (default_dict))
253 wc_data->cases_written++;
255 /* Write case to LAG queue. */
257 lag_case (&wc_data->trns_case);
259 /* Write case to replacement active file. */
260 if (vfm_sink->class->write != NULL)
262 if (compaction_necessary)
264 compact_case (&wc_data->sink_case, &wc_data->trns_case);
265 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
268 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
271 /* Execute temporary transformations. */
272 if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
273 wc_data->cases_written))
276 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
277 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
278 || (dict_get_case_limit (temp_dict)
279 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
281 wc_data->cases_analyzed++;
283 /* Pass case to procedure. */
284 if (wc_data->proc_func != NULL)
285 wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
288 clear_case (&wc_data->trns_case);
292 /* Transforms case C using the transformations in TRNS[] with
293 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
294 become case CASE_NUM (1-based) in the output file. Returns
295 zero if the case was filtered out by one of the
296 transformations, nonzero otherwise. */
298 execute_transformations (struct ccase *c,
299 struct trns_header **trns,
300 int first_idx, int last_idx,
305 for (idx = first_idx; idx != last_idx; )
307 int retval = trns[idx]->proc (trns[idx], c, case_num);
326 /* Returns nonzero if case C with case number CASE_NUM should be
327 exclude as specified on FILTER or PROCESS IF, otherwise
330 filter_case (const struct ccase *c, int case_idx)
333 struct variable *filter_var = dict_get_filter (default_dict);
334 if (filter_var != NULL)
336 double f = case_num (c, filter_var->fv);
337 if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
342 if (process_if_expr != NULL
343 && expr_evaluate (process_if_expr, c, case_idx, NULL) != 1.0)
349 /* Add C to the lag queue. */
351 lag_case (const struct ccase *c)
353 if (lag_count < n_lag)
355 case_destroy (&lag_queue[lag_head]);
356 case_clone (&lag_queue[lag_head], c);
357 if (++lag_head >= n_lag)
361 /* Copies case SRC to case DEST, compacting it in the process. */
363 compact_case (struct ccase *dest, const struct ccase *src)
369 assert (compaction_necessary);
371 /* Copy all the variables except scratch variables from SRC to
373 /* FIXME: this should be temp_dict not default_dict I guess. */
374 var_cnt = dict_get_var_cnt (default_dict);
375 for (i = 0; i < var_cnt; i++)
377 struct variable *v = dict_get_var (default_dict, i);
379 if (dict_class_from_id (v->name) == DC_SCRATCH)
382 if (v->type == NUMERIC)
384 case_data_rw (dest, nval)->f = case_num (src, v->fv);
389 int w = DIV_RND_UP (v->width, sizeof (union value));
391 memcpy (case_data_rw (dest, nval), case_str (src, v->fv),
392 w * sizeof (union value));
398 /* Clears the variables in C that need to be cleared between
401 clear_case (struct ccase *c)
403 size_t var_cnt = dict_get_var_cnt (default_dict);
406 for (i = 0; i < var_cnt; i++)
408 struct variable *v = dict_get_var (default_dict, i);
409 if (v->init && v->reinit)
411 if (v->type == NUMERIC)
412 case_data_rw (c, v->fv)->f = SYSMIS;
414 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
419 /* Closes the active file. */
421 close_active_file (void)
423 /* Free memory for lag queue, and turn off lagging. */
428 for (i = 0; i < n_lag; i++)
429 case_destroy (&lag_queue[i]);
434 /* Dictionary from before TEMPORARY becomes permanent.. */
437 dict_destroy (default_dict);
438 default_dict = temp_dict;
442 /* Finish compaction. */
443 if (compaction_necessary)
444 dict_compact_values (default_dict);
446 /* Free data source. */
447 if (vfm_source != NULL)
449 free_case_source (vfm_source);
453 /* Old data sink becomes new data source. */
454 if (vfm_sink->class->make_source != NULL)
455 vfm_source = vfm_sink->class->make_source (vfm_sink);
456 free_case_sink (vfm_sink);
459 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
460 and get rid of all the transformations. */
462 expr_free (process_if_expr);
463 process_if_expr = NULL;
464 if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
465 dict_set_filter (default_dict, NULL);
466 dict_set_case_limit (default_dict, 0);
467 dict_clear_vectors (default_dict);
468 cancel_transformations ();
471 /* Storage case stream. */
473 /* Information about storage sink or source. */
474 struct storage_stream_info
476 struct casefile *casefile; /* Storage. */
479 /* Initializes a storage sink. */
481 storage_sink_open (struct case_sink *sink)
483 struct storage_stream_info *info;
485 sink->aux = info = xmalloc (sizeof *info);
486 info->casefile = casefile_create (sink->value_cnt);
489 /* Destroys storage stream represented by INFO. */
491 destroy_storage_stream_info (struct storage_stream_info *info)
495 casefile_destroy (info->casefile);
500 /* Writes case C to the storage sink SINK. */
502 storage_sink_write (struct case_sink *sink, const struct ccase *c)
504 struct storage_stream_info *info = sink->aux;
506 casefile_append (info->casefile, c);
509 /* Destroys internal data in SINK. */
511 storage_sink_destroy (struct case_sink *sink)
513 destroy_storage_stream_info (sink->aux);
516 /* Closes the sink and returns a storage source to read back the
518 static struct case_source *
519 storage_sink_make_source (struct case_sink *sink)
521 struct case_source *source
522 = create_case_source (&storage_source_class, sink->dict, sink->aux);
528 const struct case_sink_class storage_sink_class =
533 storage_sink_destroy,
534 storage_sink_make_source,
537 /* Storage source. */
539 /* Returns the number of cases that will be read by
540 storage_source_read(). */
542 storage_source_count (const struct case_source *source)
544 struct storage_stream_info *info = source->aux;
546 return casefile_get_case_cnt (info->casefile);
549 /* Reads all cases from the storage source and passes them one by one to
552 storage_source_read (struct case_source *source,
553 struct ccase *output_case,
554 write_case_func *write_case, write_case_data wc_data)
556 struct storage_stream_info *info = source->aux;
557 struct ccase casefile_case;
558 struct casereader *reader;
560 for (reader = casefile_get_reader (info->casefile);
561 casereader_read (reader, &casefile_case);
562 case_destroy (&casefile_case))
564 case_copy (output_case, 0,
566 casefile_get_value_cnt (info->casefile));
567 write_case (wc_data);
569 casereader_destroy (reader);
572 /* Destroys the source's internal data. */
574 storage_source_destroy (struct case_source *source)
576 destroy_storage_stream_info (source->aux);
579 /* Storage source. */
580 const struct case_source_class storage_source_class =
583 storage_source_count,
585 storage_source_destroy,
589 storage_source_get_casefile (struct case_source *source)
591 struct storage_stream_info *info = source->aux;
593 assert (source->class == &storage_source_class);
594 return info->casefile;
598 storage_source_create (struct casefile *cf, const struct dictionary *dict)
600 struct storage_stream_info *info;
602 info = xmalloc (sizeof *info);
605 return create_case_source (&storage_source_class, dict, info);
608 /* Null sink. Used by a few procedures that keep track of output
609 themselves and would throw away anything that the sink
612 const struct case_sink_class null_sink_class =
621 /* Returns a pointer to the lagged case from N_BEFORE cases before the
622 current one, or NULL if there haven't been that many cases yet. */
624 lagged_case (int n_before)
626 assert (n_before <= n_lag);
627 if (n_before <= lag_count)
629 int index = lag_head - n_before;
632 return &lag_queue[index];
638 /* Appends TRNS to t_trns[], the list of all transformations to be
639 performed on data as it is read from the active file. */
641 add_transformation (struct trns_header * trns)
643 if (n_trns >= m_trns)
646 t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
648 t_trns[n_trns] = trns;
649 trns->index = n_trns++;
652 /* Cancels all active transformations, including any transformations
653 created by the input program. */
655 cancel_transformations (void)
658 for (i = 0; i < n_trns; i++)
661 t_trns[i]->free (t_trns[i]);
673 /* Creates a case source with class CLASS and auxiliary data AUX
674 and based on dictionary DICT. */
676 create_case_source (const struct case_source_class *class,
677 const struct dictionary *dict,
680 struct case_source *source = xmalloc (sizeof *source);
681 source->class = class;
682 source->value_cnt = dict_get_next_value_idx (dict);
687 /* Destroys case source SOURCE. It is the caller's responsible to
688 call the source's destroy function, if any. */
690 free_case_source (struct case_source *source)
694 if (source->class->destroy != NULL)
695 source->class->destroy (source);
700 /* Returns nonzero if a case source is "complex". */
702 case_source_is_complex (const struct case_source *source)
704 return source != NULL && (source->class == &input_program_source_class
705 || source->class == &file_type_source_class);
708 /* Returns nonzero if CLASS is the class of SOURCE. */
710 case_source_is_class (const struct case_source *source,
711 const struct case_source_class *class)
713 return source != NULL && source->class == class;
716 /* Creates a case sink with class CLASS and auxiliary data
719 create_case_sink (const struct case_sink_class *class,
720 const struct dictionary *dict,
723 struct case_sink *sink = xmalloc (sizeof *sink);
726 sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
727 sink->value_cnt = dict_get_compacted_value_cnt (dict);
732 /* Destroys case sink SINK. */
734 free_case_sink (struct case_sink *sink)
738 if (sink->class->destroy != NULL)
739 sink->class->destroy (sink);
740 free (sink->idx_to_fv);
745 /* Represents auxiliary data for handling SPLIT FILE. */
746 struct split_aux_data
748 size_t case_count; /* Number of cases so far. */
749 struct ccase prev_case; /* Data in previous case. */
751 /* Functions to call... */
752 void (*begin_func) (void *); /* ...before data. */
753 int (*proc_func) (struct ccase *, void *); /* ...with data. */
754 void (*end_func) (void *); /* ...after data. */
755 void *func_aux; /* Auxiliary data. */
758 static int equal_splits (const struct ccase *, const struct ccase *);
759 static int procedure_with_splits_callback (struct ccase *, void *);
760 static void dump_splits (struct ccase *);
762 /* Like procedure(), but it automatically breaks the case stream
763 into SPLIT FILE break groups. Before each group of cases with
764 identical SPLIT FILE variable values, BEGIN_FUNC is called.
765 Then PROC_FUNC is called with each case in the group.
766 END_FUNC is called when the group is finished. FUNC_AUX is
767 passed to each of the functions as auxiliary data.
769 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
770 and END_FUNC will be called at all.
772 If SPLIT FILE is not in effect, then there is one break group
773 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
774 will be called once. */
776 procedure_with_splits (void (*begin_func) (void *aux),
777 int (*proc_func) (struct ccase *, void *aux),
778 void (*end_func) (void *aux),
781 struct split_aux_data split_aux;
783 split_aux.case_count = 0;
784 case_nullify (&split_aux.prev_case);
785 split_aux.begin_func = begin_func;
786 split_aux.proc_func = proc_func;
787 split_aux.end_func = end_func;
788 split_aux.func_aux = func_aux;
790 procedure (procedure_with_splits_callback, &split_aux);
792 if (split_aux.case_count > 0 && end_func != NULL)
794 case_destroy (&split_aux.prev_case);
797 /* procedure() callback used by procedure_with_splits(). */
799 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
801 struct split_aux_data *split_aux = split_aux_;
803 /* Start a new series if needed. */
804 if (split_aux->case_count == 0
805 || !equal_splits (c, &split_aux->prev_case))
807 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
808 split_aux->end_func (split_aux->func_aux);
811 case_destroy (&split_aux->prev_case);
812 case_clone (&split_aux->prev_case, c);
814 if (split_aux->begin_func != NULL)
815 split_aux->begin_func (split_aux->func_aux);
818 split_aux->case_count++;
819 if (split_aux->proc_func != NULL)
820 return split_aux->proc_func (c, split_aux->func_aux);
825 /* Compares the SPLIT FILE variables in cases A and B and returns
826 nonzero only if they differ. */
828 equal_splits (const struct ccase *a, const struct ccase *b)
830 struct variable *const *split;
834 split = dict_get_split_vars (default_dict);
835 split_cnt = dict_get_split_cnt (default_dict);
836 for (i = 0; i < split_cnt; i++)
838 struct variable *v = split[i];
843 if (case_num (a, v->fv) != case_num (b, v->fv))
847 if (memcmp (case_str (a, v->fv), case_str (b, v->fv), v->width))
858 /* Dumps out the values of all the split variables for the case C. */
860 dump_splits (struct ccase *c)
862 struct variable *const *split;
867 split_cnt = dict_get_split_cnt (default_dict);
871 t = tab_create (3, split_cnt + 1, 0);
872 tab_dim (t, tab_natural_dimensions);
873 tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
874 tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
875 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
876 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
877 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
878 split = dict_get_split_vars (default_dict);
879 for (i = 0; i < split_cnt; i++)
881 struct variable *v = split[i];
885 assert (v->type == NUMERIC || v->type == ALPHA);
886 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
888 data_out (temp_buf, &v->print, case_data (c, v->fv));
890 temp_buf[v->print.w] = 0;
891 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
893 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
895 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
897 tab_flags (t, SOMF_NO_TITLE);
901 /* Represents auxiliary data for handling SPLIT FILE in a
902 multipass procedure. */
903 struct multipass_split_aux_data
905 struct ccase prev_case; /* Data in previous case. */
906 struct casefile *casefile; /* Accumulates data for a split. */
908 /* Function to call with the accumulated data. */
909 void (*split_func) (const struct casefile *, void *);
910 void *func_aux; /* Auxiliary data. */
913 static int multipass_split_callback (struct ccase *c, void *aux_);
914 static void multipass_split_output (struct multipass_split_aux_data *);
917 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
921 struct multipass_split_aux_data aux;
923 assert (split_func != NULL);
927 case_nullify (&aux.prev_case);
929 aux.split_func = split_func;
930 aux.func_aux = func_aux;
932 internal_procedure (multipass_split_callback, &aux);
933 if (aux.casefile != NULL)
934 multipass_split_output (&aux);
935 case_destroy (&aux.prev_case);
937 close_active_file ();
940 /* procedure() callback used by multipass_procedure_with_splits(). */
942 multipass_split_callback (struct ccase *c, void *aux_)
944 struct multipass_split_aux_data *aux = aux_;
946 /* Start a new series if needed. */
947 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
949 /* Pass any cases to split_func. */
950 if (aux->casefile != NULL)
951 multipass_split_output (aux);
953 /* Start a new casefile. */
954 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
956 /* Record split values. */
958 case_destroy (&aux->prev_case);
959 case_clone (&aux->prev_case, c);
962 casefile_append (aux->casefile, c);
968 multipass_split_output (struct multipass_split_aux_data *aux)
970 assert (aux->casefile != NULL);
971 aux->split_func (aux->casefile, aux->func_aux);
972 casefile_destroy (aux->casefile);
973 aux->casefile = NULL;