1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 #include <procedure.h>
22 #include <libpspp/message.h>
27 #include <unistd.h> /* Required by SunOS4. */
29 #include <libpspp/alloc.h>
30 #include <data/case.h>
31 #include <data/casefile.h>
32 #include <language/command.h>
33 #include <data/dictionary.h>
34 #include <language/control/control-stack.h>
35 #include <libpspp/message.h>
36 #include "expressions/public.h"
37 #include <data/file-handle-def.h>
38 #include <libpspp/misc.h>
39 #include <data/settings.h>
40 #include <output/manager.h>
41 #include <output/table.h>
42 #include <libpspp/str.h>
43 #include <data/variable.h>
44 #include <data/value-labels.h>
47 #define _(msgid) gettext (msgid)
50 Virtual File Manager (vfm):
52 vfm is used to process data files. It uses the model that
53 data is read from one stream (the data source), processed,
54 then written to another (the data sink). The data source is
55 then deleted and the data sink becomes the data source for the
58 /* Procedure execution data. */
59 struct write_case_data
61 /* Function to call for each case. */
62 bool (*proc_func) (struct ccase *, void *); /* Function. */
63 void *aux; /* Auxiliary data. */
65 struct ccase trns_case; /* Case used for transformations. */
66 struct ccase sink_case; /* Case written to sink, if
67 compaction is necessary. */
68 size_t cases_written; /* Cases output so far. */
69 size_t cases_analyzed; /* Cases passed to procedure so far. */
72 /* The current active file, from which cases are read. */
73 struct case_source *vfm_source;
75 /* The replacement active file, to which cases are written. */
76 struct case_sink *vfm_sink;
78 /* The compactor used to compact a case, if necessary;
79 otherwise a null pointer. */
80 static struct dict_compactor *compactor;
82 /* Time at which vfm was last invoked. */
83 static time_t last_vfm_invocation;
86 int n_lag; /* Number of cases to lag. */
87 static int lag_count; /* Number of cases in lag_queue so far. */
88 static int lag_head; /* Index where next case will be added. */
89 static struct ccase *lag_queue; /* Array of n_lag struct ccase elements. */
91 /* Active transformations. */
92 struct transformation *t_trns;
93 size_t n_trns, m_trns, f_trns;
95 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
97 static void update_last_vfm_invocation (void);
98 static void create_trns_case (struct ccase *, struct dictionary *);
99 static void open_active_file (void);
100 static bool write_case (struct write_case_data *wc_data);
101 static int execute_transformations (struct ccase *c,
102 struct transformation *trns,
103 int first_idx, int last_idx,
105 static int filter_case (const struct ccase *c, int case_num);
106 static void lag_case (const struct ccase *c);
107 static void clear_case (struct ccase *c);
108 static bool close_active_file (void);
110 /* Public functions. */
112 /* Returns the time vfm was last invoked, recording the current time first if none is set yet. */
114 vfm_last_invocation (void)
116 if (last_vfm_invocation == 0)
117 update_last_vfm_invocation ();
118 return last_vfm_invocation;
121 /* Reads the data from the input program and writes it to a new
122 active file. For each case we read from the input program, we
125 1. Execute permanent transformations. If these drop the case,
126 start the next case from step 1.
128 2. N OF CASES. If we have already written N cases, start the
129 next case from step 1.
131 3. Write case to replacement active file.
133 4. Execute temporary transformations. If these drop the case,
134 start the next case from step 1.
136 5. FILTER, PROCESS IF. If these drop the case, start the next
139 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
140 cases, start the next case from step 1.
142 7. Pass case to PROC_FUNC, passing AUX as auxiliary data.
144 Returns true if successful, false if an I/O error occurred. */
146 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
148 if (proc_func == NULL
149 && case_source_is_class (vfm_source, &storage_source_class)
155 update_last_vfm_invocation ();
163 ok = internal_procedure (proc_func, aux);
164 if (!close_active_file ())
171 /* Executes a procedure, as procedure(), except that the caller
172 is responsible for calling open_active_file() and
174 Returns true if successful, false if an I/O error occurred. */
176 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
178 static int recursive_call;
179 struct write_case_data wc_data;
182 assert (++recursive_call == 1);
184 wc_data.proc_func = proc_func;
186 create_trns_case (&wc_data.trns_case, default_dict);
187 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
188 wc_data.cases_written = 0;
190 update_last_vfm_invocation ();
192 ok = (vfm_source == NULL
193 || vfm_source->class->read (vfm_source,
195 write_case, &wc_data));
197 case_destroy (&wc_data.sink_case);
198 case_destroy (&wc_data.trns_case);
200 assert (--recursive_call == 0);
205 /* Updates last_vfm_invocation. */
207 update_last_vfm_invocation (void)
209 last_vfm_invocation = time (NULL);
212 /* Creates TRNS_CASE, sized for DICT, and initializes each of DICT's
213 variables in it: a numeric value becomes 0.0 if the variable's
214 reinit flag is set and SYSMIS otherwise; a string value is filled with spaces. */
216 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
218 size_t var_cnt = dict_get_var_cnt (dict);
221 case_create (trns_case, dict_get_next_value_idx (dict));
222 for (i = 0; i < var_cnt; i++)
224 struct variable *v = dict_get_var (dict, i);
225 union value *value = case_data_rw (trns_case, v->fv);
227 if (v->type == NUMERIC)
228 value->f = v->reinit ? 0.0 : SYSMIS;
230 memset (value->s, ' ', v->width);
234 /* Makes all preparations for reading from the data source and writing
237 open_active_file (void)
239 /* Make temp_dict refer to the dictionary right before data
244 temp_dict = default_dict;
247 /* Figure out compaction. */
248 compactor = (dict_needs_compaction (temp_dict)
249 ? dict_make_compactor (temp_dict)
253 if (vfm_sink == NULL)
254 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
255 if (vfm_sink->class->open != NULL)
256 vfm_sink->class->open (vfm_sink);
258 /* Allocate memory for lag queue. */
265 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
266 for (i = 0; i < n_lag; i++)
267 case_nullify (&lag_queue[i]);
270 /* Close any unclosed DO IF or LOOP constructs. */
274 /* Transforms trns_case and writes it to the replacement active
275 file if advisable. Returns nonzero if more cases can be
276 accepted, zero otherwise. Do not call this function again
277 after it has returned zero once. */
279 write_case (struct write_case_data *wc_data)
283 /* Execute permanent transformations. */
284 retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
285 temp_trns, wc_data->cases_written + 1);
290 if (dict_get_case_limit (default_dict)
291 && wc_data->cases_written >= dict_get_case_limit (default_dict))
293 wc_data->cases_written++;
295 /* Write case to LAG queue. */
297 lag_case (&wc_data->trns_case);
299 /* Write case to replacement active file. */
300 if (vfm_sink->class->write != NULL)
302 if (compactor != NULL)
304 dict_compactor_compact (compactor, &wc_data->sink_case,
305 &wc_data->trns_case);
306 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
309 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
312 /* Execute temporary transformations. */
313 retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
314 n_trns, wc_data->cases_written);
318 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
319 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
320 || (dict_get_case_limit (temp_dict)
321 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
323 wc_data->cases_analyzed++;
325 /* Pass case to procedure. */
326 if (wc_data->proc_func != NULL)
327 if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
331 clear_case (&wc_data->trns_case);
335 /* Transforms case C using the transformations in TRNS[] with
336 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
337 become case CASE_NUM (1-based) in the output file. Returns 1
338 if the case was successfully transformed, 0 if it was filtered
339 out by one of the transformations, or -1 if the procedure
340 should be abandoned due to a fatal error. */
342 execute_transformations (struct ccase *c,
343 struct transformation *trns,
344 int first_idx, int last_idx,
349 for (idx = first_idx; idx != last_idx; )
351 struct transformation *t = &trns[idx];
352 int retval = t->proc (t->private, c, case_num);
380 /* Returns nonzero if case C with case number CASE_IDX should be
381 excluded as specified on FILTER or PROCESS IF, otherwise
384 filter_case (const struct ccase *c, int case_idx)
387 struct variable *filter_var = dict_get_filter (default_dict);
388 if (filter_var != NULL)
390 double f = case_num (c, filter_var->fv);
391 if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
396 if (process_if_expr != NULL
397 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
403 /* Add C to the lag queue. */
405 lag_case (const struct ccase *c)
407 if (lag_count < n_lag)
409 case_destroy (&lag_queue[lag_head]);
410 case_clone (&lag_queue[lag_head], c);
411 if (++lag_head >= n_lag)
415 /* Clears the variables in C that need to be cleared between
418 clear_case (struct ccase *c)
420 size_t var_cnt = dict_get_var_cnt (default_dict);
423 for (i = 0; i < var_cnt; i++)
425 struct variable *v = dict_get_var (default_dict, i);
426 if (v->init && v->reinit)
428 if (v->type == NUMERIC)
429 case_data_rw (c, v->fv)->f = SYSMIS;
431 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
436 /* Closes the active file. */
438 close_active_file (void)
440 /* Free memory for lag queue, and turn off lagging. */
445 for (i = 0; i < n_lag; i++)
446 case_destroy (&lag_queue[i]);
451 /* Dictionary from before TEMPORARY becomes permanent. */
454 dict_destroy (default_dict);
455 default_dict = temp_dict;
459 /* Finish compaction. */
460 if (compactor != NULL)
462 dict_compactor_destroy (compactor);
463 dict_compact_values (default_dict);
466 /* Free data source. */
467 free_case_source (vfm_source);
470 /* Old data sink becomes new data source. */
471 if (vfm_sink->class->make_source != NULL)
472 vfm_source = vfm_sink->class->make_source (vfm_sink);
473 free_case_sink (vfm_sink);
476 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
477 and get rid of all the transformations. */
479 expr_free (process_if_expr);
480 process_if_expr = NULL;
481 dict_set_case_limit (default_dict, 0);
482 dict_clear_vectors (default_dict);
483 return cancel_transformations ();
486 /* Storage case stream. */
488 /* Information about storage sink or source. */
489 struct storage_stream_info
491 struct casefile *casefile; /* Storage. */
494 /* Initializes a storage sink. */
496 storage_sink_open (struct case_sink *sink)
498 struct storage_stream_info *info;
500 sink->aux = info = xmalloc (sizeof *info);
501 info->casefile = casefile_create (sink->value_cnt);
504 /* Destroys storage stream represented by INFO. */
506 destroy_storage_stream_info (struct storage_stream_info *info)
510 casefile_destroy (info->casefile);
515 /* Writes case C to the storage sink SINK.
516 Returns true if successful, false if an I/O error occurred. */
518 storage_sink_write (struct case_sink *sink, const struct ccase *c)
520 struct storage_stream_info *info = sink->aux;
522 return casefile_append (info->casefile, c);
525 /* Destroys internal data in SINK. */
527 storage_sink_destroy (struct case_sink *sink)
529 destroy_storage_stream_info (sink->aux);
532 /* Closes the sink and returns a storage source to read back the
534 static struct case_source *
535 storage_sink_make_source (struct case_sink *sink)
537 struct case_source *source
538 = create_case_source (&storage_source_class, sink->aux);
544 const struct case_sink_class storage_sink_class =
549 storage_sink_destroy,
550 storage_sink_make_source,
553 /* Storage source. */
555 /* Returns the number of cases that will be read by
556 storage_source_read(). */
558 storage_source_count (const struct case_source *source)
560 struct storage_stream_info *info = source->aux;
562 return casefile_get_case_cnt (info->casefile);
565 /* Reads all cases from the storage source and passes them one by one to
568 storage_source_read (struct case_source *source,
569 struct ccase *output_case,
570 write_case_func *write_case, write_case_data wc_data)
572 struct storage_stream_info *info = source->aux;
573 struct ccase casefile_case;
574 struct casereader *reader;
577 for (reader = casefile_get_reader (info->casefile);
578 ok && casereader_read (reader, &casefile_case);
579 case_destroy (&casefile_case))
581 case_copy (output_case, 0,
583 casefile_get_value_cnt (info->casefile));
584 ok = write_case (wc_data);
586 casereader_destroy (reader);
591 /* Destroys the source's internal data. */
593 storage_source_destroy (struct case_source *source)
595 destroy_storage_stream_info (source->aux);
598 /* Storage source. */
599 const struct case_source_class storage_source_class =
602 storage_source_count,
604 storage_source_destroy,
608 storage_source_get_casefile (struct case_source *source)
610 struct storage_stream_info *info = source->aux;
612 assert (source->class == &storage_source_class);
613 return info->casefile;
617 storage_source_create (struct casefile *cf)
619 struct storage_stream_info *info;
621 info = xmalloc (sizeof *info);
624 return create_case_source (&storage_source_class, info);
627 /* Null sink. Used by a few procedures that keep track of output
628 themselves and would throw away anything that the sink
631 const struct case_sink_class null_sink_class =
640 /* Returns a pointer to the lagged case from N_BEFORE cases before the
641 current one, or NULL if there haven't been that many cases yet. */
643 lagged_case (int n_before)
645 assert (n_before >= 1 );
646 assert (n_before <= n_lag);
648 if (n_before <= lag_count)
650 int index = lag_head - n_before;
653 return &lag_queue[index];
659 /* Appends TRNS to t_trns[], the list of all transformations to be
660 performed on data as it is read from the active file. */
662 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
664 struct transformation *trns;
665 if (n_trns >= m_trns)
666 t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
667 trns = &t_trns[n_trns++];
670 trns->private = private;
673 /* Returns the index number that the next transformation added by
674 add_transformation() will receive. A trns_proc_func that
675 returns this index causes control flow to jump to it. */
677 next_transformation (void)
682 /* Cancels all active transformations, including any transformations
683 created by the input program.
684 Returns true if successful, false if an I/O error occurred. */
686 cancel_transformations (void)
690 for (i = 0; i < n_trns; i++)
692 struct transformation *t = &t_trns[i];
695 if (!t->free (t->private))
706 /* Creates a case source with class CLASS and auxiliary data AUX.
707 The source structure itself is allocated with xmalloc(). */
709 create_case_source (const struct case_source_class *class,
712 struct case_source *source = xmalloc (sizeof *source);
713 source->class = class;
718 /* Destroys case source SOURCE, calling the destroy function of the
719 source's class on it, if the class has one. */
721 free_case_source (struct case_source *source)
725 if (source->class->destroy != NULL)
726 source->class->destroy (source);
731 /* Returns nonzero if SOURCE is non-null and "complex": an input program or file type source. */
733 case_source_is_complex (const struct case_source *source)
735 return source != NULL && (source->class == &input_program_source_class
736 || source->class == &file_type_source_class);
739 /* Returns nonzero if CLASS is the class of SOURCE. */
741 case_source_is_class (const struct case_source *source,
742 const struct case_source_class *class)
744 return source != NULL && source->class == class;
747 /* Creates a case sink to accept cases from the given DICT with
748 class CLASS and auxiliary data AUX. */
750 create_case_sink (const struct case_sink_class *class,
751 const struct dictionary *dict,
754 struct case_sink *sink = xmalloc (sizeof *sink);
756 sink->value_cnt = dict_get_compacted_value_cnt (dict);
761 /* Destroys case sink SINK. */
763 free_case_sink (struct case_sink *sink)
767 if (sink->class->destroy != NULL)
768 sink->class->destroy (sink);
773 /* Represents auxiliary data for handling SPLIT FILE. */
774 struct split_aux_data
776 size_t case_count; /* Number of cases so far. */
777 struct ccase prev_case; /* Data in previous case. */
779 /* Functions to call... */
780 void (*begin_func) (void *); /* ...before data. */
781 bool (*proc_func) (struct ccase *, void *); /* ...with data. */
782 void (*end_func) (void *); /* ...after data. */
783 void *func_aux; /* Auxiliary data. */
786 static int equal_splits (const struct ccase *, const struct ccase *);
787 static bool procedure_with_splits_callback (struct ccase *, void *);
788 static void dump_splits (struct ccase *);
790 /* Like procedure(), but it automatically breaks the case stream
791 into SPLIT FILE break groups. Before each group of cases with
792 identical SPLIT FILE variable values, BEGIN_FUNC is called.
793 Then PROC_FUNC is called with each case in the group.
794 END_FUNC is called when the group is finished. FUNC_AUX is
795 passed to each of the functions as auxiliary data.
797 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
798 and END_FUNC will be called at all.
800 If SPLIT FILE is not in effect, then there is one break group
801 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
804 Returns true if successful, false if an I/O error occurred. */
806 procedure_with_splits (void (*begin_func) (void *aux),
807 bool (*proc_func) (struct ccase *, void *aux),
808 void (*end_func) (void *aux),
811 struct split_aux_data split_aux;
814 split_aux.case_count = 0;
815 case_nullify (&split_aux.prev_case);
816 split_aux.begin_func = begin_func;
817 split_aux.proc_func = proc_func;
818 split_aux.end_func = end_func;
819 split_aux.func_aux = func_aux;
822 ok = internal_procedure (procedure_with_splits_callback, &split_aux);
823 if (split_aux.case_count > 0 && end_func != NULL)
825 if (!close_active_file ())
828 case_destroy (&split_aux.prev_case);
833 /* procedure() callback used by procedure_with_splits(). */
835 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
837 struct split_aux_data *split_aux = split_aux_;
839 /* Start a new series if needed. */
840 if (split_aux->case_count == 0
841 || !equal_splits (c, &split_aux->prev_case))
843 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
844 split_aux->end_func (split_aux->func_aux);
847 case_destroy (&split_aux->prev_case);
848 case_clone (&split_aux->prev_case, c);
850 if (split_aux->begin_func != NULL)
851 split_aux->begin_func (split_aux->func_aux);
854 split_aux->case_count++;
855 if (split_aux->proc_func != NULL)
856 return split_aux->proc_func (c, split_aux->func_aux);
861 /* Compares the SPLIT FILE variables in cases A and B and returns
862 nonzero only if they compare equal (case_compare() returns 0). */
864 equal_splits (const struct ccase *a, const struct ccase *b)
866 return case_compare (a, b,
867 dict_get_split_vars (default_dict),
868 dict_get_split_cnt (default_dict)) == 0;
871 /* Dumps out the values of all the split variables for the case C. */
873 dump_splits (struct ccase *c)
875 struct variable *const *split;
880 split_cnt = dict_get_split_cnt (default_dict);
884 t = tab_create (3, split_cnt + 1, 0);
885 tab_dim (t, tab_natural_dimensions);
886 tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
887 tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
888 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
889 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
890 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
891 split = dict_get_split_vars (default_dict);
892 for (i = 0; i < split_cnt; i++)
894 struct variable *v = split[i];
898 assert (v->type == NUMERIC || v->type == ALPHA);
899 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
901 data_out (temp_buf, &v->print, case_data (c, v->fv));
903 temp_buf[v->print.w] = 0;
904 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
906 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
908 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
910 tab_flags (t, SOMF_NO_TITLE);
914 /* Represents auxiliary data for handling SPLIT FILE in a
915 multipass procedure. */
916 struct multipass_split_aux_data
918 struct ccase prev_case; /* Data in previous case. */
919 struct casefile *casefile; /* Accumulates data for a split. */
921 /* Function to call with the accumulated data. */
922 bool (*split_func) (const struct casefile *, void *);
923 void *func_aux; /* Auxiliary data. */
926 static bool multipass_split_callback (struct ccase *c, void *aux_);
927 static void multipass_split_output (struct multipass_split_aux_data *);
929 /* Returns true if successful, false if an I/O error occurred. */
931 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
935 struct multipass_split_aux_data aux;
938 assert (split_func != NULL);
942 case_nullify (&aux.prev_case);
944 aux.split_func = split_func;
945 aux.func_aux = func_aux;
947 ok = internal_procedure (multipass_split_callback, &aux);
948 if (aux.casefile != NULL)
949 multipass_split_output (&aux);
950 case_destroy (&aux.prev_case);
952 if (!close_active_file ())
958 /* procedure() callback used by multipass_procedure_with_splits(). */
960 multipass_split_callback (struct ccase *c, void *aux_)
962 struct multipass_split_aux_data *aux = aux_;
964 /* Start a new series if needed. */
965 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
967 /* Pass any cases to split_func. */
968 if (aux->casefile != NULL)
969 multipass_split_output (aux);
971 /* Start a new casefile. */
972 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
974 /* Record split values. */
976 case_destroy (&aux->prev_case);
977 case_clone (&aux->prev_case, c);
980 return casefile_append (aux->casefile, c);
984 multipass_split_output (struct multipass_split_aux_data *aux)
986 assert (aux->casefile != NULL);
987 aux->split_func (aux->casefile, aux->func_aux);
988 casefile_destroy (aux->casefile);
989 aux->casefile = NULL;
993 /* Discards all the current state in preparation for a data-input
994 command like DATA LIST or GET. */
996 discard_variables (void)
998 dict_clear (default_dict);
999 fh_set_default_handle (NULL);
1003 if (vfm_source != NULL)
1005 free_case_source (vfm_source);
1009 cancel_transformations ();
1013 expr_free (process_if_expr);
1014 process_if_expr = NULL;
1016 cancel_temporary ();
1018 pgm_state = STATE_INIT;