1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 #include <procedure.h>
29 #include "expressions/public.h"
30 #include <data/case-source.h>
31 #include <data/case-sink.h>
32 #include <data/case.h>
33 #include <data/casefile.h>
34 #include <data/dictionary.h>
35 #include <data/file-handle-def.h>
36 #include <data/settings.h>
37 #include <data/storage-stream.h>
38 #include <data/value-labels.h>
39 #include <data/variable.h>
40 #include <language/control/control-stack.h>
41 #include <libpspp/alloc.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/str.h>
45 #include <output/manager.h>
46 #include <output/table.h>
49 #define _(msgid) gettext (msgid)
52 Virtual File Manager (vfm):
54 vfm is used to process data files. It uses the model that
55 data is read from one stream (the data source), processed,
56 then written to another (the data sink). The data source is
57 then deleted and the data sink becomes the data source for the
60 /* Procedure execution data, set up by internal_procedure() and used by write_case(). */
61 struct write_case_data
63 /* Function to call for each case that reaches the procedure. */
64 bool (*proc_func) (struct ccase *, void *); /* Callback; may be null. */
65 void *aux; /* Auxiliary data handed to proc_func. */
67 struct ccase trns_case; /* Case that transformations operate on. */
68 struct ccase sink_case; /* Compacted copy of trns_case written to
69 the sink when a compactor is in use. */
70 size_t cases_written; /* Cases output so far. */
71 size_t cases_analyzed; /* Cases passed to procedure so far. */
74 /* The current active file, from which cases are read. */
75 struct case_source *vfm_source;
77 /* The replacement active file, to which cases are written. */
78 struct case_sink *vfm_sink;
80 /* The compactor used to compact a case, if necessary;
81 otherwise a null pointer. */
82 static struct dict_compactor *compactor;
84 /* Time at which vfm was last invoked. */
85 static time_t last_vfm_invocation;
87 /* Whether we're inside a procedure.
88 For debugging purposes only. */
89 static bool in_procedure;
92 int n_lag; /* Number of cases to lag. */
93 static int lag_count; /* Number of cases in lag_queue so far. */
94 static int lag_head; /* Index where next case will be added. */
95 static struct ccase *lag_queue; /* Array of n_lag struct ccase elements. */
97 /* Active transformations. */
98 struct transformation *t_trns;
99 size_t n_trns, m_trns, f_trns; /* Number in use, allocated capacity, and index of the first one run on each case. */
101 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
103 static void update_last_vfm_invocation (void);
104 static void create_trns_case (struct ccase *, struct dictionary *);
105 static void open_active_file (void);
106 static bool write_case (struct write_case_data *wc_data);
107 static int execute_transformations (struct ccase *c,
108 struct transformation *trns,
109 int first_idx, int last_idx,
111 static int filter_case (const struct ccase *c, int case_num);
112 static void lag_case (const struct ccase *c);
113 static void clear_case (struct ccase *c);
114 static bool close_active_file (void);
116 /* Public functions. */
118 /* Returns the time at which vfm was last invoked, recording the current time first if none has been recorded yet. */
120 time_of_last_procedure (void)
122 if (last_vfm_invocation == 0) /* Nothing recorded so far. */
123 update_last_vfm_invocation ();
124 return last_vfm_invocation;
127 /* Reads the data from the input program and writes it to a new
128 active file. For each case we read from the input program, we
    do the following:
131 1. Execute permanent transformations. If these drop the case,
132 start the next case from step 1.
134 2. N OF CASES. If we have already written N cases, start the
135 next case from step 1.
137 3. Write case to replacement active file.
139 4. Execute temporary transformations. If these drop the case,
140 start the next case from step 1.
142 5. FILTER, PROCESS IF. If these drop the case, start the next
    case from step 1.
145 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
146 cases, start the next case from step 1.
148 7. Pass case to PROC_FUNC, passing AUX as auxiliary data.
    PROC_FUNC may be null, in which case this step is skipped.
150 Returns true if successful, false if an I/O error occurred. */
152 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
154 if (proc_func == NULL
155 && case_source_is_class (vfm_source, &storage_source_class)
161 update_last_vfm_invocation ();
169 ok = internal_procedure (proc_func, aux);
170 if (!close_active_file ())
177 /* Executes a procedure, as procedure(), except that the caller
178 is responsible for calling open_active_file() and
    close_active_file().
180 Returns true if successful, false if an I/O error occurred. */
182 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
184 struct write_case_data wc_data;
187 wc_data.proc_func = proc_func;
189 create_trns_case (&wc_data.trns_case, default_dict);
190 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
191 wc_data.cases_written = 0;
193 update_last_vfm_invocation ();
195 ok = (vfm_source == NULL /* No data source: nothing to read, which counts as success. */
196 || vfm_source->class->read (vfm_source,
198 write_case, &wc_data)); /* The source's read function is handed write_case and its data. */
200 case_destroy (&wc_data.sink_case);
201 case_destroy (&wc_data.trns_case);
206 /* Sets last_vfm_invocation to the current time. */
208 update_last_vfm_invocation (void)
210 last_vfm_invocation = time (NULL);
213 /* Creates TRNS_CASE, sized by dict_get_next_value_idx (DICT), and
214 sets each variable's initial value: a numeric variable gets 0 if its
215 `leave' flag is set and SYSMIS otherwise; string data is set to spaces. */
217 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
219 size_t var_cnt = dict_get_var_cnt (dict);
222 case_create (trns_case, dict_get_next_value_idx (dict));
223 for (i = 0; i < var_cnt; i++)
225 struct variable *v = dict_get_var (dict, i);
226 union value *value = case_data_rw (trns_case, v->fv);
228 if (v->type == NUMERIC)
229 value->f = v->leave ? 0.0 : SYSMIS;
231 memset (value->s, ' ', v->width);
235 /* Makes all preparations for reading from the data source and writing
238 open_active_file (void)
240 assert (!in_procedure);
243 /* Make temp_dict refer to the dictionary right before data
248 temp_dict = default_dict;
251 /* Figure out compaction: a compactor is made only if temp_dict needs one. */
252 compactor = (dict_needs_compaction (temp_dict)
253 ? dict_make_compactor (temp_dict)
257 if (vfm_sink == NULL) /* No sink has been set up: default to a storage sink. */
258 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
259 if (vfm_sink->class->open != NULL) /* The open hook is optional. */
260 vfm_sink->class->open (vfm_sink);
262 /* Allocate memory for lag queue; every slot starts out as a null case. */
269 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
270 for (i = 0; i < n_lag; i++)
271 case_nullify (&lag_queue[i]);
274 /* Close any unclosed DO IF or LOOP constructs. */
278 /* Transforms WC_DATA's trns_case and writes it to the replacement active
279 file if advisable. Returns true if more cases can be
280 accepted, false otherwise. Do not call this function again
281 after it has returned false once. */
283 write_case (struct write_case_data *wc_data)
287 /* Execute permanent transformations (indexes f_trns up to temp_trns).
    The case has not been counted yet, so it would become case number
    cases_written + 1. */
288 retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
289 temp_trns, wc_data->cases_written + 1);
294 if (dict_get_case_limit (default_dict) /* N OF CASES; a limit of 0 never triggers. */
295 && wc_data->cases_written >= dict_get_case_limit (default_dict))
297 wc_data->cases_written++;
299 /* Write case to LAG queue. */
301 lag_case (&wc_data->trns_case);
303 /* Write case to replacement active file. */
304 if (vfm_sink->class->write != NULL) /* The write hook is optional. */
306 if (compactor != NULL) /* Compact into sink_case first, then write that. */
308 dict_compactor_compact (compactor, &wc_data->sink_case,
309 &wc_data->trns_case);
310 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
313 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
316 /* Execute temporary transformations (indexes temp_trns up to n_trns). */
317 retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
318 n_trns, wc_data->cases_written);
322 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
323 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
324 || (dict_get_case_limit (temp_dict)
325 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
327 wc_data->cases_analyzed++;
329 /* Pass case to procedure. */
330 if (wc_data->proc_func != NULL) /* A null proc_func means no per-case callback. */
331 if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
335 clear_case (&wc_data->trns_case);
339 /* Transforms case C using the transformations in TRNS[] with
340 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
341 become case CASE_NUM (1-based) in the output file. Returns 1
342 if the case was successfully transformed, 0 if it was filtered
343 out by one of the transformations, or -1 if the procedure
344 should be abandoned due to a fatal error. */
346 execute_transformations (struct ccase *c,
347 struct transformation *trns,
348 int first_idx, int last_idx,
353 for (idx = first_idx; idx != last_idx; ) /* No increment clause: IDX is presumably advanced inside the loop body. */
355 struct transformation *t = &trns[idx];
356 int retval = t->proc (t->private, c, case_num); /* Run this transformation on C with its private data. */
384 /* Returns nonzero if case C with case number CASE_IDX should be
385 excluded as specified on FILTER or PROCESS IF, otherwise
388 filter_case (const struct ccase *c, int case_idx)
391 struct variable *filter_var = dict_get_filter (default_dict);
392 if (filter_var != NULL)
394 double f = case_num (c, filter_var->fv);
395 if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
400 if (process_if_expr != NULL
401 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
407 /* Adds C to the lag queue: the case stored at lag_head is destroyed and replaced by a clone of C, and lag_head is advanced. */
409 lag_case (const struct ccase *c)
411 if (lag_count < n_lag) /* Queue does not yet hold n_lag cases. */
413 case_destroy (&lag_queue[lag_head]);
414 case_clone (&lag_queue[lag_head], c);
415 if (++lag_head >= n_lag) /* Stepped past the last slot. */
419 /* Clears (numeric data to SYSMIS, string data to spaces) the variables in C that need to be cleared between
422 clear_case (struct ccase *c)
424 size_t var_cnt = dict_get_var_cnt (default_dict);
427 for (i = 0; i < var_cnt; i++)
429 struct variable *v = dict_get_var (default_dict, i);
432 if (v->type == NUMERIC)
433 case_data_rw (c, v->fv)->f = SYSMIS;
435 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
440 /* Closes the active file after a procedure: the lag queue is freed,
    the old data source is freed and replaced by one made from the sink,
    and per-procedure settings and transformations are cancelled.
    Returns the result of cancel_transformations(). */
442 close_active_file (void)
444 /* Free memory for lag queue, and turn off lagging. */
449 for (i = 0; i < n_lag; i++)
450 case_destroy (&lag_queue[i]);
455 /* Dictionary from before TEMPORARY becomes permanent. */
458 dict_destroy (default_dict);
459 default_dict = temp_dict;
463 /* Finish compaction. */
464 if (compactor != NULL)
466 dict_compactor_destroy (compactor);
467 dict_compact_values (default_dict);
470 /* Free data source. */
471 free_case_source (vfm_source);
474 /* Old data sink becomes new data source. */
475 if (vfm_sink->class->make_source != NULL) /* The make_source hook is optional. */
476 vfm_source = vfm_sink->class->make_source (vfm_sink);
477 free_case_sink (vfm_sink);
480 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
481 and get rid of all the transformations. */
483 expr_free (process_if_expr);
484 process_if_expr = NULL;
485 dict_set_case_limit (default_dict, 0);
486 dict_clear_vectors (default_dict);
488 assert (in_procedure);
489 in_procedure = false;
491 return cancel_transformations ();
494 /* Returns a pointer to the lagged case from N_BEFORE cases before the
495 current one, or NULL if there haven't been that many cases yet.
    N_BEFORE must be between 1 and n_lag, inclusive. */
497 lagged_case (int n_before)
499 assert (n_before >= 1 );
500 assert (n_before <= n_lag);
502 if (n_before <= lag_count) /* That many cases have been queued. */
504 int index = lag_head - n_before; /* Count back from the next insertion slot. */
507 return &lag_queue[index];
513 /* Appends a transformation described by PROC, FREE, and PRIVATE to t_trns[], the list of all transformations to be
514 performed on data as it is read from the active file.
    Asserts that no procedure is in progress. */
516 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
518 struct transformation *trns;
520 assert (!in_procedure);
522 if (n_trns >= m_trns) /* Array is full: grow it. */
523 t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
524 trns = &t_trns[n_trns++];
527 trns->private = private;
530 /* Returns the index number that the next transformation added by
531 add_transformation() will receive (add_transformation() stores the
532 new entry at t_trns[n_trns]). A trns_proc_func that returns this index causes control flow to jump to it. */
534 next_transformation (void)
539 /* Cancels all active transformations, including any transformations
540 created by the input program.
541 Returns true if successful, false if an I/O error occurred. */
543 cancel_transformations (void)
547 for (i = 0; i < n_trns; i++)
549 struct transformation *t = &t_trns[i];
552 if (!t->free (t->private)) /* The transformation's free function reported failure. */
563 /* Represents auxiliary data for handling SPLIT FILE. */
564 struct split_aux_data
566 size_t case_count; /* Number of cases so far. */
567 struct ccase prev_case; /* Clone of the case that started the current break group. */
569 /* Functions to call; each may be null... */
570 void (*begin_func) (void *); /* ...before data. */
571 bool (*proc_func) (struct ccase *, void *); /* ...with data. */
572 void (*end_func) (void *); /* ...after data. */
573 void *func_aux; /* Auxiliary data passed to all three functions. */
576 static int equal_splits (const struct ccase *, const struct ccase *);
577 static bool procedure_with_splits_callback (struct ccase *, void *);
578 static void dump_splits (struct ccase *);
580 /* Like procedure(), but it automatically breaks the case stream
581 into SPLIT FILE break groups. Before each group of cases with
582 identical SPLIT FILE variable values, BEGIN_FUNC is called.
583 Then PROC_FUNC is called with each case in the group.
584 END_FUNC is called when the group is finished. FUNC_AUX is
585 passed to each of the functions as auxiliary data.
    Any of BEGIN_FUNC, PROC_FUNC, and END_FUNC may be null.
587 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
588 and END_FUNC will be called at all.
590 If SPLIT FILE is not in effect, then there is one break group
591 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
    will each be called once.
594 Returns true if successful, false if an I/O error occurred. */
596 procedure_with_splits (void (*begin_func) (void *aux),
597 bool (*proc_func) (struct ccase *, void *aux),
598 void (*end_func) (void *aux),
601 struct split_aux_data split_aux;
604 split_aux.case_count = 0;
605 case_nullify (&split_aux.prev_case);
606 split_aux.begin_func = begin_func;
607 split_aux.proc_func = proc_func;
608 split_aux.end_func = end_func;
609 split_aux.func_aux = func_aux;
612 ok = internal_procedure (procedure_with_splits_callback, &split_aux);
613 if (split_aux.case_count > 0 && end_func != NULL) /* The final break group, if any, has not been ended yet. */
615 if (!close_active_file ())
618 case_destroy (&split_aux.prev_case);
623 /* Per-case callback that procedure_with_splits() passes to
    internal_procedure().  SPLIT_AUX_ points to a struct split_aux_data.
    Calls end_func and begin_func at break-group boundaries and, when
    proc_func is non-null, returns its result for case C. */
625 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
627 struct split_aux_data *split_aux = split_aux_;
629 /* Start a new series on the first case or when the split values change. */
630 if (split_aux->case_count == 0
631 || !equal_splits (c, &split_aux->prev_case))
633 if (split_aux->case_count > 0 && split_aux->end_func != NULL) /* Finish the previous group, if there was one. */
634 split_aux->end_func (split_aux->func_aux);
637 case_destroy (&split_aux->prev_case); /* Remember this case as the group's reference. */
638 case_clone (&split_aux->prev_case, c);
640 if (split_aux->begin_func != NULL)
641 split_aux->begin_func (split_aux->func_aux);
644 split_aux->case_count++;
645 if (split_aux->proc_func != NULL)
646 return split_aux->proc_func (c, split_aux->func_aux);
651 /* Compares the SPLIT FILE variables in cases A and B and returns
652 nonzero only if they are all equal, that is, if case_compare() yields 0. */
654 equal_splits (const struct ccase *a, const struct ccase *b)
656 return case_compare (a, b,
657 dict_get_split_vars (default_dict),
658 dict_get_split_cnt (default_dict)) == 0;
661 /* Dumps out the values of all the split variables for the case C
    as a table with columns "Variable", "Value", and "Label" and one
    row per split variable. */
663 dump_splits (struct ccase *c)
665 struct variable *const *split;
670 split_cnt = dict_get_split_cnt (default_dict);
674 t = tab_create (3, split_cnt + 1, 0); /* Three columns; a heading row plus one row per split variable. */
675 tab_dim (t, tab_natural_dimensions);
676 tab_vline (t, TAL_GAP, 1, 0, split_cnt);
677 tab_vline (t, TAL_GAP, 2, 0, split_cnt);
678 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
679 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
680 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
681 split = dict_get_split_vars (default_dict);
682 for (i = 0; i < split_cnt; i++)
684 struct variable *v = split[i];
688 assert (v->type == NUMERIC || v->type == ALPHA);
689 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
691 data_out (temp_buf, &v->print, case_data (c, v->fv)); /* Format the value with the variable's print format. */
693 temp_buf[v->print.w] = 0; /* Terminate the string at the print width. */
694 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
696 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv)); /* Look up the value label for this value. */
698 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
700 tab_flags (t, SOMF_NO_TITLE);
704 /* Represents auxiliary data for handling SPLIT FILE in a
705 multipass procedure. */
706 struct multipass_split_aux_data
708 struct ccase prev_case; /* Clone of the case that started the current split. */
709 struct casefile *casefile; /* Accumulates data for a split; null when none is in progress. */
711 /* Function to call with the accumulated data. */
712 bool (*split_func) (const struct casefile *, void *);
713 void *func_aux; /* Auxiliary data passed to split_func. */
716 static bool multipass_split_callback (struct ccase *c, void *aux_);
717 static void multipass_split_output (struct multipass_split_aux_data *);
719 /* Runs a procedure that gathers the cases of each SPLIT FILE group
    into a casefile and passes each casefile, along with FUNC_AUX, to
    SPLIT_FUNC, which must not be null.
    Returns true if successful, false if an I/O error occurred. */
721 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
725 struct multipass_split_aux_data aux;
728 assert (split_func != NULL);
732 case_nullify (&aux.prev_case);
734 aux.split_func = split_func;
735 aux.func_aux = func_aux;
737 ok = internal_procedure (multipass_split_callback, &aux);
738 if (aux.casefile != NULL) /* Cases of the final split are still pending. */
739 multipass_split_output (&aux);
740 case_destroy (&aux.prev_case);
742 if (!close_active_file ())
748 /* Per-case callback that multipass_procedure_with_splits() passes to
    internal_procedure().  AUX_ points to a struct
    multipass_split_aux_data.  Appends C to the current split's
    casefile, first starting a new casefile if C begins a new split,
    and returns the result of casefile_append(). */
750 multipass_split_callback (struct ccase *c, void *aux_)
752 struct multipass_split_aux_data *aux = aux_;
754 /* Start a new series on the first case or when the split values change. */
755 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
757 /* Pass any cases to split_func. */
758 if (aux->casefile != NULL)
759 multipass_split_output (aux);
761 /* Start a new casefile. */
762 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
764 /* Record split values. */
766 case_destroy (&aux->prev_case);
767 case_clone (&aux->prev_case, c);
770 return casefile_append (aux->casefile, c);
/* Passes AUX's accumulated casefile, which must be non-null, to
   AUX's split_func along with its func_aux, then destroys the
   casefile and resets the pointer to null.
   NOTE(review): split_func returns bool, but its result is discarded
   here; confirm that this is intended. */
774 multipass_split_output (struct multipass_split_aux_data *aux)
776 assert (aux->casefile != NULL);
777 aux->split_func (aux->casefile, aux->func_aux);
778 casefile_destroy (aux->casefile);
779 aux->casefile = NULL;
783 /* Discards all the current state in preparation for a data-input
784 command like DATA LIST or GET. */
786 discard_variables (void)
788 dict_clear (default_dict);
789 fh_set_default_handle (NULL);
793 if (vfm_source != NULL)
795 free_case_source (vfm_source);
799 cancel_transformations ();
803 expr_free (process_if_expr);
804 process_if_expr = NULL;