1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 #include <procedure.h>
29 #include "expressions/public.h"
30 #include <data/case-source.h>
31 #include <data/case-sink.h>
32 #include <data/case.h>
33 #include <data/casefile.h>
34 #include <data/dictionary.h>
35 #include <data/file-handle-def.h>
36 #include <data/settings.h>
37 #include <data/storage-stream.h>
38 #include <data/value-labels.h>
39 #include <data/variable.h>
40 #include <language/control/control-stack.h>
41 #include <libpspp/alloc.h>
42 #include <libpspp/message.h>
43 #include <libpspp/message.h>
44 #include <libpspp/misc.h>
45 #include <libpspp/str.h>
46 #include <output/manager.h>
47 #include <output/table.h>
50 #define _(msgid) gettext (msgid)
53 Virtual File Manager (vfm):
55 vfm is used to process data files. It uses the model that
56 data is read from one stream (the data source), processed,
57 then written to another (the data sink). The data source is
58 then deleted and the data sink becomes the data source for the
61 /* Procedure execution data, handed to write_case() for every case. */
62 struct write_case_data
64 /* Function to call for each case. */
65 bool (*proc_func) (struct ccase *, void *); /* Callback; may be null, in which case write_case() skips the call. */
66 void *aux; /* Passed to proc_func as its second argument. */
68 struct ccase trns_case; /* Case used for transformations. */
69 struct ccase sink_case; /* Case written to sink, if
70 compaction is necessary. */
71 size_t cases_written; /* Cases output so far; compared with the case limit of default_dict. */
72 size_t cases_analyzed; /* Cases passed to procedure so far; compared with the case limit of temp_dict. */
75 /* The current active file, from which cases are read. */
76 struct case_source *vfm_source;
78 /* The replacement active file, to which cases are written. */
79 struct case_sink *vfm_sink;
81 /* The compactor used to compact a case, if necessary;
82 otherwise a null pointer. */
83 static struct dict_compactor *compactor;
85 /* Time at which vfm was last invoked; zero until a time has been recorded. */
86 static time_t last_vfm_invocation;
89 int n_lag; /* Number of cases to lag. */
90 static int lag_count; /* Number of cases in lag_queue so far. */
91 static int lag_head; /* Index where next case will be added. */
92 static struct ccase *lag_queue; /* Array of n_lag struct ccase elements. */
94 /* Active transformations. */
95 struct transformation *t_trns;
96 size_t n_trns, m_trns, f_trns; /* Number of entries in use, number of entries allocated, and index of the first transformation that write_case() executes. */
98 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
100 static void update_last_vfm_invocation (void);
101 static void create_trns_case (struct ccase *, struct dictionary *);
102 static void open_active_file (void);
103 static bool write_case (struct write_case_data *wc_data);
104 static int execute_transformations (struct ccase *c,
105 struct transformation *trns,
106 int first_idx, int last_idx,
108 static int filter_case (const struct ccase *c, int case_num);
109 static void lag_case (const struct ccase *c);
110 static void clear_case (struct ccase *c);
111 static bool close_active_file (void);
113 /* Public functions. */
115 /* Returns the last time the data was read.  If no time has been
   recorded yet (last_vfm_invocation is still zero), the current
   time is recorded first and that value is returned. */
117 time_of_last_procedure (void)
119 if (last_vfm_invocation == 0)
120 update_last_vfm_invocation ();
121 return last_vfm_invocation;
124 /* Reads the data from the input program and writes it to a new
125 active file. For each case we read from the input program, we
128 1. Execute permanent transformations. If these drop the case,
129 start the next case from step 1.
131 2. N OF CASES. If we have already written N cases, start the
132 next case from step 1.
134 3. Write case to replacement active file.
136 4. Execute temporary transformations. If these drop the case,
137 start the next case from step 1.
139 5. FILTER, PROCESS IF. If these drop the case, start the next
142 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
143 cases, start the next case from step 1.
145 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. If
PROC_FUNC is null, this step is skipped.
147 Returns true if successful, false if an I/O error occurred. */
149 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
151 if (proc_func == NULL
152 && case_source_is_class (vfm_source, &storage_source_class)
158 update_last_vfm_invocation ();
/* close_active_file() is called even when internal_procedure()
   reported a failure. */
166 ok = internal_procedure (proc_func, aux);
167 if (!close_active_file ())
174 /* Executes a procedure, as procedure(), except that the caller
175 is responsible for calling open_active_file() and
177 Returns true if successful, false if an I/O error occurred. */
179 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
181 static int recursive_call; /* Nonzero while a call is in progress. */
182 struct write_case_data wc_data;
/* This function must not be re-entered.
   NOTE(review): the counter is modified inside assert(), so it is
   not maintained at all when NDEBUG is defined. */
185 assert (++recursive_call == 1);
187 wc_data.proc_func = proc_func;
189 create_trns_case (&wc_data.trns_case, default_dict);
190 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
191 wc_data.cases_written = 0;
/* NOTE(review): no initialization of wc_data.cases_analyzed is
   visible in this excerpt, although write_case() reads and
   increments it -- confirm that it is set before use. */
193 update_last_vfm_invocation ();
/* Having no data source at all counts as success; otherwise the
   source's read function drives write_case() with &wc_data. */
195 ok = (vfm_source == NULL
196 || vfm_source->class->read (vfm_source,
198 write_case, &wc_data));
/* Release the two cases created above. */
200 case_destroy (&wc_data.sink_case);
201 case_destroy (&wc_data.trns_case);
203 assert (--recursive_call == 0);
208 /* Sets last_vfm_invocation to the current time. */
210 update_last_vfm_invocation (void)
212 last_vfm_invocation = time (NULL);
215 /* Creates TRNS_CASE in place, sized by dict_get_next_value_idx()
216 for DICT, and gives each variable in DICT its starting value:
217 a numeric value becomes 0.0 if the variable's `reinit' member
is nonzero and SYSMIS otherwise, and string data is filled with
spaces over the variable's width.  Nothing is returned; the caller
owns TRNS_CASE. */
219 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
221 size_t var_cnt = dict_get_var_cnt (dict);
224 case_create (trns_case, dict_get_next_value_idx (dict));
225 for (i = 0; i < var_cnt; i++)
227 struct variable *v = dict_get_var (dict, i);
228 union value *value = case_data_rw (trns_case, v->fv); /* Writable slot for V within the case. */
230 if (v->type == NUMERIC)
231 value->f = v->reinit ? 0.0 : SYSMIS;
233 memset (value->s, ' ', v->width);
237 /* Makes all preparations for reading from the data source and writing
240 open_active_file (void)
242 /* Make temp_dict refer to the dictionary right before data
247 temp_dict = default_dict;
250 /* Figure out compaction: a compactor is made only when temp_dict needs one. */
251 compactor = (dict_needs_compaction (temp_dict)
252 ? dict_make_compactor (temp_dict)
/* Use a storage sink unless a sink has already been set up, then
   call the sink's open function if its class provides one. */
256 if (vfm_sink == NULL)
257 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
258 if (vfm_sink->class->open != NULL)
259 vfm_sink->class->open (vfm_sink);
261 /* Allocate memory for lag queue; every slot starts out as a null case. */
268 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
269 for (i = 0; i < n_lag; i++)
270 case_nullify (&lag_queue[i]);
273 /* Close any unclosed DO IF or LOOP constructs. */
277 /* Transforms trns_case and writes it to the replacement active
278 file if advisable. Returns true if more cases can be
279 accepted, false otherwise. Do not call this function again
280 after it has returned false once. */
282 write_case (struct write_case_data *wc_data)
286 /* Execute permanent transformations: indexes f_trns up to, but not including, temp_trns.  The case number passed is the one the case would get if it is written. */
287 retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
288 temp_trns, wc_data->cases_written + 1);
/* N OF CASES.  A case limit of zero means that there is no limit. */
293 if (dict_get_case_limit (default_dict)
294 && wc_data->cases_written >= dict_get_case_limit (default_dict))
296 wc_data->cases_written++;
298 /* Write case to LAG queue. */
300 lag_case (&wc_data->trns_case);
302 /* Write case to replacement active file.  The sink's write function is optional.  When a compactor is in use the case is first compacted into sink_case and that is what gets written; otherwise trns_case is written directly. */
303 if (vfm_sink->class->write != NULL)
305 if (compactor != NULL)
307 dict_compactor_compact (compactor, &wc_data->sink_case,
308 &wc_data->trns_case);
309 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
312 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
315 /* Execute temporary transformations: indexes temp_trns up to, but not including, n_trns. */
316 retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
317 n_trns, wc_data->cases_written);
321 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
322 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
323 || (dict_get_case_limit (temp_dict)
324 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
326 wc_data->cases_analyzed++;
328 /* Pass case to procedure, if a procedure function was supplied. */
329 if (wc_data->proc_func != NULL)
330 if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
/* Get trns_case ready for the next case. */
334 clear_case (&wc_data->trns_case);
338 /* Transforms case C using the transformations in TRNS[] with
339 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
340 become case CASE_NUM (1-based) in the output file. Returns 1
341 if the case was successfully transformed, 0 if it was filtered
342 out by one of the transformations, or -1 if the procedure
343 should be abandoned due to a fatal error. */
345 execute_transformations (struct ccase *c,
346 struct transformation *trns,
347 int first_idx, int last_idx,
352 for (idx = first_idx; idx != last_idx; ) /* No step expression here; IDX is presumably updated inside the loop body, which is not fully visible in this excerpt. */
354 struct transformation *t = &trns[idx];
355 int retval = t->proc (t->private, c, case_num); /* The transformation gets its own private data, the case, and CASE_NUM. */
/* filter_case: tests a case against the filter variable of
   default_dict (a value of 0.0 or a numeric missing value) and
   against process_if_expr (a result other than 1.0).
   NOTE(review): the comment below names the case number CASE_NUM,
   but in this definition the parameter is called case_idx. */
383 /* Returns nonzero if case C with case number CASE_NUM should be
384 excluded as specified on FILTER or PROCESS IF, otherwise
387 filter_case (const struct ccase *c, int case_idx)
390 struct variable *filter_var = dict_get_filter (default_dict);
391 if (filter_var != NULL)
393 double f = case_num (c, filter_var->fv);
394 if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
399 if (process_if_expr != NULL
400 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
406 /* Adds a clone of C to the lag queue at slot lag_head, first destroying the case that was stored in that slot. */
408 lag_case (const struct ccase *c)
410 if (lag_count < n_lag)
412 case_destroy (&lag_queue[lag_head]);
413 case_clone (&lag_queue[lag_head], c);
414 if (++lag_head >= n_lag)
/* clear_case: walks the variables of default_dict and resets
   values in C, numeric values to SYSMIS and string data to spaces
   over the variable's width.  How the variables to be cleared are
   selected is not visible in this excerpt. */
418 /* Clears the variables in C that need to be cleared between
421 clear_case (struct ccase *c)
423 size_t var_cnt = dict_get_var_cnt (default_dict);
426 for (i = 0; i < var_cnt; i++)
428 struct variable *v = dict_get_var (default_dict, i);
431 if (v->type == NUMERIC)
432 case_data_rw (c, v->fv)->f = SYSMIS;
434 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
439 /* Closes the active file: releases the lag queue, installs temp_dict as default_dict, finishes compaction, turns the data sink into the new data source, and cancels the per-procedure settings and transformations.  Returns the result of cancel_transformations(). */
441 close_active_file (void)
443 /* Free memory for lag queue, and turn off lagging. */
448 for (i = 0; i < n_lag; i++)
449 case_destroy (&lag_queue[i]);
454 /* Dictionary from before TEMPORARY becomes permanent. */
457 dict_destroy (default_dict);
458 default_dict = temp_dict;
462 /* Finish compaction. */
463 if (compactor != NULL)
465 dict_compactor_destroy (compactor);
466 dict_compact_values (default_dict);
469 /* Free data source. */
470 free_case_source (vfm_source);
473 /* Old data sink becomes new data source, provided the sink's class has a make_source function; the sink itself is freed either way. */
474 if (vfm_sink->class->make_source != NULL)
475 vfm_source = vfm_sink->class->make_source (vfm_sink);
476 free_case_sink (vfm_sink);
479 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
480 and get rid of all the transformations. */
482 expr_free (process_if_expr);
483 process_if_expr = NULL;
484 dict_set_case_limit (default_dict, 0);
485 dict_clear_vectors (default_dict);
486 return cancel_transformations ();
489 /* Returns a pointer to the lagged case from N_BEFORE cases before the
490 current one, or NULL if there haven't been that many cases yet.
N_BEFORE must be at least 1 and at most n_lag. */
492 lagged_case (int n_before)
494 assert (n_before >= 1 );
495 assert (n_before <= n_lag);
497 if (n_before <= lag_count)
499 int index = lag_head - n_before; /* lag_head is the slot the next case will go into, so count back from it. */
502 return &lag_queue[index];
508 /* Appends a new transformation to t_trns[], the list of all transformations to be
509 performed on data as it is read from the active file.  PRIVATE
becomes the transformation's private data; PROC and FREE are
presumably stored as its processing and freeing functions (those
assignments are not visible in this excerpt). */
511 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
513 struct transformation *trns;
514 if (n_trns >= m_trns) /* Every allocated entry is in use: enlarge the array. */
515 t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
516 trns = &t_trns[n_trns++]; /* Claim the next free entry. */
519 trns->private = private;
522 /* Returns the index number that the next transformation added by
523 add_transformation() will receive. A trns_proc_func that
524 returns this index causes control flow to jump to it.
add_transformation() places each new transformation at index
n_trns and then increments n_trns. */
526 next_transformation (void)
531 /* Cancels all active transformations, including any transformations
532 created by the input program.
533 Returns true if successful, false if an I/O error occurred. */
535 cancel_transformations (void)
539 for (i = 0; i < n_trns; i++)
541 struct transformation *t = &t_trns[i];
544 if (!t->free (t->private)) /* The free function receives the transformation's private data; a false result is presumably what makes this function report failure. */
555 /* Represents auxiliary data for handling SPLIT FILE. */
556 struct split_aux_data
558 size_t case_count; /* Number of cases so far. */
559 struct ccase prev_case; /* Clone of the case that began the current split group. */
561 /* Functions to call (each of them may be null)... */
562 void (*begin_func) (void *); /* ...before data. */
563 bool (*proc_func) (struct ccase *, void *); /* ...with data. */
564 void (*end_func) (void *); /* ...after data. */
565 void *func_aux; /* Auxiliary data passed to all three functions. */
568 static int equal_splits (const struct ccase *, const struct ccase *);
569 static bool procedure_with_splits_callback (struct ccase *, void *);
570 static void dump_splits (struct ccase *);
572 /* Like procedure(), but it automatically breaks the case stream
573 into SPLIT FILE break groups. Before each group of cases with
574 identical SPLIT FILE variable values, BEGIN_FUNC is called.
575 Then PROC_FUNC is called with each case in the group.
576 END_FUNC is called when the group is finished. FUNC_AUX is
577 passed to each of the functions as auxiliary data.
Each of BEGIN_FUNC, PROC_FUNC, and END_FUNC may be null.
579 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
580 and END_FUNC will be called at all.
582 If SPLIT FILE is not in effect, then there is one break group
583 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
586 Returns true if successful, false if an I/O error occurred. */
588 procedure_with_splits (void (*begin_func) (void *aux),
589 bool (*proc_func) (struct ccase *, void *aux),
590 void (*end_func) (void *aux),
593 struct split_aux_data split_aux;
596 split_aux.case_count = 0;
597 case_nullify (&split_aux.prev_case); /* No group has been started yet. */
598 split_aux.begin_func = begin_func;
599 split_aux.proc_func = proc_func;
600 split_aux.end_func = end_func;
601 split_aux.func_aux = func_aux;
604 ok = internal_procedure (procedure_with_splits_callback, &split_aux);
/* If any case was seen, the last group is still open at this point. */
605 if (split_aux.case_count > 0 && end_func != NULL)
607 if (!close_active_file ())
610 case_destroy (&split_aux.prev_case);
615 /* internal_procedure() callback used by procedure_with_splits().  SPLIT_AUX_ points to the struct split_aux_data that procedure_with_splits() set up. */
617 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
619 struct split_aux_data *split_aux = split_aux_;
621 /* Start a new series if needed: on the very first case, or when C's split values are not equal to those of prev_case.  The count is tested first, so the still-null prev_case is never compared. */
622 if (split_aux->case_count == 0
623 || !equal_splits (c, &split_aux->prev_case))
/* Finish the previous group, if there was one. */
625 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
626 split_aux->end_func (split_aux->func_aux);
/* Remember the case that starts the new group. */
629 case_destroy (&split_aux->prev_case);
630 case_clone (&split_aux->prev_case, c);
632 if (split_aux->begin_func != NULL)
633 split_aux->begin_func (split_aux->func_aux);
636 split_aux->case_count++;
/* The per-case function's result becomes this callback's result. */
637 if (split_aux->proc_func != NULL)
638 return split_aux->proc_func (c, split_aux->func_aux);
643 /* Compares the SPLIT FILE variables in cases A and B and returns
644 nonzero only if they are equal, that is, if case_compare() returns 0. */
646 equal_splits (const struct ccase *a, const struct ccase *b)
648 return case_compare (a, b,
649 dict_get_split_vars (default_dict),
650 dict_get_split_cnt (default_dict)) == 0;
653 /* Dumps out the values of all the split variables for the case C, as a table with three columns (variable name, formatted value, value label) and one row per split variable below a heading row. */
655 dump_splits (struct ccase *c)
657 struct variable *const *split;
662 split_cnt = dict_get_split_cnt (default_dict);
/* Three columns; one heading row plus one row per split variable. */
666 t = tab_create (3, split_cnt + 1, 0);
667 tab_dim (t, tab_natural_dimensions);
668 tab_vline (t, TAL_GAP, 1, 0, split_cnt);
669 tab_vline (t, TAL_GAP, 2, 0, split_cnt);
670 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
671 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
672 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
673 split = dict_get_split_vars (default_dict);
674 for (i = 0; i < split_cnt; i++)
676 struct variable *v = split[i];
680 assert (v->type == NUMERIC || v->type == ALPHA);
681 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
/* Format the value with the variable's print format and
   terminate the text at the format's width.
   NOTE(review): the declaration of temp_buf is not visible in
   this excerpt; confirm that it holds at least v->print.w + 1
   bytes. */
683 data_out (temp_buf, &v->print, case_data (c, v->fv));
685 temp_buf[v->print.w] = 0;
686 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
/* Look up the value's label for the third column. */
688 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
690 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
692 tab_flags (t, SOMF_NO_TITLE);
696 /* Represents auxiliary data for handling SPLIT FILE in a
697 multipass procedure. */
698 struct multipass_split_aux_data
700 struct ccase prev_case; /* Clone of the case that began the current split. */
701 struct casefile *casefile; /* Accumulates data for a split; null while no split is being accumulated. */
703 /* Function to call with the accumulated data. */
704 bool (*split_func) (const struct casefile *, void *);
705 void *func_aux; /* Auxiliary data passed to split_func. */
708 static bool multipass_split_callback (struct ccase *c, void *aux_);
709 static void multipass_split_output (struct multipass_split_aux_data *);
711 /* Runs a procedure that gathers the cases of each SPLIT FILE
   group into a casefile and hands each finished casefile to
   SPLIT_FUNC, together with FUNC_AUX.  SPLIT_FUNC must not be
   null.
   Returns true if successful, false if an I/O error occurred. */
713 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
717 struct multipass_split_aux_data aux;
720 assert (split_func != NULL);
724 case_nullify (&aux.prev_case);
726 aux.split_func = split_func;
727 aux.func_aux = func_aux;
729 ok = internal_procedure (multipass_split_callback, &aux);
/* Hand over the final group, which is still pending when the
   input ends. */
730 if (aux.casefile != NULL)
731 multipass_split_output (&aux);
732 case_destroy (&aux.prev_case);
734 if (!close_active_file ())
740 /* internal_procedure() callback used by multipass_procedure_with_splits().  Appends C to the casefile of the current split and returns the result of casefile_append(). */
742 multipass_split_callback (struct ccase *c, void *aux_)
744 struct multipass_split_aux_data *aux = aux_;
746 /* Start a new series if needed: when no casefile exists yet, or when C's split values are not equal to those of prev_case. */
747 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
749 /* Pass any cases to split_func. */
750 if (aux->casefile != NULL)
751 multipass_split_output (aux);
753 /* Start a new casefile. */
754 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
756 /* Record split values. */
758 case_destroy (&aux->prev_case);
759 case_clone (&aux->prev_case, c);
762 return casefile_append (aux->casefile, c);
/* Passes the casefile accumulated in AUX to AUX's split_func,
   along with func_aux, then destroys the casefile and resets the
   pointer to null.  AUX->casefile must not be null on entry.
   NOTE(review): split_func returns bool, but its result is
   discarded here. */
766 multipass_split_output (struct multipass_split_aux_data *aux)
768 assert (aux->casefile != NULL);
769 aux->split_func (aux->casefile, aux->func_aux);
770 casefile_destroy (aux->casefile);
771 aux->casefile = NULL;
775 /* Discards all the current state in preparation for a data-input
776 command like DATA LIST or GET.  Clears default_dict, resets the
default file handle, frees the data source if there is one, cancels
all transformations, and drops the PROCESS IF expression. */
778 discard_variables (void)
780 dict_clear (default_dict);
781 fh_set_default_handle (NULL);
785 if (vfm_source != NULL)
787 free_case_source (vfm_source);
/* NOTE(review): the result of cancel_transformations() is ignored
   here, so a failure it reports goes unnoticed. */
791 cancel_transformations ();
795 expr_free (process_if_expr);
796 process_if_expr = NULL;