1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
28 #include <unistd.h> /* Required by SunOS4. */
34 #include "dictionary.h"
35 #include "ctl-stack.h"
37 #include "expressions/public.h"
38 #include "file-handle-def.h"
45 #include "value-labels.h"
48 #define _(msgid) gettext (msgid)
51 Virtual File Manager (vfm):
53 vfm is used to process data files. It uses the model that
54 data is read from one stream (the data source), processed,
55 then written to another (the data sink). The data source is
56 then deleted and the data sink becomes the data source for the
59 /* Procedure execution data. */
/* State filled in by internal_procedure() and consumed by write_case()
   once per case. */
60 struct write_case_data
62 /* Function to call for each case. */
63 int (*proc_func) (struct ccase *, void *); /* Function; write_case() skips the call when this is null. */
64 void *aux; /* Auxiliary data, passed as proc_func's second argument. */
66 struct ccase trns_case; /* Case used for transformations. */
67 struct ccase sink_case; /* Case written to sink, if
68 compaction is necessary. */
69 size_t cases_written; /* Cases output so far; write_case() checks it against default_dict's case limit. */
70 size_t cases_analyzed; /* Cases passed to procedure so far; write_case() checks it against temp_dict's case limit. */
73 /* The current active file, from which cases are read. */
74 struct case_source *vfm_source;
76 /* The replacement active file, to which cases are written. */
77 struct case_sink *vfm_sink;
79 /* The compactor used to compact a case, if necessary;
80 otherwise a null pointer. */
81 static struct dict_compactor *compactor;
83 /* Time at which vfm was last invoked. */
84 static time_t last_vfm_invocation;
87 int n_lag; /* Number of cases to lag. */
88 static int lag_count; /* Number of cases in lag_queue so far. */
89 static int lag_head; /* Index where next case will be added. */
90 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
92 static void internal_procedure (int (*proc_func) (struct ccase *, void *),
94 static void update_last_vfm_invocation (void);
95 static void create_trns_case (struct ccase *, struct dictionary *);
96 static void open_active_file (void);
97 static int write_case (struct write_case_data *wc_data);
98 static int execute_transformations (struct ccase *c,
99 struct transformation *trns,
100 int first_idx, int last_idx,
102 static int filter_case (const struct ccase *c, int case_num);
103 static void lag_case (const struct ccase *c);
104 static void clear_case (struct ccase *c);
105 static void close_active_file (void);
107 /* Public functions. */
109 /* Returns the time most recently stored by
   update_last_vfm_invocation().  If nothing has been stored yet (the
   value is still 0), the current time is recorded first and that
   value is returned. */
111 vfm_last_invocation (void)
113 if (last_vfm_invocation == 0)
114 update_last_vfm_invocation ();
115 return last_vfm_invocation;
118 /* Reads the data from the input program and writes it to a new
119 active file. For each case we read from the input program, we
122 1. Execute permanent transformations. If these drop the case,
123 start the next case from step 1.
125 2. N OF CASES. If we have already written N cases, start the
126 next case from step 1.
128 3. Write case to replacement active file.
130 4. Execute temporary transformations. If these drop the case,
131 start the next case from step 1.
133 5. FILTER, PROCESS IF. If these drop the case, start the next
136 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
137 cases, start the next case from step 1.
139 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
141 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
/* Special case: no per-case callback and the current source is of the
   storage class.  NOTE(review): confirm the remaining conditions and
   exactly which statements this test guards. */
143 if (proc_func == NULL
144 && case_source_is_class (vfm_source, &storage_source_class)
150 update_last_vfm_invocation ();
/* internal_procedure() has the source feed each case to write_case();
   close_active_file() then turns the sink into the new source. */
155 internal_procedure (proc_func, aux);
156 close_active_file ();
159 /* Executes a procedure, as procedure(), except that the caller
160 is responsible for calling open_active_file() and
161 close_active_file(). */
163 internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux)
165 static int recursive_call;
167 struct write_case_data wc_data;
/* Trap nested invocations.  NOTE(review): the counter is modified
   inside assert(), so both the increment here and the decrement at the
   end vanish when NDEBUG is defined; the counter is not read anywhere
   else in this function. */
169 assert (++recursive_call == 1);
171 wc_data.proc_func = proc_func;
/* Both working cases are created with
   dict_get_next_value_idx (default_dict) values. */
173 create_trns_case (&wc_data.trns_case, default_dict);
174 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
175 wc_data.cases_written = 0;
177 update_last_vfm_invocation ();
/* With no source there is nothing to read; otherwise the source's read
   function is given write_case and &wc_data to call back with. */
179 if (vfm_source != NULL)
180 vfm_source->class->read (vfm_source,
182 write_case, &wc_data);
/* Release both working cases created above. */
184 case_destroy (&wc_data.sink_case);
185 case_destroy (&wc_data.trns_case);
187 assert (--recursive_call == 0);
190 /* Updates last_vfm_invocation to the current time, as returned by
   time (NULL). */
192 update_last_vfm_invocation (void)
194 last_vfm_invocation = time (NULL);
197 /* Creates TRNS_CASE with dict_get_next_value_idx (DICT) values and
   stores a starting value for every variable in DICT.  A numeric
   variable gets 0.0 when its reinit flag is set and SYSMIS otherwise.
   The memset() pads a value with spaces over the variable's width
   (presumably the branch for string variables -- confirm).  Nothing is
   returned; the result is left in TRNS_CASE. */
201 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
203 size_t var_cnt = dict_get_var_cnt (dict);
206 case_create (trns_case, dict_get_next_value_idx (dict));
207 for (i = 0; i < var_cnt; i++)
209 struct variable *v = dict_get_var (dict, i);
210 union value *value = case_data_rw (trns_case, v->fv);
212 if (v->type == NUMERIC)
213 value->f = v->reinit ? 0.0 : SYSMIS;
215 memset (value->s, ' ', v->width);
219 /* Makes all preparations for reading from the data source and writing
222 open_active_file (void)
224 /* Make temp_dict refer to the dictionary right before data
229 temp_dict = default_dict;
232 /* Figure out compaction. */
/* The compactor chosen here is the one close_active_file() later
   destroys when it is non-null. */
233 compactor = (dict_needs_compaction (temp_dict)
234 ? dict_make_compactor (temp_dict)
/* Fall back to a storage-class sink when none has been set up, then
   let the sink open itself if its class provides an open function. */
238 if (vfm_sink == NULL)
239 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
240 if (vfm_sink->class->open != NULL)
241 vfm_sink->class->open (vfm_sink);
243 /* Allocate memory for lag queue. */
/* One slot per lagged case; every slot is nullified up front
   (presumably because lag_case() destroys a slot before filling it). */
250 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
251 for (i = 0; i < n_lag; i++)
252 case_nullify (&lag_queue[i]);
255 /* Close any unclosed DO IF or LOOP constructs. */
259 /* Transforms trns_case and writes it to the replacement active
260 file if advisable. Returns nonzero if more cases can be
261 accepted, zero otherwise. Do not call this function again
262 after it has returned zero once. */
264 write_case (struct write_case_data *wc_data)
266 /* Execute permanent transformations. */
/* Runs t_trns[f_trns] up to, but not including, t_trns[temp_trns]; the
   case would become number cases_written + 1 in the output. */
267 if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
268 wc_data->cases_written + 1))
/* N OF CASES: a case limit of 0 disables this test. */
272 if (dict_get_case_limit (default_dict)
273 && wc_data->cases_written >= dict_get_case_limit (default_dict))
275 wc_data->cases_written++;
277 /* Write case to LAG queue. */
279 lag_case (&wc_data->trns_case);
281 /* Write case to replacement active file. */
282 if (vfm_sink->class->write != NULL)
/* When a compactor exists, the case is first compacted from trns_case
   into sink_case, and sink_case is what gets written. */
284 if (compactor != NULL)
286 dict_compactor_compact (compactor, &wc_data->sink_case,
287 &wc_data->trns_case);
288 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
291 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
294 /* Execute temporary transformations. */
/* Runs t_trns[temp_trns] up to, but not including, t_trns[n_trns]. */
295 if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
296 wc_data->cases_written))
299 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
300 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
301 || (dict_get_case_limit (temp_dict)
302 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
304 wc_data->cases_analyzed++;
306 /* Pass case to procedure. */
/* NOTE(review): proc_func returns int, but the value is discarded
   here -- confirm that is intended. */
307 if (wc_data->proc_func != NULL)
308 wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
/* Reset the per-case variables in trns_case before the next case. */
311 clear_case (&wc_data->trns_case);
315 /* Transforms case C using the transformations in TRNS[] with
316 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
317 become case CASE_NUM (1-based) in the output file. Returns
318 zero if the case was filtered out by one of the
319 transformations, nonzero otherwise. */
321 execute_transformations (struct ccase *c,
322 struct transformation *trns,
323 int first_idx, int last_idx,
328 for (idx = first_idx; idx != last_idx; )
/* The for statement has no increment clause, so the loop body is
   responsible for choosing the next idx.  NOTE(review): confirm how
   retval steers idx against the complete loop body. */
330 struct transformation *t = &trns[idx];
331 int retval = t->proc (t->private, c, case_num);
350 /* Returns nonzero if case C with case number CASE_NUM should be
351 excluded as specified on FILTER or PROCESS IF, otherwise
354 filter_case (const struct ccase *c, int case_idx)
/* FILTER: when a filter variable is set, a value of 0.0, or one that
   mv_is_num_missing() reports as missing for that variable, satisfies
   the test below. */
357 struct variable *filter_var = dict_get_filter (default_dict);
358 if (filter_var != NULL)
360 double f = case_num (c, filter_var->fv);
361 if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
/* PROCESS IF: when an expression is set, any result other than exactly
   1.0 satisfies the test below. */
366 if (process_if_expr != NULL
367 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
373 /* Add C to the lag queue: the slot at lag_head is destroyed and then
   filled with a clone of C, after which lag_head is advanced and
   compared against n_lag.  NOTE(review): confirm the statements
   guarded by the two n_lag tests (counting and wrap-around). */
375 lag_case (const struct ccase *c)
377 if (lag_count < n_lag)
379 case_destroy (&lag_queue[lag_head]);
380 case_clone (&lag_queue[lag_head], c);
381 if (++lag_head >= n_lag)
385 /* Clears the variables in C that need to be cleared between
388 clear_case (struct ccase *c)
390 size_t var_cnt = dict_get_var_cnt (default_dict);
393 for (i = 0; i < var_cnt; i++)
395 struct variable *v = dict_get_var (default_dict, i);
/* Only variables with both the init and reinit flags set are reset:
   numeric values become SYSMIS; the memset() pads with spaces over the
   variable's width (presumably the string branch -- confirm). */
396 if (v->init && v->reinit)
398 if (v->type == NUMERIC)
399 case_data_rw (c, v->fv)->f = SYSMIS;
401 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
406 /* Closes the active file. */
408 close_active_file (void)
410 /* Free memory for lag queue, and turn off lagging. */
415 for (i = 0; i < n_lag; i++)
416 case_destroy (&lag_queue[i]);
421 /* Dictionary from before TEMPORARY becomes permanent. */
424 dict_destroy (default_dict);
425 default_dict = temp_dict;
429 /* Finish compaction. */
430 if (compactor != NULL)
432 dict_compactor_destroy (compactor);
433 dict_compact_values (default_dict);
436 /* Free data source. */
437 free_case_source (vfm_source);
440 /* Old data sink becomes new data source. */
/* NOTE(review): when the sink class has no make_source function, confirm
   what vfm_source is left pointing at, since the old source was freed
   just above. */
441 if (vfm_sink->class->make_source != NULL)
442 vfm_source = vfm_sink->class->make_source (vfm_sink);
443 free_case_sink (vfm_sink);
446 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
447 and get rid of all the transformations. */
449 expr_free (process_if_expr);
450 process_if_expr = NULL;
/* The filter variable is kept only when FILTER_before_TEMPORARY is set. */
451 if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
452 dict_set_filter (default_dict, NULL);
/* A case limit of 0 turns N OF CASES off. */
453 dict_set_case_limit (default_dict, 0);
454 dict_clear_vectors (default_dict);
455 cancel_transformations ();
458 /* Storage case stream. */
460 /* Information about storage sink or source.  One object can serve
   both roles in turn: storage_sink_make_source() passes the sink's aux
   pointer to the source it creates. */
461 struct storage_stream_info
463 struct casefile *casefile; /* Storage. */
466 /* Initializes a storage sink: allocates a storage_stream_info, stores
   it in SINK->aux, and gives it a casefile made with
   casefile_create (SINK->value_cnt). */
468 storage_sink_open (struct case_sink *sink)
470 struct storage_stream_info *info;
472 sink->aux = info = xmalloc (sizeof *info);
473 info->casefile = casefile_create (sink->value_cnt);
476 /* Destroys storage stream represented by INFO, including the casefile
   it holds.  NOTE(review): confirm whether a null INFO is tolerated. */
478 destroy_storage_stream_info (struct storage_stream_info *info)
482 casefile_destroy (info->casefile);
487 /* Writes case C to the storage sink SINK by appending it to the
   casefile held in SINK->aux. */
489 storage_sink_write (struct case_sink *sink, const struct ccase *c)
491 struct storage_stream_info *info = sink->aux;
493 casefile_append (info->casefile, c);
496 /* Destroys internal data in SINK by passing SINK->aux to
   destroy_storage_stream_info(). */
498 storage_sink_destroy (struct case_sink *sink)
500 destroy_storage_stream_info (sink->aux);
503 /* Closes the sink and returns a storage source to read back the
505 static struct case_source *
506 storage_sink_make_source (struct case_sink *sink)
/* The new storage-class source is created with sink->aux as its
   auxiliary data, i.e. the sink's own storage_stream_info. */
508 struct case_source *source
509 = create_case_source (&storage_source_class, sink->aux);
515 const struct case_sink_class storage_sink_class =
520 storage_sink_destroy,
521 storage_sink_make_source,
524 /* Storage source. */
526 /* Returns the number of cases that will be read by
527 storage_source_read(), taken from casefile_get_case_cnt() on the
   casefile held in SOURCE->aux. */
529 storage_source_count (const struct case_source *source)
531 struct storage_stream_info *info = source->aux;
533 return casefile_get_case_cnt (info->casefile);
536 /* Reads all cases from the storage source and passes them one by one to
539 storage_source_read (struct case_source *source,
540 struct ccase *output_case,
541 write_case_func *write_case, write_case_data wc_data)
543 struct storage_stream_info *info = source->aux;
544 struct ccase casefile_case;
545 struct casereader *reader;
/* The loop's third clause destroys casefile_case after each iteration,
   i.e. after its values were copied into OUTPUT_CASE and WRITE_CASE
   was called. */
547 for (reader = casefile_get_reader (info->casefile);
548 casereader_read (reader, &casefile_case);
549 case_destroy (&casefile_case))
551 case_copy (output_case, 0,
553 casefile_get_value_cnt (info->casefile));
/* NOTE(review): the callback's return value is ignored, so reading
   continues even if the callback signals that it accepts no more
   cases -- confirm this is intended. */
554 write_case (wc_data);
556 casereader_destroy (reader);
559 /* Destroys the source's internal data by passing SOURCE->aux to
   destroy_storage_stream_info(). */
561 storage_source_destroy (struct case_source *source)
563 destroy_storage_stream_info (source->aux);
566 /* Storage source. */
567 const struct case_source_class storage_source_class =
570 storage_source_count,
572 storage_source_destroy,
/* Returns the casefile held by SOURCE, which must be of class
   storage_source_class (asserted). */
576 storage_source_get_casefile (struct case_source *source)
578 struct storage_stream_info *info = source->aux;
580 assert (source->class == &storage_source_class);
581 return info->casefile;
/* Creates and returns a storage-class case source whose auxiliary data
   is a newly allocated storage_stream_info.  NOTE(review): CF is
   presumably stored in that structure -- confirm. */
585 storage_source_create (struct casefile *cf)
587 struct storage_stream_info *info;
589 info = xmalloc (sizeof *info);
592 return create_case_source (&storage_source_class, info);
595 /* Null sink. Used by a few procedures that keep track of output
596 themselves and would throw away anything that the sink
599 const struct case_sink_class null_sink_class =
608 /* Returns a pointer to the lagged case from N_BEFORE cases before the
609 current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must lie between 1 and n_lag inclusive (asserted). */
611 lagged_case (int n_before)
613 assert (n_before >= 1 );
614 assert (n_before <= n_lag);
616 if (n_before <= lag_count)
/* lag_head is the slot the next case will use, so earlier cases are
   found by counting back from it.  NOTE(review): the subtraction can
   go negative; confirm the wrap-around adjustment made before the
   array access. */
618 int index = lag_head - n_before;
621 return &lag_queue[index];
627 /* Appends TRNS to t_trns[], the list of all transformations to be
628 performed on data as it is read from the active file.  PRIVATE is
   stored in the new entry's private member. */
630 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
632 struct transformation *trns;
/* Grow the array through x2nrealloc() once n_trns reaches the current
   capacity m_trns. */
633 if (n_trns >= m_trns)
634 t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
635 trns = &t_trns[n_trns++];
638 trns->private = private;
641 /* Returns the index number that the next transformation added by
642 add_transformation() will receive. A trns_proc_func that
643 returns this index causes control flow to jump to it. */
645 next_transformation (void)
650 /* Cancels all active transformations, including any transformations
651 created by the input program.  Each entry's free function is handed
   that entry's private data. */
653 cancel_transformations (void)
656 for (i = 0; i < n_trns; i++)
658 struct transformation *t = &t_trns[i];
/* NOTE(review): confirm that t->free is checked for null before this
   call. */
660 t->free (t->private);
668 /* Creates a case source with class CLASS and auxiliary data AUX.
   The callers in this file pass exactly those two arguments; no
   dictionary is involved. */
671 create_case_source (const struct case_source_class *class,
674 struct case_source *source = xmalloc (sizeof *source);
675 source->class = class;
680 /* Destroys case source SOURCE.  The class's destroy function, if
   any, is called here, so callers need not call it themselves. */
683 free_case_source (struct case_source *source)
687 if (source->class->destroy != NULL)
688 source->class->destroy (source);
693 /* Returns nonzero if a case source is "complex", meaning its class
   is input_program_source_class or file_type_source_class.  A null
   SOURCE yields zero. */
695 case_source_is_complex (const struct case_source *source)
697 return source != NULL && (source->class == &input_program_source_class
698 || source->class == &file_type_source_class);
701 /* Returns nonzero if CLASS is the class of SOURCE.  A null SOURCE
   yields zero. */
703 case_source_is_class (const struct case_source *source,
704 const struct case_source_class *class)
706 return source != NULL && source->class == class;
709 /* Creates a case sink to accept cases from the given DICT with
710 class CLASS and auxiliary data AUX.  The sink's value_cnt is taken
   from dict_get_compacted_value_cnt (DICT). */
712 create_case_sink (const struct case_sink_class *class,
713 const struct dictionary *dict,
716 struct case_sink *sink = xmalloc (sizeof *sink);
718 sink->value_cnt = dict_get_compacted_value_cnt (dict);
723 /* Destroys case sink SINK, calling the class's destroy function
   first when one is provided. */
725 free_case_sink (struct case_sink *sink)
729 if (sink->class->destroy != NULL)
730 sink->class->destroy (sink);
735 /* Represents auxiliary data for handling SPLIT FILE.  Set up by
   procedure_with_splits() and updated by
   procedure_with_splits_callback(). */
736 struct split_aux_data
738 size_t case_count; /* Number of cases so far; 0 means no group has started. */
739 struct ccase prev_case; /* Data in previous case. */
741 /* Functions to call... */
742 void (*begin_func) (void *); /* ...before data. */
743 int (*proc_func) (struct ccase *, void *); /* ...with data. */
744 void (*end_func) (void *); /* ...after data. */
745 void *func_aux; /* Auxiliary data. */
748 static int equal_splits (const struct ccase *, const struct ccase *);
749 static int procedure_with_splits_callback (struct ccase *, void *);
750 static void dump_splits (struct ccase *);
752 /* Like procedure(), but it automatically breaks the case stream
753 into SPLIT FILE break groups. Before each group of cases with
754 identical SPLIT FILE variable values, BEGIN_FUNC is called.
755 Then PROC_FUNC is called with each case in the group.
756 END_FUNC is called when the group is finished. FUNC_AUX is
757 passed to each of the functions as auxiliary data.
759 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
760 and END_FUNC will be called at all.
762 If SPLIT FILE is not in effect, then there is one break group
763 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
764 will be called once. */
766 procedure_with_splits (void (*begin_func) (void *aux),
767 int (*proc_func) (struct ccase *, void *aux),
768 void (*end_func) (void *aux),
771 struct split_aux_data split_aux;
/* Start with no cases seen and a null "previous case". */
773 split_aux.case_count = 0;
774 case_nullify (&split_aux.prev_case);
775 split_aux.begin_func = begin_func;
776 split_aux.proc_func = proc_func;
777 split_aux.end_func = end_func;
778 split_aux.func_aux = func_aux;
781 internal_procedure (procedure_with_splits_callback, &split_aux);
/* The callback only closes a group when the next one starts, so the
   last group is handled here.  NOTE(review): confirm the guarded
   statement calls END_FUNC with FUNC_AUX. */
782 if (split_aux.case_count > 0 && end_func != NULL)
784 close_active_file ();
786 case_destroy (&split_aux.prev_case);
789 /* procedure() callback used by procedure_with_splits().  SPLIT_AUX_
   points to the struct split_aux_data prepared there.  A new group
   starts on the first case and whenever equal_splits() reports that C
   does not match prev_case. */
791 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
793 struct split_aux_data *split_aux = split_aux_;
795 /* Start a new series if needed. */
796 if (split_aux->case_count == 0
797 || !equal_splits (c, &split_aux->prev_case))
/* Finish the previous group, unless this is the very first case. */
799 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
800 split_aux->end_func (split_aux->func_aux);
/* Keep a clone of this case as the reference for later comparisons. */
803 case_destroy (&split_aux->prev_case);
804 case_clone (&split_aux->prev_case, c);
806 if (split_aux->begin_func != NULL)
807 split_aux->begin_func (split_aux->func_aux);
810 split_aux->case_count++;
/* With a per-case function, its result becomes this callback's result. */
811 if (split_aux->proc_func != NULL)
812 return split_aux->proc_func (c, split_aux->func_aux);
817 /* Compares the SPLIT FILE variables in cases A and B and returns
818 nonzero only if they are equal, that is, when case_compare()
   returns 0 for them. */
820 equal_splits (const struct ccase *a, const struct ccase *b)
822 return case_compare (a, b,
823 dict_get_split_vars (default_dict),
824 dict_get_split_cnt (default_dict)) == 0;
827 /* Dumps out the values of all the split variables for the case C, as
   a table with "Variable", "Value" and "Label" headings followed by
   one row per split variable. */
829 dump_splits (struct ccase *c)
831 struct variable *const *split;
836 split_cnt = dict_get_split_cnt (default_dict);
/* Three columns; one heading row plus split_cnt data rows. */
840 t = tab_create (3, split_cnt + 1, 0);
841 tab_dim (t, tab_natural_dimensions);
842 tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
843 tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
844 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
845 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
846 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
847 split = dict_get_split_vars (default_dict);
848 for (i = 0; i < split_cnt; i++)
850 struct variable *v = split[i];
854 assert (v->type == NUMERIC || v->type == ALPHA);
855 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
/* Format the value with the variable's print format, then terminate
   the buffer at the format width.  NOTE(review): confirm temp_buf is
   always larger than v->print.w. */
857 data_out (temp_buf, &v->print, case_data (c, v->fv));
859 temp_buf[v->print.w] = 0;
860 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
/* NOTE(review): confirm val_lab is checked for null before it is
   passed to tab_text(). */
862 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
864 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
866 tab_flags (t, SOMF_NO_TITLE);
870 /* Represents auxiliary data for handling SPLIT FILE in a
871 multipass procedure.  Shared by multipass_procedure_with_splits(),
   multipass_split_callback() and multipass_split_output(). */
872 struct multipass_split_aux_data
874 struct ccase prev_case; /* Data in previous case. */
875 struct casefile *casefile; /* Accumulates data for a split; null when no split is open. */
877 /* Function to call with the accumulated data. */
878 void (*split_func) (const struct casefile *, void *);
879 void *func_aux; /* Auxiliary data. */
882 static int multipass_split_callback (struct ccase *c, void *aux_);
883 static void multipass_split_output (struct multipass_split_aux_data *);
/* Runs the active file through multipass_split_callback(), which
   accumulates each split group in a casefile and hands it to
   SPLIT_FUNC; SPLIT_FUNC must not be null (asserted). */
886 multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
890 struct multipass_split_aux_data aux;
892 assert (split_func != NULL);
896 case_nullify (&aux.prev_case);
898 aux.split_func = split_func;
899 aux.func_aux = func_aux;
901 internal_procedure (multipass_split_callback, &aux);
/* The callback only flushes a group when the next one begins, so a
   group still open after the last case is flushed here. */
902 if (aux.casefile != NULL)
903 multipass_split_output (&aux);
904 case_destroy (&aux.prev_case);
906 close_active_file ();
909 /* procedure() callback used by multipass_procedure_with_splits().
   AUX_ points to its struct multipass_split_aux_data.  Each case C is
   appended to the casefile of the current split group. */
911 multipass_split_callback (struct ccase *c, void *aux_)
913 struct multipass_split_aux_data *aux = aux_;
915 /* Start a new series if needed. */
916 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
918 /* Pass any cases to split_func. */
919 if (aux->casefile != NULL)
920 multipass_split_output (aux);
922 /* Start a new casefile. */
923 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
925 /* Record split values. */
927 case_destroy (&aux->prev_case);
928 case_clone (&aux->prev_case, c);
931 casefile_append (aux->casefile, c);
/* Hands the accumulated casefile in AUX to AUX->split_func together
   with AUX->func_aux, then destroys the casefile and resets the pointer
   to null.  The casefile must be non-null on entry (asserted). */
937 multipass_split_output (struct multipass_split_aux_data *aux)
939 assert (aux->casefile != NULL);
940 aux->split_func (aux->casefile, aux->func_aux);
941 casefile_destroy (aux->casefile);
942 aux->casefile = NULL;
946 /* Discards all the current state in preparation for a data-input
947 command like DATA LIST or GET: the default dictionary, the default
   file handle, the current case source, all transformations and the
   PROCESS IF expression, and finally resets pgm_state. */
949 discard_variables (void)
951 dict_clear (default_dict);
952 fh_set_default_handle (NULL);
/* Release the current source, if there is one. */
956 if (vfm_source != NULL)
958 free_case_source (vfm_source);
962 cancel_transformations ();
/* Drop the PROCESS IF expression and clear the pointer. */
966 expr_free (process_if_expr);
967 process_if_expr = NULL;
971 pgm_state = STATE_INIT;