1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
28 #include <unistd.h> /* Required by SunOS4. */
42 #include "value-labels.h"
45 Virtual File Manager (vfm):
47 vfm is used to process data files. It uses the model that
48 data is read from one stream (the data source), processed,
49 then written to another (the data sink). The data source is
50 then deleted and the data sink becomes the data source for the
53 /* Procedure execution data. */
54 struct write_case_data
56 /* Function to call for each case. */
57 int (*proc_func) (struct ccase *, void *); /* Function. */
58 void *aux; /* Auxiliary data. */
60 struct ccase *trns_case; /* Case used for transformations. */
61 struct ccase *sink_case; /* Case written to sink, if
62 compaction is necessary. */
63 size_t cases_written; /* Cases output so far. */
64 size_t cases_analyzed; /* Cases passed to procedure so far. */
67 /* The current active file, from which cases are read. */
68 struct case_source *vfm_source;
70 /* The replacement active file, to which cases are written. */
71 struct case_sink *vfm_sink;
73 /* Nonzero if the case needs to have values deleted before being
74 stored, zero otherwise. */
75 static int compaction_necessary;
77 /* Time at which vfm was last invoked. */
78 time_t last_vfm_invocation;
81 int n_lag; /* Number of cases to lag. */
82 static int lag_count; /* Number of cases in lag_queue so far. */
83 static int lag_head; /* Index where next case will be added. */
84 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
86 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
88 static struct ccase *create_trns_case (struct dictionary *);
89 static void open_active_file (void);
90 static int write_case (struct write_case_data *wc_data);
91 static int execute_transformations (struct ccase *c,
92 struct trns_header **trns,
93 int first_idx, int last_idx,
95 static int filter_case (const struct ccase *c, int case_num);
96 static void lag_case (const struct ccase *c);
97 static void compact_case (struct ccase *dest, const struct ccase *src);
98 static void clear_case (struct ccase *c);
99 static void close_active_file (void);
101 /* Public functions. */
103 /* Reads the data from the input program and writes it to a new
104 active file. For each case we read from the input program, we
107 1. Execute permanent transformations. If these drop the case,
108 start the next case from step 1.
110 2. N OF CASES. If we have already written N cases, start the
111 next case from step 1.
113 3. Write case to replacement active file.
115 4. Execute temporary transformations. If these drop the case,
116 start the next case from step 1.
118 5. FILTER, PROCESS IF. If these drop the case, start the next
121 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
122 cases, start the next case from step 1.
124 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
126 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
129 internal_procedure (proc_func, aux);
130 close_active_file ();
133 /* Executes a procedure, as procedure(), except that the caller
134 is responsible for calling open_active_file() and
135 close_active_file(). */
137 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux)
139 static int recursive_call;
141 struct write_case_data wc_data;
143 assert (++recursive_call == 1);
145 wc_data.proc_func = proc_func;
147 wc_data.trns_case = create_trns_case (default_dict);
148 wc_data.sink_case = xmalloc (dict_get_case_size (default_dict));
149 wc_data.cases_written = 0;
151 last_vfm_invocation = time (NULL);
153 if (vfm_source != NULL)
154 vfm_source->class->read (vfm_source,
156 write_case, &wc_data);
158 free (wc_data.sink_case);
159 free (wc_data.trns_case);
161 assert (--recursive_call == 0);
164 /* Creates and returns a case, initializing it from the vectors
165 that say which `value's need to be initialized just once, and
166 which ones need to be re-initialized before every case. */
167 static struct ccase *
168 create_trns_case (struct dictionary *dict)
170 struct ccase *c = xmalloc (dict_get_case_size (dict));
171 size_t var_cnt = dict_get_var_cnt (dict);
174 for (i = 0; i < var_cnt; i++)
176 struct variable *v = dict_get_var (dict, i);
178 if (v->type == NUMERIC)
181 c->data[v->fv].f = 0.0;
183 c->data[v->fv].f = SYSMIS;
186 memset (c->data[v->fv].s, ' ', v->width);
191 /* Makes all preparations for reading from the data source and writing
194 open_active_file (void)
196 /* Make temp_dict refer to the dictionary right before data
201 temp_dict = default_dict;
204 /* Figure out compaction. */
205 compaction_necessary = (dict_get_next_value_idx (temp_dict)
206 != dict_get_compacted_value_cnt (temp_dict));
209 if (vfm_sink == NULL)
210 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
211 if (vfm_sink->class->open != NULL)
212 vfm_sink->class->open (vfm_sink);
214 /* Allocate memory for lag queue. */
221 lag_queue = xmalloc (n_lag * sizeof *lag_queue);
222 for (i = 0; i < n_lag; i++)
223 lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
226 /* Close any unclosed DO IF or LOOP constructs. */
227 discard_ctl_stack ();
230 /* Transforms trns_case and writes it to the replacement active
231 file if advisable. Returns nonzero if more cases can be
232 accepted, zero otherwise. Do not call this function again
233 after it has returned zero once. */
235 write_case (struct write_case_data *wc_data)
237 /* Execute permanent transformations. */
238 if (!execute_transformations (wc_data->trns_case, t_trns, f_trns, temp_trns,
239 wc_data->cases_written + 1))
243 if (dict_get_case_limit (default_dict)
244 && wc_data->cases_written >= dict_get_case_limit (default_dict))
246 wc_data->cases_written++;
248 /* Write case to LAG queue. */
250 lag_case (wc_data->trns_case);
252 /* Write case to replacement active file. */
253 if (vfm_sink->class->write != NULL)
255 if (compaction_necessary)
257 compact_case (wc_data->sink_case, wc_data->trns_case);
258 vfm_sink->class->write (vfm_sink, wc_data->sink_case);
261 vfm_sink->class->write (vfm_sink, wc_data->trns_case);
264 /* Execute temporary transformations. */
265 if (!execute_transformations (wc_data->trns_case, t_trns, temp_trns, n_trns,
266 wc_data->cases_written))
269 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
270 if (filter_case (wc_data->trns_case, wc_data->cases_written)
271 || (dict_get_case_limit (temp_dict)
272 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
274 wc_data->cases_analyzed++;
276 /* Pass case to procedure. */
277 if (wc_data->proc_func != NULL)
278 wc_data->proc_func (wc_data->trns_case, wc_data->aux);
281 clear_case (wc_data->trns_case);
285 /* Transforms case C using the transformations in TRNS[] with
286 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
287 become case CASE_NUM (1-based) in the output file. Returns
288 zero if the case was filtered out by one of the
289 transformations, nonzero otherwise. */
291 execute_transformations (struct ccase *c,
292 struct trns_header **trns,
293 int first_idx, int last_idx,
298 for (idx = first_idx; idx != last_idx; )
300 int retval = trns[idx]->proc (trns[idx], c, case_num);
319 /* Returns nonzero if case C with case number CASE_NUM should be
320 excluded as specified on FILTER or PROCESS IF, otherwise
323 filter_case (const struct ccase *c, int case_num)
326 struct variable *filter_var = dict_get_filter (default_dict);
327 if (filter_var != NULL)
329 double f = c->data[filter_var->fv].f;
330 if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
335 if (process_if_expr != NULL
336 && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
342 /* Add C to the lag queue. */
344 lag_case (const struct ccase *c)
346 if (lag_count < n_lag)
348 memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
349 if (++lag_head >= n_lag)
353 /* Copies case SRC to case DEST, compacting it in the process. */
355 compact_case (struct ccase *dest, const struct ccase *src)
361 assert (compaction_necessary);
363 /* Copy all the variables except scratch variables from SRC to
365 /* FIXME: this should be temp_dict not default_dict I guess. */
366 var_cnt = dict_get_var_cnt (default_dict);
367 for (i = 0; i < var_cnt; i++)
369 struct variable *v = dict_get_var (default_dict, i);
371 if (dict_class_from_id (v->name) == DC_SCRATCH)
374 if (v->type == NUMERIC)
375 dest->data[nval++] = src->data[v->fv];
378 int w = DIV_RND_UP (v->width, sizeof (union value));
380 memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
386 /* Clears the variables in C that need to be cleared between
389 clear_case (struct ccase *c)
391 size_t var_cnt = dict_get_var_cnt (default_dict);
394 for (i = 0; i < var_cnt; i++)
396 struct variable *v = dict_get_var (default_dict, i);
397 if (v->init && v->reinit)
399 if (v->type == NUMERIC)
400 c->data[v->fv].f = SYSMIS;
402 memset (c->data[v->fv].s, ' ', v->width);
407 /* Closes the active file. */
409 close_active_file (void)
411 /* Free memory for lag queue, and turn off lagging. */
416 for (i = 0; i < n_lag; i++)
422 /* Dictionary from before TEMPORARY becomes permanent. */
425 dict_destroy (default_dict);
426 default_dict = temp_dict;
430 /* Finish compaction. */
431 if (compaction_necessary)
432 dict_compact_values (default_dict);
434 /* Free data source. */
435 if (vfm_source != NULL)
437 if (vfm_source->class->destroy != NULL)
438 vfm_source->class->destroy (vfm_source);
442 /* Old data sink becomes new data source. */
443 if (vfm_sink->class->make_source != NULL)
444 vfm_source = vfm_sink->class->make_source (vfm_sink);
447 if (vfm_sink->class->destroy != NULL)
448 vfm_sink->class->destroy (vfm_sink);
451 free_case_sink (vfm_sink);
454 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
455 and get rid of all the transformations. */
457 expr_free (process_if_expr);
458 process_if_expr = NULL;
459 if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
460 dict_set_filter (default_dict, NULL);
461 dict_set_case_limit (default_dict, 0);
462 dict_clear_vectors (default_dict);
463 cancel_transformations ();
466 /* Storage case stream. */
468 /* Information about storage sink or source. */
469 struct storage_stream_info
471 struct casefile *casefile; /* Storage. */
474 /* Initializes a storage sink. */
476 storage_sink_open (struct case_sink *sink)
478 struct storage_stream_info *info;
480 sink->aux = info = xmalloc (sizeof *info);
481 info->casefile = casefile_create (sink->value_cnt * sizeof (union value));
484 /* Destroys storage stream represented by INFO. */
486 destroy_storage_stream_info (struct storage_stream_info *info)
488 casefile_destroy (info->casefile);
492 /* Writes case C to the storage sink SINK. */
494 storage_sink_write (struct case_sink *sink, const struct ccase *c)
496 struct storage_stream_info *info = sink->aux;
498 casefile_append (info->casefile, c);
501 /* Destroys internal data in SINK. */
503 storage_sink_destroy (struct case_sink *sink)
505 destroy_storage_stream_info (sink->aux);
508 /* Closes and destroys the sink and returns a storage source to
509 read back the written data. */
510 static struct case_source *
511 storage_sink_make_source (struct case_sink *sink)
513 return create_case_source (&storage_source_class, sink->dict, sink->aux);
517 const struct case_sink_class storage_sink_class =
522 storage_sink_destroy,
523 storage_sink_make_source,
526 /* Storage source. */
528 /* Returns the number of cases that will be read by
529 storage_source_read(). */
531 storage_source_count (const struct case_source *source)
533 struct storage_stream_info *info = source->aux;
535 return casefile_get_case_cnt (info->casefile);
538 /* Reads all cases from the storage source and passes them one by one to
541 storage_source_read (struct case_source *source,
542 struct ccase *output_case,
543 write_case_func *write_case, write_case_data wc_data)
545 struct storage_stream_info *info = source->aux;
546 const struct ccase *casefile_case;
547 struct casereader *reader;
549 reader = casefile_get_reader (info->casefile);
550 while (casereader_read (reader, &casefile_case))
552 memcpy (output_case, casefile_case,
553 casefile_get_case_size (info->casefile));
554 write_case (wc_data);
556 casereader_destroy (reader);
559 /* Destroys the source's internal data. */
561 storage_source_destroy (struct case_source *source)
563 destroy_storage_stream_info (source->aux);
566 /* Storage source. */
567 const struct case_source_class storage_source_class =
570 storage_source_count,
572 storage_source_destroy,
576 storage_source_get_casefile (struct case_source *source)
578 struct storage_stream_info *info = source->aux;
580 assert (source->class == &storage_source_class);
581 return info->casefile;
584 /* Null sink. Used by a few procedures that keep track of output
585 themselves and would throw away anything that the sink
588 const struct case_sink_class null_sink_class =
597 /* Returns a pointer to the lagged case from N_BEFORE cases before the
598 current one, or NULL if there haven't been that many cases yet. */
600 lagged_case (int n_before)
602 assert (n_before <= n_lag);
603 if (n_before > lag_count)
607 int index = lag_head - n_before;
610 return lag_queue[index];
614 /* Appends TRNS to t_trns[], the list of all transformations to be
615 performed on data as it is read from the active file. */
617 add_transformation (struct trns_header * trns)
619 if (n_trns >= m_trns)
622 t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
624 t_trns[n_trns] = trns;
625 trns->index = n_trns++;
628 /* Cancels all active transformations, including any transformations
629 created by the input program. */
631 cancel_transformations (void)
634 for (i = 0; i < n_trns; i++)
637 t_trns[i]->free (t_trns[i]);
648 /* Creates a case source with class CLASS and auxiliary data AUX
649 and based on dictionary DICT. */
651 create_case_source (const struct case_source_class *class,
652 const struct dictionary *dict,
655 struct case_source *source = xmalloc (sizeof *source);
656 source->class = class;
657 source->value_cnt = dict_get_next_value_idx (dict);
662 /* Returns nonzero if a case source is "complex". */
664 case_source_is_complex (const struct case_source *source)
666 return source != NULL && (source->class == &input_program_source_class
667 || source->class == &file_type_source_class);
670 /* Returns nonzero if CLASS is the class of SOURCE. */
672 case_source_is_class (const struct case_source *source,
673 const struct case_source_class *class)
675 return source != NULL && source->class == class;
678 /* Creates a case sink with class CLASS and auxiliary data
681 create_case_sink (const struct case_sink_class *class,
682 const struct dictionary *dict,
685 struct case_sink *sink = xmalloc (sizeof *sink);
688 sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
689 sink->value_cnt = dict_get_compacted_value_cnt (dict);
694 /* Destroys case sink SINK. It is the caller's responsibility to
695 call the sink's destroy function, if any. */
697 free_case_sink (struct case_sink *sink)
699 free (sink->idx_to_fv);
703 /* Represents auxiliary data for handling SPLIT FILE. */
704 struct split_aux_data
706 size_t case_count; /* Number of cases so far. */
707 struct ccase *prev_case; /* Data in previous case. */
709 /* Functions to call... */
710 void (*begin_func) (void *); /* ...before data. */
711 int (*proc_func) (struct ccase *, void *); /* ...with data. */
712 void (*end_func) (void *); /* ...after data. */
713 void *func_aux; /* Auxiliary data. */
716 static int equal_splits (const struct ccase *, const struct ccase *);
717 static int procedure_with_splits_callback (struct ccase *, void *);
718 static void dump_splits (struct ccase *);
720 /* Like procedure(), but it automatically breaks the case stream
721 into SPLIT FILE break groups. Before each group of cases with
722 identical SPLIT FILE variable values, BEGIN_FUNC is called.
723 Then PROC_FUNC is called with each case in the group.
724 END_FUNC is called when the group is finished. FUNC_AUX is
725 passed to each of the functions as auxiliary data.
727 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
728 and END_FUNC will be called at all.
730 If SPLIT FILE is not in effect, then there is one break group
731 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
732 will be called once. */
734 procedure_with_splits (void (*begin_func) (void *aux),
735 int (*proc_func) (struct ccase *, void *aux),
736 void (*end_func) (void *aux),
739 struct split_aux_data split_aux;
741 split_aux.case_count = 0;
742 split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
743 split_aux.begin_func = begin_func;
744 split_aux.proc_func = proc_func;
745 split_aux.end_func = end_func;
746 split_aux.func_aux = func_aux;
748 procedure (procedure_with_splits_callback, &split_aux);
750 if (split_aux.case_count > 0 && end_func != NULL)
752 free (split_aux.prev_case);
755 /* procedure() callback used by procedure_with_splits(). */
757 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
759 struct split_aux_data *split_aux = split_aux_;
761 /* Start a new series if needed. */
762 if (split_aux->case_count == 0
763 || !equal_splits (c, split_aux->prev_case))
765 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
766 split_aux->end_func (split_aux->func_aux);
769 memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
771 if (split_aux->begin_func != NULL)
772 split_aux->begin_func (split_aux->func_aux);
775 split_aux->case_count++;
776 if (split_aux->proc_func != NULL)
777 return split_aux->proc_func (c, split_aux->func_aux);
782 /* Compares the SPLIT FILE variables in cases A and B and returns
783 nonzero only if they are all equal. */
785 equal_splits (const struct ccase *a, const struct ccase *b)
787 struct variable *const *split;
791 split = dict_get_split_vars (default_dict);
792 split_cnt = dict_get_split_cnt (default_dict);
793 for (i = 0; i < split_cnt; i++)
795 struct variable *v = split[i];
800 if (a->data[v->fv].f != b->data[v->fv].f)
804 if (memcmp (a->data[v->fv].s, b->data[v->fv].s, v->width))
815 /* Dumps out the values of all the split variables for the case C. */
817 dump_splits (struct ccase *c)
819 struct variable *const *split;
824 split_cnt = dict_get_split_cnt (default_dict);
828 t = tab_create (3, split_cnt + 1, 0);
829 tab_dim (t, tab_natural_dimensions);
830 tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
831 tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
832 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
833 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
834 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
835 split = dict_get_split_vars (default_dict);
836 for (i = 0; i < split_cnt; i++)
838 struct variable *v = split[i];
842 assert (v->type == NUMERIC || v->type == ALPHA);
843 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
845 data_out (temp_buf, &v->print, &c->data[v->fv]);
847 temp_buf[v->print.w] = 0;
848 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
850 val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
852 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
854 tab_flags (t, SOMF_NO_TITLE);
858 /* Represents auxiliary data for handling SPLIT FILE in a
859 multipass procedure. */
860 struct multipass_split_aux_data
862 struct ccase *prev_case; /* Data in previous case. */
863 struct casefile *casefile; /* Accumulates data for a split. */
865 /* Function to call with the accumulated data. */
866 void (*split_func) (const struct casefile *, void *);
867 void *func_aux; /* Auxiliary data. */
870 static int multipass_split_callback (struct ccase *c, void *aux_);
871 static void multipass_split_output (struct multipass_split_aux_data *);
874 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
878 struct multipass_split_aux_data aux;
880 assert (split_func != NULL);
884 aux.prev_case = xmalloc (dict_get_case_size (default_dict));
886 aux.split_func = split_func;
887 aux.func_aux = func_aux;
889 internal_procedure (multipass_split_callback, &aux);
890 if (aux.casefile != NULL)
891 multipass_split_output (&aux);
892 free (aux.prev_case);
894 close_active_file ();
897 /* procedure() callback used by multipass_procedure_with_splits(). */
899 multipass_split_callback (struct ccase *c, void *aux_)
901 struct multipass_split_aux_data *aux = aux_;
903 /* Start a new series if needed. */
904 if (aux->casefile == NULL || !equal_splits (c, aux->prev_case))
906 /* Pass any cases to split_func. */
907 if (aux->casefile != NULL)
908 multipass_split_output (aux);
910 /* Start a new casefile. */
911 aux->casefile = casefile_create (dict_get_case_size (default_dict));
913 /* Record split values. */
915 memcpy (aux->prev_case, c, dict_get_case_size (default_dict));
918 casefile_append (aux->casefile, c);
924 multipass_split_output (struct multipass_split_aux_data *aux)
926 assert (aux->casefile != NULL);
927 aux->split_func (aux->casefile, aux->func_aux);
928 casefile_destroy (aux->casefile);
929 aux->casefile = NULL;