1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 #include <procedure.h>
22 #include <libpspp/message.h>
27 #include <libpspp/alloc.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <language/command.h>
31 #include <data/dictionary.h>
32 #include <language/control/control-stack.h>
33 #include <libpspp/message.h>
34 #include "expressions/public.h"
35 #include <data/file-handle-def.h>
36 #include <libpspp/misc.h>
37 #include <data/settings.h>
38 #include <output/manager.h>
39 #include <output/table.h>
40 #include <libpspp/str.h>
41 #include <data/variable.h>
42 #include <data/value-labels.h>
45 #define _(msgid) gettext (msgid)
48 Virtual File Manager (vfm):
50 vfm is used to process data files. It uses the model that
51 data is read from one stream (the data source), processed,
52 then written to another (the data sink). The data source is
53 then deleted and the data sink becomes the data source for the next procedure.
56 /* Procedure execution data. */
57 struct write_case_data
59 /* Function to call for each case. */
60 bool (*proc_func) (struct ccase *, void *); /* Function. */
61 void *aux; /* Auxiliary data. */
63 struct ccase trns_case; /* Case used for transformations. */
64 struct ccase sink_case; /* Case written to sink, if
65 compaction is necessary. */
66 size_t cases_written; /* Cases output so far. */
67 size_t cases_analyzed; /* Cases passed to procedure so far. */
/* Active file currently being read: the source of cases. */
struct case_source *vfm_source;

/* Replacement active file, to which cases are written. */
struct case_sink *vfm_sink;

/* Compactor applied to each case before it is written to the sink
   when compaction is needed; otherwise a null pointer. */
static struct dict_compactor *compactor;

/* Time at which the active file was last read (0 until then). */
static time_t last_vfm_invocation;

/* Lag queue. */
int n_lag;                      /* Number of cases to lag. */
static int lag_count;           /* Number of cases in lag_queue so far. */
static int lag_head;            /* Index where the next case will be added. */
static struct ccase *lag_queue; /* Array of N_LAG struct ccase elements. */
/* Active transformations, executed in index order. */
struct transformation *t_trns;  /* Array of transformations. */
size_t n_trns;                  /* Number of elements of t_trns in use. */
size_t m_trns;                  /* Number of elements allocated in t_trns. */
size_t f_trns;                  /* Index of the first transformation to run. */
/* Forward declarations of this file's static helpers.
   NOTE(review): as listed, the prototypes of internal_procedure() and
   execute_transformations() have no closing parenthesis; their final
   parameter lines appear to be missing and should be restored. */
93 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
95 static void update_last_vfm_invocation (void);
96 static void create_trns_case (struct ccase *, struct dictionary *);
97 static void open_active_file (void);
98 static bool write_case (struct write_case_data *wc_data);
99 static int execute_transformations (struct ccase *c,
100 struct transformation *trns,
101 int first_idx, int last_idx,
103 static int filter_case (const struct ccase *c, int case_num);
104 static void lag_case (const struct ccase *c);
105 static void clear_case (struct ccase *c);
106 static bool close_active_file (void);
108 /* Public functions. */
110 /* Returns the last time the data was read. */
112 vfm_last_invocation (void)
114 if (last_vfm_invocation == 0)
115 update_last_vfm_invocation ();
116 return last_vfm_invocation;
119 /* Reads the data from the input program and writes it to a new
   active file.  For each case read from the input program, the
   following steps are performed:

   1. Execute permanent transformations.  If these drop the case,
      start the next case from step 1.

   2. N OF CASES.  If N cases have already been written, start the
      next case from step 1.

   3. Write the case to the replacement active file.

   4. Execute temporary transformations.  If these drop the case,
      start the next case from step 1.

   5. FILTER, PROCESS IF.  If these drop the case, start the next
      case from step 1.

   6. Post-TEMPORARY N OF CASES.  If N cases have already been
      analyzed, start the next case from step 1.

   7. Pass the case to PROC_FUNC, passing AUX as auxiliary data.
      PROC_FUNC may be a null pointer, in which case this step is
      skipped.

   Returns true if successful, false if an I/O error occurred. */
144 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
/* Shortcut for the case where there is no per-case callback and the
   active file is already held in storage: the visible branch only
   refreshes the invocation time.  NOTE(review): the rest of this
   condition and the branch structure around both paths are not shown
   in this listing; confirm against the full source. */
146 if (proc_func == NULL
147 && case_source_is_class (vfm_source, &storage_source_class)
153 update_last_vfm_invocation ();
/* General path: run every case through internal_procedure(), then
   close the active file.  The result of close_active_file() is
   checked as well. */
161 ok = internal_procedure (proc_func, aux);
162 if (!close_active_file ())
169 /* Executes a procedure, as procedure(), except that the caller
   is responsible for calling open_active_file() beforehand and
   close_active_file() afterward.  PROC_FUNC (which may be null)
   receives, along with AUX, each case that survives the steps that
   write_case() performs.
   Returns true if successful, false if an I/O error occurred. */
174 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
/* Re-entry guard: the two assertions below fail if this function is
   entered again before it returns.  NOTE(review): the counter is only
   modified inside assert(), so under NDEBUG the guard disappears
   entirely. */
176 static int recursive_call;
177 struct write_case_data wc_data;
180 assert (++recursive_call == 1);
182 wc_data.proc_func = proc_func;
184 create_trns_case (&wc_data.trns_case, default_dict);
185 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
/* NOTE(review): write_case() compares and increments
   wc_data.cases_analyzed, but no initialization of that field appears
   in this listing; confirm it is set to 0 here. */
186 wc_data.cases_written = 0;
188 update_last_vfm_invocation ();
/* With no data source there is nothing to read, which counts as
   success.  Otherwise the source's read function calls write_case()
   for the cases it produces, and its result becomes ours. */
190 ok = (vfm_source == NULL
191 || vfm_source->class->read (vfm_source,
193 write_case, &wc_data));
195 case_destroy (&wc_data.sink_case);
196 case_destroy (&wc_data.trns_case);
198 assert (--recursive_call == 0);
203 /* Updates last_vfm_invocation. */
205 update_last_vfm_invocation (void)
207 last_vfm_invocation = time (NULL);
210 /* Initializes TRNS_CASE, sized by dict_get_next_value_idx (DICT),
   and gives every variable of DICT its starting value: a numeric
   variable gets 0.0 when its `reinit' flag is set and SYSMIS
   otherwise, and a string variable's value is filled with spaces.
   Nothing is returned; the case is created in place.

   NOTE(review): clear_case() resets numeric variables to SYSMIS
   between cases, so `reinit' variables start at 0.0 here but hold
   SYSMIS after a reset; confirm that difference is intended. */
214 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
216 size_t var_cnt = dict_get_var_cnt (dict);
219 case_create (trns_case, dict_get_next_value_idx (dict));
220 for (i = 0; i < var_cnt; i++)
222 struct variable *v = dict_get_var (dict, i);
223 union value *value = case_data_rw (trns_case, v->fv);
225 if (v->type == NUMERIC)
226 value->f = v->reinit ? 0.0 : SYSMIS;
228 memset (value->s, ' ', v->width);
/* Prepares for a procedure:
   - chooses the dictionary that describes data reaching the sink
     (temp_dict);
   - creates a compactor if that dictionary needs compaction;
   - creates a storage sink sized from temp_dict when no sink was
     supplied, and opens the sink;
   - allocates the lag queue.
   NOTE(review): the comment below is not terminated before the
   function name in this listing.  As shown, everything up to the
   "Figure out compaction" comment is therefore commented out; restore
   the missing closing line. */
232 /* Makes all preparations for reading from the data source and writing
235 open_active_file (void)
237 /* Make temp_dict refer to the dictionary right before data
242 temp_dict = default_dict;
245 /* Figure out compaction. */
246 compactor = (dict_needs_compaction (temp_dict)
247 ? dict_make_compactor (temp_dict)
251 if (vfm_sink == NULL)
252 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
253 if (vfm_sink->class->open != NULL)
254 vfm_sink->class->open (vfm_sink);
256 /* Allocate memory for lag queue. */
/* Each slot starts out as a null case so that lag_case() can destroy
   whatever a slot holds before storing a new case in it. */
263 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
264 for (i = 0; i < n_lag; i++)
265 case_nullify (&lag_queue[i]);
268 /* Close any unclosed DO IF or LOOP constructs. */
272 /* Runs WC_DATA->trns_case through the per-case pipeline, in order:
   1. permanent transformations;
   2. the permanent N OF CASES limit;
   3. the lag queue and the replacement active file (the sink);
   4. temporary transformations;
   5. FILTER / PROCESS IF;
   6. the post-TEMPORARY N OF CASES limit;
   7. WC_DATA->proc_func.
   Afterward, the variables that must be reset between cases are
   cleared with clear_case().
   Returns nonzero if more cases can be accepted, zero otherwise.  Do
   not call this function again after it has returned zero once. */
277 write_case (struct write_case_data *wc_data)
281 /* Execute permanent transformations.  If kept, the case becomes number cases_written + 1 in the replacement active file. */
282 retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
283 temp_trns, wc_data->cases_written + 1);
/* Permanent N OF CASES: once that many cases have been written, this
   case is not written. */
288 if (dict_get_case_limit (default_dict)
289 && wc_data->cases_written >= dict_get_case_limit (default_dict))
291 wc_data->cases_written++;
293 /* Write case to LAG queue. */
295 lag_case (&wc_data->trns_case);
297 /* Write case to replacement active file, compacting it into sink_case first when a compactor is in use.  NOTE(review): the value returned by the sink's write function is ignored here, even though storage_sink_write() reports I/O errors through it. */
298 if (vfm_sink->class->write != NULL)
300 if (compactor != NULL)
302 dict_compactor_compact (compactor, &wc_data->sink_case,
303 &wc_data->trns_case);
304 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
307 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
310 /* Execute temporary transformations. */
311 retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
312 n_trns, wc_data->cases_written);
316 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
317 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
318 || (dict_get_case_limit (temp_dict)
319 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
321 wc_data->cases_analyzed++;
323 /* Pass case to procedure. */
324 if (wc_data->proc_func != NULL)
325 if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
329 clear_case (&wc_data->trns_case);
333 /* Transforms case C using the transformations in TRNS[] with
   indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will become
   case CASE_NUM (1-based) in the output file.  Each transformation's
   procedure is called with its private data, C and CASE_NUM.

   Returns 1 if the case was successfully transformed, 0 if it was
   filtered out by one of the transformations, or -1 if the procedure
   should be abandoned due to a fatal error.

   IDX is not advanced in the loop header because a transformation's
   return value may direct control to another index (see
   next_transformation()).

   NOTE(review): the code that interprets RETVAL and advances IDX is
   not shown in this listing. */
340 execute_transformations (struct ccase *c,
341 struct transformation *trns,
342 int first_idx, int last_idx,
347 for (idx = first_idx; idx != last_idx; )
349 struct transformation *t = &trns[idx];
350 int retval = t->proc (t->private, c, case_num);
/* Decides whether case C is dropped by FILTER or PROCESS IF.
   - With a filter variable in effect, C is excluded when that
     variable's value is 0 or is missing according to
     mv_is_num_missing().
   - With a PROCESS IF expression in effect, C is excluded unless the
     expression evaluates to exactly 1.0.  CASE_IDX is passed to
     expr_evaluate_num() as the case number.
   NOTE(review): the comment below is not terminated before the
   definition in this listing.  As shown, the function is commented
   out until the next comment closes it; restore the missing closing
   line. */
378 /* Returns nonzero if case C with case number CASE_IDX should be
379 excluded as specified on FILTER or PROCESS IF, otherwise
382 filter_case (const struct ccase *c, int case_idx)
385 struct variable *filter_var = dict_get_filter (default_dict);
386 if (filter_var != NULL)
388 double f = case_num (c, filter_var->fv);
389 if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
394 if (process_if_expr != NULL
395 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
401 /* Adds a copy of case C to the lag queue at index LAG_HEAD, first
   destroying whatever case that slot held, then advances LAG_HEAD,
   which wraps once it reaches N_LAG.  LAG_COUNT is checked against
   N_LAG so that it stops growing once the queue is full. */
403 lag_case (const struct ccase *c)
405 if (lag_count < n_lag)
407 case_destroy (&lag_queue[lag_head]);
408 case_clone (&lag_queue[lag_head], c);
409 if (++lag_head >= n_lag)
/* Resets variables of case C, walking the default dictionary's
   variables: a numeric variable is set to SYSMIS, and a string
   variable's value is filled with spaces.
   NOTE(review): in this listing the comment below is not terminated.
   The test that selects which variables to reset (presumably their
   `reinit' flag) is also not visible; confirm against the full
   source. */
413 /* Clears the variables in C that need to be cleared between cases.
416 clear_case (struct ccase *c)
418 size_t var_cnt = dict_get_var_cnt (default_dict);
421 for (i = 0; i < var_cnt; i++)
423 struct variable *v = dict_get_var (default_dict, i);
426 if (v->type == NUMERIC)
427 case_data_rw (c, v->fv)->f = SYSMIS;
429 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
434 /* Closes the active file after a procedure.  In order, it:
   - frees the lag queue;
   - makes temp_dict (the dictionary from before TEMPORARY) the
     default dictionary again;
   - finishes compaction;
   - replaces the data source by one made from the sink, when the
     sink class can make one;
   - cancels PROCESS IF, N OF CASES, vectors and all transformations.
   Returns the result of cancel_transformations(): true if
   successful, false if an I/O error occurred. */
436 close_active_file (void)
438 /* Free memory for lag queue, and turn off lagging. */
443 for (i = 0; i < n_lag; i++)
444 case_destroy (&lag_queue[i]);
449 /* Dictionary from before TEMPORARY becomes permanent. */
452 dict_destroy (default_dict);
453 default_dict = temp_dict;
457 /* Finish compaction. */
458 if (compactor != NULL)
460 dict_compactor_destroy (compactor);
461 dict_compact_values (default_dict);
464 /* Free data source. */
465 free_case_source (vfm_source);
468 /* Old data sink becomes new data source. */
469 if (vfm_sink->class->make_source != NULL)
470 vfm_source = vfm_sink->class->make_source (vfm_sink);
471 free_case_sink (vfm_sink);
474 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
475 and get rid of all the transformations. */
477 expr_free (process_if_expr);
478 process_if_expr = NULL;
479 dict_set_case_limit (default_dict, 0);
480 dict_clear_vectors (default_dict);
481 return cancel_transformations ();
/* Storage case stream. */

/* State of a storage stream.  A storage sink creates it, and the
   storage source made from that sink takes it over, so both sides
   share one casefile. */
struct storage_stream_info
  {
    struct casefile *casefile;  /* Where the cases are kept. */
  };
492 /* Initializes a storage sink.  It allocates a storage_stream_info,
   stores it in SINK->aux, and creates a casefile with SINK->value_cnt
   values per case to receive the cases written to the sink. */
494 storage_sink_open (struct case_sink *sink)
496 struct storage_stream_info *info;
498 sink->aux = info = xmalloc (sizeof *info);
499 info->casefile = casefile_create (sink->value_cnt);
502 /* Destroys the storage stream represented by INFO, including the
   casefile it holds.  NOTE(review): the lines around the call below
   (presumably a null check and the release of INFO itself) are not
   shown in this listing; confirm. */
504 destroy_storage_stream_info (struct storage_stream_info *info)
508 casefile_destroy (info->casefile);
513 /* Writes case C to the storage sink SINK.
514 Returns true if successful, false if an I/O error occurred. */
516 storage_sink_write (struct case_sink *sink, const struct ccase *c)
518 struct storage_stream_info *info = sink->aux;
520 return casefile_append (info->casefile, c);
523 /* Destroys internal data in SINK: the storage stream info kept in
   SINK->aux, together with its casefile. */
525 storage_sink_destroy (struct case_sink *sink)
527 destroy_storage_stream_info (sink->aux);
/* storage_sink_make_source() turns a storage sink into a storage
   source that reads back the cases written to it.  The new source is
   given the sink's stream info (SINK->aux) itself, not a copy.

   storage_sink_class bundles the storage sink's callbacks.

   NOTE(review): close_active_file() calls free_case_sink() right after
   make_source(), and that runs storage_sink_destroy() on the same
   sink.  SINK->aux must therefore no longer refer to the shared stream
   info by then, or the casefile is destroyed twice; the lines that
   would ensure this are not shown here.

   NOTE(review): the comment below is not terminated in this listing,
   which comments out both definitions that follow. */
530 /* Closes the sink and returns a storage source to read back the written cases.
532 static struct case_source *
533 storage_sink_make_source (struct case_sink *sink)
535 struct case_source *source
536 = create_case_source (&storage_source_class, sink->aux);
542 const struct case_sink_class storage_sink_class =
547 storage_sink_destroy,
548 storage_sink_make_source,
551 /* Storage source. */
553 /* Returns the number of cases that will be read by
   storage_source_read(), which is the number of cases held in
   SOURCE's casefile. */
556 storage_source_count (const struct case_source *source)
558 struct storage_stream_info *info = source->aux;
560 return casefile_get_case_cnt (info->casefile);
/* Reads the cases of SOURCE's casefile in order.  Each one is copied
   into OUTPUT_CASE, and then WRITE_CASE is called with WC_DATA.
   Reading stops as soon as WRITE_CASE returns false.  Each case read
   is destroyed after use, and the reader is destroyed at the end.
   NOTE(review): the comment below is not terminated in this listing,
   which comments out the definition that follows.  The source
   argument of case_copy() and the function's return statement are
   also not shown. */
563 /* Reads all cases from the storage source and passes them one by one to WRITE_CASE.
566 storage_source_read (struct case_source *source,
567 struct ccase *output_case,
568 write_case_func *write_case, write_case_data wc_data)
570 struct storage_stream_info *info = source->aux;
571 struct ccase casefile_case;
572 struct casereader *reader;
575 for (reader = casefile_get_reader (info->casefile);
576 ok && casereader_read (reader, &casefile_case);
577 case_destroy (&casefile_case))
579 case_copy (output_case, 0,
581 casefile_get_value_cnt (info->casefile));
582 ok = write_case (wc_data);
584 casereader_destroy (reader);
589 /* Destroys the source's internal data: the storage stream info in
   SOURCE->aux, together with its casefile. */
591 storage_source_destroy (struct case_source *source)
593 destroy_storage_stream_info (source->aux);
596 /* Storage source: reads back cases kept in a casefile. */
597 const struct case_source_class storage_source_class =
600 storage_source_count,
602 storage_source_destroy,
606 storage_source_get_casefile (struct case_source *source)
608 struct storage_stream_info *info = source->aux;
610 assert (source->class == &storage_source_class);
611 return info->casefile;
/* Creates and returns a storage source for casefile CF.
   NOTE(review): the line that stores CF in INFO->casefile is not shown
   in this listing.  If it does store CF, the source owns CF from then
   on, since storage_source_destroy() destroys the casefile. */
615 storage_source_create (struct casefile *cf)
617 struct storage_stream_info *info;
619 info = xmalloc (sizeof *info);
622 return create_case_source (&storage_source_class, info);
/* Case sink class for procedures that do not need the sink's output.
   Its callbacks are not shown in this listing; presumably they discard
   every case.  NOTE(review): the comment below is not terminated in
   this listing, which comments out the definition that follows. */
625 /* Null sink. Used by a few procedures that keep track of output
626 themselves and would throw away anything that the sink collected.
629 const struct case_sink_class null_sink_class =
638 /* Returns a pointer to the lagged case from N_BEFORE cases before the
   current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be between 1 and N_LAG, inclusive.  The result points
   into the lag queue, so lag_case() overwrites it as later cases
   arrive.  The newest case sits just before LAG_HEAD, so the lookup
   steps back N_BEFORE slots from there.  NOTE(review): the adjustment
   for a negative INDEX (wrapping to the end of the array) is not
   shown in this listing. */
641 lagged_case (int n_before)
643 assert (n_before >= 1 );
644 assert (n_before <= n_lag);
646 if (n_before <= lag_count)
648 int index = lag_head - n_before;
651 return &lag_queue[index];
657 /* Appends a transformation made of procedure PROC, free function
   FREE and auxiliary data PRIVATE to t_trns[].  t_trns[] is the list
   of all transformations to be performed on data as it is read from
   the active file; it grows as needed.
   NOTE(review): the parameter FREE shadows the C library's free()
   inside this function. */
660 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
662 struct transformation *trns;
663 if (n_trns >= m_trns)
664 t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
665 trns = &t_trns[n_trns++];
668 trns->private = private;
671 /* Returns the index number that the next transformation added by
   add_transformation() will receive.  A trns_proc_func that returns
   this index causes control flow to jump to it.
   NOTE(review): the body is not shown in this listing.  Since
   add_transformation() appends at index n_trns, the result is
   presumably n_trns. */
675 next_transformation (void)
680 /* Cancels all active transformations, including any transformations
   created by the input program.  Each transformation's free function
   is called with its private data; a false result from it counts as
   an I/O error.
   Returns true if successful, false if an I/O error occurred.
   NOTE(review): the lines that record that failure and reset the
   transformation list are not shown in this listing. */
684 cancel_transformations (void)
688 for (i = 0; i < n_trns; i++)
690 struct transformation *t = &t_trns[i];
693 if (!t->free (t->private))
704 /* Creates and returns a new case source with class CLASS and
   auxiliary data AUX.  (Callers in this file pass exactly these two
   arguments; no dictionary is involved.) */
707 create_case_source (const struct case_source_class *class,
710 struct case_source *source = xmalloc (sizeof *source);
711 source->class = class;
716 /* Destroys case source SOURCE.  The source's class destroy function,
   if it has one, is called here, so callers must not call it
   themselves.  NOTE(review): the remaining lines (presumably a null
   check and freeing SOURCE itself) are not shown in this listing. */
719 free_case_source (struct case_source *source)
723 if (source->class->destroy != NULL)
724 source->class->destroy (source);
729 /* Returns nonzero if SOURCE is non-null and CLASS is its class,
   zero otherwise (including when SOURCE is null). */
731 case_source_is_class (const struct case_source *source,
732 const struct case_source_class *class)
734 return source != NULL && source->class == class;
737 /* Creates a case sink to accept cases from the given DICT, with
   class CLASS and auxiliary data AUX.  The sink's value count is
   DICT's compacted value count, matching the compacted cases that
   write_case() hands to the sink when a compactor is in use. */
740 create_case_sink (const struct case_sink_class *class,
741 const struct dictionary *dict,
744 struct case_sink *sink = xmalloc (sizeof *sink);
746 sink->value_cnt = dict_get_compacted_value_cnt (dict);
751 /* Destroys case sink SINK, first calling its class's destroy function
   if the class has one. */
753 free_case_sink (struct case_sink *sink)
757 if (sink->class->destroy != NULL)
758 sink->class->destroy (sink);
763 /* Represents auxiliary data for handling SPLIT FILE. */
764 struct split_aux_data
766 size_t case_count; /* Number of cases so far. */
767 struct ccase prev_case; /* Data in previous case. */
769 /* Functions to call... */
770 void (*begin_func) (void *); /* ...before data. */
771 bool (*proc_func) (struct ccase *, void *); /* ...with data. */
772 void (*end_func) (void *); /* ...after data. */
773 void *func_aux; /* Auxiliary data. */
776 static int equal_splits (const struct ccase *, const struct ccase *);
777 static bool procedure_with_splits_callback (struct ccase *, void *);
778 static void dump_splits (struct ccase *);
780 /* Like procedure(), but it automatically breaks the case stream into
   SPLIT FILE break groups.
   - Before each group of cases with identical SPLIT FILE variable
     values, BEGIN_FUNC is called.
   - Then PROC_FUNC is called with each case in the group.
   - END_FUNC is called when the group is finished.
   FUNC_AUX is passed to each of the functions as auxiliary data, and
   any of the three functions may be a null pointer.

   If the active file is empty, none of BEGIN_FUNC, PROC_FUNC and
   END_FUNC will be called at all.

   If SPLIT FILE is not in effect, then there is one break group (if
   the active file is nonempty), and BEGIN_FUNC and END_FUNC are each
   called once.

   Returns true if successful, false if an I/O error occurred. */
796 procedure_with_splits (void (*begin_func) (void *aux),
797 bool (*proc_func) (struct ccase *, void *aux),
798 void (*end_func) (void *aux),
801 struct split_aux_data split_aux;
804 split_aux.case_count = 0;
805 case_nullify (&split_aux.prev_case);
806 split_aux.begin_func = begin_func;
807 split_aux.proc_func = proc_func;
808 split_aux.end_func = end_func;
809 split_aux.func_aux = func_aux;
812 ok = internal_procedure (procedure_with_splits_callback, &split_aux);
/* Finish the last group here: the callback only calls END_FUNC when a
   following group begins. */
813 if (split_aux.case_count > 0 && end_func != NULL)
815 if (!close_active_file ())
818 case_destroy (&split_aux.prev_case);
823 /* internal_procedure() callback used by procedure_with_splits().
   Case C starts a new split group when it is the first case or its
   split values differ from those stored in SPLIT_AUX_->prev_case.  In
   that case the callback:
   - finishes the previous group with END_FUNC;
   - records C's split values;
   - starts the new group with BEGIN_FUNC.
   It then counts C and passes it to PROC_FUNC, returning its result.
   NOTE(review): the value returned when PROC_FUNC is null is not
   shown in this listing. */
825 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
827 struct split_aux_data *split_aux = split_aux_;
829 /* Start a new series if needed. */
830 if (split_aux->case_count == 0
831 || !equal_splits (c, &split_aux->prev_case))
833 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
834 split_aux->end_func (split_aux->func_aux);
/* Keep a copy of C so that later cases can be compared against this
   group's split values. */
837 case_destroy (&split_aux->prev_case);
838 case_clone (&split_aux->prev_case, c);
840 if (split_aux->begin_func != NULL)
841 split_aux->begin_func (split_aux->func_aux);
844 split_aux->case_count++;
845 if (split_aux->proc_func != NULL)
846 return split_aux->proc_func (c, split_aux->func_aux);
851 /* Compares the SPLIT FILE variables in cases A and B and returns
852 nonzero only if they differ. */
854 equal_splits (const struct ccase *a, const struct ccase *b)
856 return case_compare (a, b,
857 dict_get_split_vars (default_dict),
858 dict_get_split_cnt (default_dict)) == 0;
861 /* Outputs a table describing the SPLIT FILE variables of case C.  It
   has one row per split variable plus a header row, with three
   columns (Variable, Value, Label):
   - the variable's name;
   - its value in C, formatted with the variable's print format;
   - the value label that val_labs_find() returns for that value.
   NOTE(review): temp_buf's declaration is not shown in this listing.
   It must hold at least print.w + 1 bytes, because
   temp_buf[v->print.w] is written. */
863 dump_splits (struct ccase *c)
865 struct variable *const *split;
870 split_cnt = dict_get_split_cnt (default_dict);
874 t = tab_create (3, split_cnt + 1, 0);
875 tab_dim (t, tab_natural_dimensions);
876 tab_vline (t, TAL_GAP, 1, 0, split_cnt);
877 tab_vline (t, TAL_GAP, 2, 0, split_cnt);
878 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
879 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
880 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
881 split = dict_get_split_vars (default_dict);
882 for (i = 0; i < split_cnt; i++)
884 struct variable *v = split[i];
888 assert (v->type == NUMERIC || v->type == ALPHA);
889 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
/* Format the value with the variable's print format, then terminate
   the text at the print width. */
891 data_out (temp_buf, &v->print, case_data (c, v->fv));
893 temp_buf[v->print.w] = 0;
894 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
896 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
898 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
900 tab_flags (t, SOMF_NO_TITLE);
904 /* Represents auxiliary data for handling SPLIT FILE in a
905 multipass procedure. */
906 struct multipass_split_aux_data
908 struct ccase prev_case; /* Data in previous case. */
909 struct casefile *casefile; /* Accumulates data for a split. */
911 /* Function to call with the accumulated data. */
912 bool (*split_func) (const struct casefile *, void *);
913 void *func_aux; /* Auxiliary data. */
916 static bool multipass_split_callback (struct ccase *c, void *aux_);
917 static void multipass_split_output (struct multipass_split_aux_data *);
919 /* Like procedure_with_splits(), but instead of seeing cases one at a
   time, SPLIT_FUNC receives, along with FUNC_AUX, a casefile holding
   all the cases of each SPLIT FILE group.  SPLIT_FUNC must not be
   null.
   Returns true if successful, false if an I/O error occurred.
   NOTE(review): SPLIT_FUNC's own return value is ignored by
   multipass_split_output(), so it cannot make this function fail.
   AUX.casefile must start out null for the callback to work; its
   initialization is not shown in this listing. */
921 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
925 struct multipass_split_aux_data aux;
928 assert (split_func != NULL);
932 case_nullify (&aux.prev_case);
934 aux.split_func = split_func;
935 aux.func_aux = func_aux;
937 ok = internal_procedure (multipass_split_callback, &aux);
/* Hand over the final group: the callback only flushes a group when
   the next one begins. */
938 if (aux.casefile != NULL)
939 multipass_split_output (&aux);
940 case_destroy (&aux.prev_case);
942 if (!close_active_file ())
948 /* internal_procedure() callback used by
   multipass_procedure_with_splits().  When C begins a new split group,
   the callback:
   - hands the previous group's casefile (if any) to split_func;
   - starts a new casefile sized for the default dictionary;
   - records C's split values.
   It then appends C to the current group's casefile and returns the
   result of casefile_append(): false signals an I/O error. */
950 multipass_split_callback (struct ccase *c, void *aux_)
952 struct multipass_split_aux_data *aux = aux_;
954 /* Start a new series if needed. */
955 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
957 /* Pass any cases to split_func. */
958 if (aux->casefile != NULL)
959 multipass_split_output (aux);
961 /* Start a new casefile. */
962 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
964 /* Record split values. */
966 case_destroy (&aux->prev_case);
967 case_clone (&aux->prev_case, c);
970 return casefile_append (aux->casefile, c);
974 multipass_split_output (struct multipass_split_aux_data *aux)
976 assert (aux->casefile != NULL);
977 aux->split_func (aux->casefile, aux->func_aux);
978 casefile_destroy (aux->casefile);
979 aux->casefile = NULL;
983 /* Discards all the current state in preparation for a data-input
   command like DATA LIST or GET.  In order, it:
   - clears the default dictionary and resets the default file handle;
   - frees the active file's data source, if any;
   - cancels all transformations;
   - drops any PROCESS IF expression;
   - cancels TEMPORARY.
   NOTE(review): the result of cancel_transformations() (false on an
   I/O error) is ignored here. */
986 discard_variables (void)
988 dict_clear (default_dict);
989 fh_set_default_handle (NULL);
993 if (vfm_source != NULL)
995 free_case_source (vfm_source);
999 cancel_transformations ();
1003 expr_free (process_if_expr);
1004 process_if_expr = NULL;
1006 cancel_temporary ();