1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 #include <procedure.h>
22 #include <libpspp/message.h>
27 #include <libpspp/alloc.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <language/command.h>
31 #include <data/dictionary.h>
32 #include <language/control/control-stack.h>
33 #include <libpspp/message.h>
34 #include "expressions/public.h"
35 #include <data/file-handle-def.h>
36 #include <libpspp/misc.h>
37 #include <data/settings.h>
38 #include <output/manager.h>
39 #include <output/table.h>
40 #include <libpspp/str.h>
41 #include <data/variable.h>
42 #include <data/value-labels.h>
45 #define _(msgid) gettext (msgid)
48 Virtual File Manager (vfm):
50 vfm is used to process data files. It uses the model that
51 data is read from one stream (the data source), processed,
52 then written to another (the data sink). The data source is
53 then deleted and the data sink becomes the data source for the next procedure.
56 /* Per-pass state for one procedure: filled in by internal_procedure()
and handed to write_case(), which the data source's read function
calls for each case it reads. */
57 struct write_case_data
59 /* Function to call for each case. */
60 bool (*proc_func) (struct ccase *, void *); /* Function. */
61 void *aux; /* Auxiliary data passed to proc_func. */
63 struct ccase trns_case; /* Case used for transformations. */
64 struct ccase sink_case; /* Case written to sink, if
65 compaction is necessary. */
66 size_t cases_written; /* Cases output so far; checked against the
permanent N OF CASES limit. */
67 size_t cases_analyzed; /* Cases passed to procedure so far; checked
against the post-TEMPORARY N OF CASES limit. */
70 /* The current active file, from which cases are read. */
71 struct case_source *vfm_source;
73 /* The replacement active file, to which cases are written. */
74 struct case_sink *vfm_sink;
76 /* The compactor used to compact each case before it is written to
77 the sink, if necessary; otherwise a null pointer. */
78 static struct dict_compactor *compactor;
80 /* Time at which vfm was last invoked, or 0 if it has not been
recorded yet. */
81 static time_t last_vfm_invocation;
/* State for LAG: the most recent cases are kept in LAG_QUEUE, an
   array of N_LAG cases used as a ring buffer (see lag_case() and
   lagged_case()). */
84 int n_lag; /* Number of cases to lag. */
85 static int lag_count; /* Number of cases in lag_queue so far. */
86 static int lag_head; /* Index where next case will be added. */
87 static struct ccase *lag_queue; /* Array of n_lag struct ccase elements. */
89 /* Active transformations. T_TRNS has room for M_TRNS entries, the
first N_TRNS of which are in use. write_case() runs entries
F_TRNS through TEMP_TRNS - 1 as permanent transformations and
TEMP_TRNS through N_TRNS - 1 as temporary ones. */
90 struct transformation *t_trns;
91 size_t n_trns, m_trns, f_trns;
/* Forward declarations of the file-local helpers defined below. */
93 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
95 static void update_last_vfm_invocation (void);
96 static void create_trns_case (struct ccase *, struct dictionary *);
97 static void open_active_file (void);
98 static bool write_case (struct write_case_data *wc_data);
99 static int execute_transformations (struct ccase *c,
100 struct transformation *trns,
101 int first_idx, int last_idx,
/* NOTE(review): the definition of filter_case() names its second
   parameter case_idx rather than case_num. */
103 static int filter_case (const struct ccase *c, int case_num);
104 static void lag_case (const struct ccase *c);
105 static void clear_case (struct ccase *c);
106 static bool close_active_file (void);
108 /* Public functions. */
110 /* Returns the last time the data was read. If no time has been
recorded yet, records the current time first and returns it. */
112 vfm_last_invocation (void)
114 if (last_vfm_invocation == 0)
115 update_last_vfm_invocation ();
116 return last_vfm_invocation;
119 /* Reads the data from the input program and writes it to a new
120 active file. For each case we read from the input program, we
do the following:
123 1. Execute permanent transformations. If these drop the case,
124 start the next case from step 1.
126 2. N OF CASES. If we have already written N cases, start the
127 next case from step 1.
129 3. Write case to replacement active file.
131 4. Execute temporary transformations. If these drop the case,
132 start the next case from step 1.
134 5. FILTER, PROCESS IF. If these drop the case, start the next
case from step 1.
137 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
138 cases, start the next case from step 1.
140 7. Pass case to PROC_FUNC, passing AUX as auxiliary data.
142 Returns true if successful, false if an I/O error occurred.
NOTE(review): several lines of this function are missing from this
listing (see the gaps in the embedded line numbers), including the
rest of the fast-path condition below and the final return; check
them against the full source. */
144 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
/* Special case: no per-case callback and the active file is already
   a storage source (the remainder of this test and its body are not
   shown). */
146 if (proc_func == NULL
147 && case_source_is_class (vfm_source, &storage_source_class)
153 update_last_vfm_invocation ();
161 ok = internal_procedure (proc_func, aux);
/* The active file is closed even if the pass itself failed. */
162 if (!close_active_file ())
169 /* Executes a procedure, as procedure(), except that the caller
170 is responsible for calling open_active_file() and
172 Returns true if successful, false if an I/O error occurred. */
174 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
176 static int recursive_call;
177 struct write_case_data wc_data;
180 assert (++recursive_call == 1);
182 wc_data.proc_func = proc_func;
184 create_trns_case (&wc_data.trns_case, default_dict);
185 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
186 wc_data.cases_written = 0;
188 update_last_vfm_invocation ();
190 ok = (vfm_source == NULL
191 || vfm_source->class->read (vfm_source,
193 write_case, &wc_data));
195 case_destroy (&wc_data.sink_case);
196 case_destroy (&wc_data.trns_case);
198 assert (--recursive_call == 0);
203 /* Sets last_vfm_invocation to the current time. */
205 update_last_vfm_invocation (void)
207 last_vfm_invocation = time (NULL);
210 /* Initializes TRNS_CASE as a new case with room for
211 dict_get_next_value_idx (DICT) values and gives each of DICT's
212 variables its starting value: numeric variables with V->reinit
set start at 0 and the others at system-missing; string variables
are filled with spaces.
NOTE(review): clear_case() resets variables that have both init and
reinit set to system-missing between cases, so such a numeric
variable is 0 for the first case but system-missing afterwards,
while a variable without reinit (presumably one named on LEAVE)
starts as system-missing. This looks inverted; confirm whether the
intent is `v->reinit ? SYSMIS : 0.0`. */
214 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
216 size_t var_cnt = dict_get_var_cnt (dict);
219 case_create (trns_case, dict_get_next_value_idx (dict));
220 for (i = 0; i < var_cnt; i++)
222 struct variable *v = dict_get_var (dict, i);
223 union value *value = case_data_rw (trns_case, v->fv);
225 if (v->type == NUMERIC)
226 value->f = v->reinit ? 0.0 : SYSMIS;
228 memset (value->s, ' ', v->width);
232 /* Makes all preparations for reading from the data source and writing
to the data sink: sets up temp_dict, compaction, the sink, and the
lag queue.
NOTE(review): this comment's terminator and several lines of the
function (including its return type and braces) are missing from
this listing, so everything up to the next comment terminator reads
as comment text; restore them from the full source.
235 open_active_file (void)
237 /* Make temp_dict refer to the dictionary right before data
242 temp_dict = default_dict;
245 /* Figure out compaction. */
246 compactor = (dict_needs_compaction (temp_dict)
247 ? dict_make_compactor (temp_dict)
/* Unless a sink has already been installed, cases go to a new storage
   sink laid out according to temp_dict. */
251 if (vfm_sink == NULL)
252 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
253 if (vfm_sink->class->open != NULL)
254 vfm_sink->class->open (vfm_sink);
256 /* Allocate memory for lag queue, with every slot initially null. */
263 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
264 for (i = 0; i < n_lag; i++)
265 case_nullify (&lag_queue[i]);
268 /* Close any unclosed DO IF or LOOP constructs. */
272 /* Transforms trns_case and writes it to the replacement active
273 file if advisable. Returns true if more cases can be
274 accepted, false otherwise. Do not call this function again
275 after it has returned false once.
NOTE(review): several lines of this function (local declarations,
the handling of RETVAL after each execute_transformations() call,
the jumps taken when a case is dropped, and the return) are missing
from this listing. */
277 write_case (struct write_case_data *wc_data)
281 /* Execute permanent transformations. */
282 retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
283 temp_trns, wc_data->cases_written + 1);
/* N OF CASES (a limit of 0 means no limit). */
288 if (dict_get_case_limit (default_dict)
289 && wc_data->cases_written >= dict_get_case_limit (default_dict))
291 wc_data->cases_written++;
293 /* Write case to LAG queue. */
295 lag_case (&wc_data->trns_case);
297 /* Write case to replacement active file, compacting it into
sink_case first if a compactor is in use. */
298 if (vfm_sink->class->write != NULL)
300 if (compactor != NULL)
302 dict_compactor_compact (compactor, &wc_data->sink_case,
303 &wc_data->trns_case);
304 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
307 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
310 /* Execute temporary transformations. */
311 retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
312 n_trns, wc_data->cases_written);
316 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
317 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
318 || (dict_get_case_limit (temp_dict)
319 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
321 wc_data->cases_analyzed++;
323 /* Pass case to procedure. */
324 if (wc_data->proc_func != NULL)
325 if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
/* Reset the variables that are reinitialized between cases before
   the next case is read. */
329 clear_case (&wc_data->trns_case);
333 /* Transforms case C using the transformations in TRNS[] with
334 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
335 become case CASE_NUM (1-based) in the output file. Returns 1
336 if the case was successfully transformed, 0 if it was filtered
337 out by one of the transformations, or -1 if the procedure
338 should be abandoned due to a fatal error. */
340 execute_transformations (struct ccase *c,
341 struct transformation *trns,
342 int first_idx, int last_idx,
/* The loop has no increment clause: IDX is updated in the body,
   presumably either to the next index or to a jump target returned
   by a transformation (see next_transformation()).
   NOTE(review): most of the loop body, which interprets RETVAL, is
   missing from this listing. */
347 for (idx = first_idx; idx != last_idx; )
349 struct transformation *t = &trns[idx];
350 int retval = t->proc (t->private, c, case_num);
378 /* Returns nonzero if case C, with case number CASE_IDX, should be
379 excluded as specified by FILTER or PROCESS IF, otherwise zero.
FILTER excludes the case when the filter variable's value is 0 or
missing; PROCESS IF excludes it when the expression does not
evaluate to 1.
NOTE(review): this comment has no terminator in this listing, so the
definition below reads as comment text up to the next comment
terminator; restore the missing lines from the full source.
382 filter_case (const struct ccase *c, int case_idx)
385 struct variable *filter_var = dict_get_filter (default_dict);
386 if (filter_var != NULL)
388 double f = case_num (c, filter_var->fv);
389 if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
394 if (process_if_expr != NULL
395 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
401 /* Adds a copy of C to the lag queue at LAG_HEAD, destroying the case
previously stored in that slot, and advances LAG_HEAD.
NOTE(review): the statements controlled by the two `if`s below
(presumably incrementing lag_count and wrapping lag_head back to 0)
are missing from this listing. */
403 lag_case (const struct ccase *c)
405 if (lag_count < n_lag)
407 case_destroy (&lag_queue[lag_head]);
408 case_clone (&lag_queue[lag_head], c);
409 if (++lag_head >= n_lag)
413 /* Clears the variables in C that need to be cleared between
cases: each variable with both init and reinit set is reset to
system-missing if numeric, or filled with spaces if a string.
NOTE(review): this comment has no terminator in this listing, so the
definition below reads as comment text up to the next comment
terminator; restore the missing lines from the full source.
416 clear_case (struct ccase *c)
418 size_t var_cnt = dict_get_var_cnt (default_dict);
421 for (i = 0; i < var_cnt; i++)
423 struct variable *v = dict_get_var (default_dict, i);
424 if (v->init && v->reinit)
426 if (v->type == NUMERIC)
427 case_data_rw (c, v->fv)->f = SYSMIS;
429 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
434 /* Closes the active file and makes the data written to the sink the
new active file. Frees the lag queue, makes the TEMPORARY dictionary
permanent where applicable, finishes compaction, and cancels
PROCESS IF, N OF CASES, vectors, and all transformations.
Returns true if successful, false if an I/O error occurred, as
reported by cancel_transformations(). */
436 close_active_file (void)
438 /* Free memory for lag queue, and turn off lagging. */
443 for (i = 0; i < n_lag; i++)
444 case_destroy (&lag_queue[i]);
449 /* Dictionary from before TEMPORARY becomes permanent. */
452 dict_destroy (default_dict);
453 default_dict = temp_dict;
457 /* Finish compaction. */
458 if (compactor != NULL)
460 dict_compactor_destroy (compactor);
461 dict_compact_values (default_dict);
464 /* Free data source. */
465 free_case_source (vfm_source);
468 /* Old data sink becomes new data source. */
469 if (vfm_sink->class->make_source != NULL)
470 vfm_source = vfm_sink->class->make_source (vfm_sink);
471 free_case_sink (vfm_sink);
474 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
475 and get rid of all the transformations. */
477 expr_free (process_if_expr);
478 process_if_expr = NULL;
479 dict_set_case_limit (default_dict, 0);
480 dict_clear_vectors (default_dict);
481 return cancel_transformations ();
484 /* Storage case stream. */
486 /* Auxiliary data for a storage sink or source. The sink creates it
in storage_sink_open(), and storage_sink_make_source() hands it to
the source that reads the data back. */
487 struct storage_stream_info
489 struct casefile *casefile; /* Casefile that holds the cases. */
492 /* Initializes storage sink SINK: allocates its storage_stream_info,
stores it in SINK->aux, and creates a new casefile sized by
SINK->value_cnt. */
494 storage_sink_open (struct case_sink *sink)
496 struct storage_stream_info *info;
498 sink->aux = info = xmalloc (sizeof *info);
499 info->casefile = casefile_create (sink->value_cnt);
502 /* Destroys the storage stream represented by INFO, including its
casefile.
NOTE(review): the surrounding lines (presumably a null check on INFO
and freeing INFO itself) are missing from this listing. */
504 destroy_storage_stream_info (struct storage_stream_info *info)
508 casefile_destroy (info->casefile);
513 /* Writes case C to the storage sink SINK by appending it to the
sink's casefile.
514 Returns true if successful, false if an I/O error occurred. */
516 storage_sink_write (struct case_sink *sink, const struct ccase *c)
518 struct storage_stream_info *info = sink->aux;
520 return casefile_append (info->casefile, c);
523 /* Destroys internal data in SINK: its storage_stream_info and the
casefile it holds. */
525 storage_sink_destroy (struct case_sink *sink)
527 destroy_storage_stream_info (sink->aux);
530 /* Closes the sink and returns a storage source to read back the
written data. The new source is created with SINK's
storage_stream_info as its auxiliary data.
NOTE(review): this comment has no terminator in this listing and the
end of storage_sink_make_source() is missing; presumably it clears
SINK->aux, so that free_case_sink() does not destroy the casefile
the source now uses, and returns SOURCE. The braces and several
members of storage_sink_class below are missing as well.
532 static struct case_source *
533 storage_sink_make_source (struct case_sink *sink)
535 struct case_source *source
536 = create_case_source (&storage_source_class, sink->aux);
542 const struct case_sink_class storage_sink_class =
547 storage_sink_destroy,
548 storage_sink_make_source,
551 /* Storage source. */
553 /* Returns the number of cases that will be read by
554 storage_source_read(), that is, the number of cases in the
source's casefile. */
556 storage_source_count (const struct case_source *source)
558 struct storage_stream_info *info = source->aux;
560 return casefile_get_case_cnt (info->casefile);
563 /* Reads all cases from the storage source and passes them one by one to
WRITE_CASE: each case is copied into OUTPUT_CASE, and then
WRITE_CASE (WC_DATA) is called. Reading stops early once WRITE_CASE
returns false.
NOTE(review): this comment has no terminator in this listing, and the
function's return type, braces, a line of the case_copy() call, and
the return statement are missing; restore them from the full source.
566 storage_source_read (struct case_source *source,
567 struct ccase *output_case,
568 write_case_func *write_case, write_case_data wc_data)
570 struct storage_stream_info *info = source->aux;
571 struct ccase casefile_case;
572 struct casereader *reader;
575 for (reader = casefile_get_reader (info->casefile);
576 ok && casereader_read (reader, &casefile_case);
577 case_destroy (&casefile_case))
579 case_copy (output_case, 0,
581 casefile_get_value_cnt (info->casefile));
582 ok = write_case (wc_data);
584 casereader_destroy (reader);
589 /* Destroys the source's internal data: its storage_stream_info and
the casefile it holds. */
591 storage_source_destroy (struct case_source *source)
593 destroy_storage_stream_info (source->aux);
596 /* Storage source: reads cases back from a casefile.
NOTE(review): the braces and some members of this initializer are
missing from this listing. */
597 const struct case_source_class storage_source_class =
600 storage_source_count,
602 storage_source_destroy,
/* Returns the casefile that holds SOURCE's cases. SOURCE must be a
   storage source; this is checked with assert(). */
606 storage_source_get_casefile (struct case_source *source)
608 struct storage_stream_info *info = source->aux;
610 assert (source->class == &storage_source_class);
611 return info->casefile;
/* Creates and returns a storage source whose cases come from CF.
   NOTE(review): the line(s) between the allocation and the return
   (presumably storing CF in INFO->casefile) are missing from this
   listing. Because storage_source_destroy() destroys the casefile,
   the source presumably takes ownership of CF. */
615 storage_source_create (struct casefile *cf)
617 struct storage_stream_info *info;
619 info = xmalloc (sizeof *info);
622 return create_case_source (&storage_source_class, info);
625 /* Null sink. Used by a few procedures that keep track of output
626 themselves and would throw away anything that the sink
received.
NOTE(review): this comment has no terminator in this listing, and the
class's initializer is missing.
629 const struct case_sink_class null_sink_class =
638 /* Returns a pointer to the lagged case from N_BEFORE cases before the
639 current one, or NULL if there haven't been that many cases yet.
N_BEFORE must be between 1 and N_LAG, inclusive (checked with
assert()). */
641 lagged_case (int n_before)
643 assert (n_before >= 1 );
644 assert (n_before <= n_lag);
646 if (n_before <= lag_count)
/* LAG_HEAD is the slot for the next case, so the wanted case sits
   N_BEFORE slots behind it in the ring.
   NOTE(review): the wrap-around adjustment for a negative INDEX and
   the NULL return are missing from this listing. */
648 int index = lag_head - n_before;
651 return &lag_queue[index];
657 /* Appends a transformation built from PROC, FREE, and PRIVATE to
658 t_trns[], the list of all transformations to be performed on data
as it is read from the active file, growing the array as needed.
NOTE(review): the assignments of PROC and FREE are missing from this
listing. The parameter named `free` shadows the C library function
inside this body. */
660 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
662 struct transformation *trns;
663 if (n_trns >= m_trns)
664 t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
665 trns = &t_trns[n_trns++];
668 trns->private = private;
671 /* Returns the index number that the next transformation added by
672 add_transformation() will receive. A trns_proc_func that
673 returns this index causes control flow to jump to it.
NOTE(review): the body is missing from this listing; because
add_transformation() stores the new entry at t_trns[n_trns], this
presumably returns n_trns. */
675 next_transformation (void)
680 /* Cancels all active transformations, including any transformations
681 created by the input program, by calling each one's free function
on its private data.
682 Returns true if successful, false if an I/O error occurred. */
684 cancel_transformations (void)
688 for (i = 0; i < n_trns; i++)
690 struct transformation *t = &t_trns[i];
/* A false return from a free function is treated as an error.
   NOTE(review): the rest of the loop and of the function is missing
   from this listing. */
693 if (!t->free (t->private))
704 /* Creates and returns a case source with class CLASS and auxiliary
705 data AUX. */
707 create_case_source (const struct case_source_class *class,
710 struct case_source *source = xmalloc (sizeof *source);
711 source->class = class;
716 /* Destroys case source SOURCE. This function calls the source's
717 class destroy function, if it has one, so callers need not. */
719 free_case_source (struct case_source *source)
723 if (source->class->destroy != NULL)
724 source->class->destroy (source);
729 /* Returns nonzero if SOURCE is "complex", that is, if it is non-null
and its class is input_program_source_class or
file_type_source_class. */
731 case_source_is_complex (const struct case_source *source)
733 return source != NULL && (source->class == &input_program_source_class
734 || source->class == &file_type_source_class);
737 /* Returns nonzero if SOURCE is non-null and CLASS is its class. */
739 case_source_is_class (const struct case_source *source,
740 const struct case_source_class *class)
742 return source != NULL && source->class == class;
745 /* Creates a case sink to accept cases from the given DICT with
746 class CLASS and auxiliary data AUX. The sink's value_cnt is DICT's
compacted value count, so the cases it receives are presumably
expected in compacted form. */
748 create_case_sink (const struct case_sink_class *class,
749 const struct dictionary *dict,
752 struct case_sink *sink = xmalloc (sizeof *sink);
754 sink->value_cnt = dict_get_compacted_value_cnt (dict);
759 /* Destroys case sink SINK, calling its class destroy function, if
it has one. */
761 free_case_sink (struct case_sink *sink)
765 if (sink->class->destroy != NULL)
766 sink->class->destroy (sink);
771 /* State shared by procedure_with_splits() and its callback for
detecting SPLIT FILE group boundaries. */
772 struct split_aux_data
774 size_t case_count; /* Number of cases so far. */
775 struct ccase prev_case; /* Case that began the current split group. */
777 /* Functions to call... */
778 void (*begin_func) (void *); /* ...before data. */
779 bool (*proc_func) (struct ccase *, void *); /* ...with data. */
780 void (*end_func) (void *); /* ...after data. */
781 void *func_aux; /* Auxiliary data. */
/* Helpers for procedure_with_splits() and
   multipass_procedure_with_splits(). */
784 static int equal_splits (const struct ccase *, const struct ccase *);
785 static bool procedure_with_splits_callback (struct ccase *, void *);
786 static void dump_splits (struct ccase *);
788 /* Like procedure(), but it automatically breaks the case stream
789 into SPLIT FILE break groups. Before each group of cases with
790 identical SPLIT FILE variable values, BEGIN_FUNC is called.
791 Then PROC_FUNC is called with each case in the group.
792 END_FUNC is called when the group is finished. FUNC_AUX is
793 passed to each of the functions as auxiliary data.
795 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
796 and END_FUNC will be called at all.
798 If SPLIT FILE is not in effect, then there is one break group
799 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
are each called once.
802 Returns true if successful, false if an I/O error occurred. */
804 procedure_with_splits (void (*begin_func) (void *aux),
805 bool (*proc_func) (struct ccase *, void *aux),
806 void (*end_func) (void *aux),
809 struct split_aux_data split_aux;
812 split_aux.case_count = 0;
813 case_nullify (&split_aux.prev_case);
814 split_aux.begin_func = begin_func;
815 split_aux.proc_func = proc_func;
816 split_aux.end_func = end_func;
817 split_aux.func_aux = func_aux;
820 ok = internal_procedure (procedure_with_splits_callback, &split_aux);
/* The callback ends a group only when the next one starts, so the
   last group, if any, is finished here. */
821 if (split_aux.case_count > 0 && end_func != NULL)
/* The active file is closed even if the pass itself failed. */
823 if (!close_active_file ())
826 case_destroy (&split_aux.prev_case);
831 /* procedure() callback used by procedure_with_splits(). Starts a new
split group when C is the first case or its SPLIT FILE values differ
from the current group's: calls END_FUNC for the previous group (if
any), remembers C as the group's first case, and calls BEGIN_FUNC.
Then counts C and, if PROC_FUNC is non-null, passes C to it and
returns its result. */
833 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
835 struct split_aux_data *split_aux = split_aux_;
837 /* Start a new series if needed. */
838 if (split_aux->case_count == 0
839 || !equal_splits (c, &split_aux->prev_case))
841 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
842 split_aux->end_func (split_aux->func_aux);
845 case_destroy (&split_aux->prev_case);
846 case_clone (&split_aux->prev_case, c);
848 if (split_aux->begin_func != NULL)
849 split_aux->begin_func (split_aux->func_aux);
852 split_aux->case_count++;
853 if (split_aux->proc_func != NULL)
854 return split_aux->proc_func (c, split_aux->func_aux);
859 /* Compares the SPLIT FILE variables in cases A and B and returns
860 nonzero only if all of them are equal. */
862 equal_splits (const struct ccase *a, const struct ccase *b)
864 return case_compare (a, b,
865 dict_get_split_vars (default_dict),
866 dict_get_split_cnt (default_dict)) == 0;
869 /* Dumps out the values of all the split variables for the case C, as
a table with one row per split variable giving its name, its value
formatted with its print format, and its value label. */
871 dump_splits (struct ccase *c)
873 struct variable *const *split;
878 split_cnt = dict_get_split_cnt (default_dict);
882 t = tab_create (3, split_cnt + 1, 0);
883 tab_dim (t, tab_natural_dimensions);
884 tab_vline (t, TAL_GAP, 1, 0, split_cnt);
885 tab_vline (t, TAL_GAP, 2, 0, split_cnt);
886 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
887 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
888 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
889 split = dict_get_split_vars (default_dict);
890 for (i = 0; i < split_cnt; i++)
892 struct variable *v = split[i];
896 assert (v->type == NUMERIC || v->type == ALPHA);
897 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
899 data_out (temp_buf, &v->print, case_data (c, v->fv));
/* NOTE(review): temp_buf's declaration is missing from this listing;
   confirm it holds at least v->print.w + 1 bytes, since the next line
   writes a terminator at index v->print.w. */
901 temp_buf[v->print.w] = 0;
902 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
/* NOTE(review): the label cell is presumably written only when a
   label exists; that check is missing from this listing. */
904 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
906 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
908 tab_flags (t, SOMF_NO_TITLE);
912 /* Represents auxiliary data for handling SPLIT FILE in a
913 multipass procedure. */
914 struct multipass_split_aux_data
916 struct ccase prev_case; /* Case that began the current split group. */
917 struct casefile *casefile; /* Accumulates data for a split; null
when no group is in progress. */
919 /* Function to call with the accumulated data. */
920 bool (*split_func) (const struct casefile *, void *);
921 void *func_aux; /* Auxiliary data. */
/* Helpers for multipass_procedure_with_splits(). */
924 static bool multipass_split_callback (struct ccase *c, void *aux_);
925 static void multipass_split_output (struct multipass_split_aux_data *);
927 /* Like procedure_with_splits(), but collects the cases of each SPLIT
FILE group into a casefile and calls SPLIT_FUNC once per group with
that casefile and FUNC_AUX, presumably so that SPLIT_FUNC can read
the group's cases more than once. SPLIT_FUNC must not be null.
Returns true if successful, false if an I/O error occurred.
NOTE(review): multipass_split_callback() reads aux.casefile, which
must be null before the pass starts; its initialization is missing
from this listing. */
929 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
933 struct multipass_split_aux_data aux;
936 assert (split_func != NULL);
940 case_nullify (&aux.prev_case);
942 aux.split_func = split_func;
943 aux.func_aux = func_aux;
945 ok = internal_procedure (multipass_split_callback, &aux);
/* Hand the final group, if any, to SPLIT_FUNC. */
946 if (aux.casefile != NULL)
947 multipass_split_output (&aux);
948 case_destroy (&aux.prev_case);
950 if (!close_active_file ())
956 /* procedure() callback used by multipass_procedure_with_splits().
Appends C to the current group's casefile. When C begins a new
SPLIT FILE group, it first hands any previous group to SPLIT_FUNC
and starts a new casefile. Returns the result of casefile_append(). */
958 multipass_split_callback (struct ccase *c, void *aux_)
960 struct multipass_split_aux_data *aux = aux_;
962 /* Start a new series if needed. */
963 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
965 /* Pass any cases to split_func. */
966 if (aux->casefile != NULL)
967 multipass_split_output (aux);
969 /* Start a new casefile. */
970 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
972 /* Record split values. */
974 case_destroy (&aux->prev_case);
975 case_clone (&aux->prev_case, c);
978 return casefile_append (aux->casefile, c);
/* Passes the current group's casefile and FUNC_AUX to SPLIT_FUNC,
   then destroys the casefile and clears AUX->casefile.
   NOTE(review): SPLIT_FUNC's bool result is discarded, so a failure it
   reports does not reach the return value of
   multipass_procedure_with_splits(). */
982 multipass_split_output (struct multipass_split_aux_data *aux)
984 assert (aux->casefile != NULL);
985 aux->split_func (aux->casefile, aux->func_aux);
986 casefile_destroy (aux->casefile);
987 aux->casefile = NULL;
991 /* Discards all the current state in preparation for a data-input
992 command like DATA LIST or GET: clears the default dictionary and
default file handle, frees the active file's data source, cancels
all transformations, PROCESS IF, and TEMPORARY, and resets the
program state to STATE_INIT. */
994 discard_variables (void)
996 dict_clear (default_dict);
997 fh_set_default_handle (NULL);
1001 if (vfm_source != NULL)
1003 free_case_source (vfm_source);
/* NOTE(review): cancel_transformations() can report an I/O error, but
   its result is ignored here. */
1007 cancel_transformations ();
1011 expr_free (process_if_expr);
1012 process_if_expr = NULL;
1014 cancel_temporary ();
1016 pgm_state = STATE_INIT;