1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 #include <procedure.h>
29 #include "expressions/public.h"
30 #include <data/case-source.h>
31 #include <data/case-sink.h>
32 #include <data/case.h>
33 #include <data/casefile.h>
34 #include <data/dictionary.h>
35 #include <data/file-handle-def.h>
36 #include <data/settings.h>
37 #include <data/storage-stream.h>
38 #include <data/value-labels.h>
39 #include <data/variable.h>
40 #include <language/control/control-stack.h>
41 #include <libpspp/alloc.h>
42 #include <libpspp/message.h>
43 #include <libpspp/message.h>
44 #include <libpspp/misc.h>
45 #include <libpspp/str.h>
46 #include <output/manager.h>
47 #include <output/table.h>
50 #define _(msgid) gettext (msgid)
53 Virtual File Manager (vfm):
55 vfm is used to process data files. It uses the model that
56 data is read from one stream (the data source), processed,
57 then written to another (the data sink). The data source is
58 then deleted and the data sink becomes the data source for the
61 /* Procedure execution data. */
62 struct write_case_data
64 /* Function to call for each case. */
65 bool (*proc_func) (struct ccase *, void *); /* Function. */
66 void *aux; /* Auxiliary data. */
68 struct ccase trns_case; /* Case used for transformations. */
69 struct ccase sink_case; /* Case written to sink, if
70 compaction is necessary. */
71 size_t cases_written; /* Cases output so far. */
72 size_t cases_analyzed; /* Cases passed to procedure so far. */
75 /* The current active file, from which cases are read. */
76 struct case_source *vfm_source;
78 /* The replacement active file, to which cases are written. */
79 struct case_sink *vfm_sink;
81 /* The compactor used to compact a case, if necessary;
82 otherwise a null pointer. */
83 static struct dict_compactor *compactor;
85 /* Time at which vfm was last invoked. */
86 static time_t last_vfm_invocation;
88 /* Whether we're inside a procedure.
89 For debugging purposes only. */
90 static bool in_procedure;
93 int n_lag; /* Number of cases to lag. */
94 static int lag_count; /* Number of cases in lag_queue so far. */
95 static int lag_head; /* Index where next case will be added. */
96 static struct ccase *lag_queue; /* Array of n_lag struct ccase elements. */
98 /* Active transformations. */
99 struct transformation *t_trns;
100 size_t n_trns, m_trns, f_trns; /* Number in use; allocated capacity; index where write_case() starts executing. */
102 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
104 static void update_last_vfm_invocation (void);
105 static void create_trns_case (struct ccase *, struct dictionary *);
106 static void open_active_file (void);
107 static bool write_case (struct write_case_data *wc_data);
108 static int execute_transformations (struct ccase *c,
109 struct transformation *trns,
110 int first_idx, int last_idx,
112 static int filter_case (const struct ccase *c, int case_num);
113 static void lag_case (const struct ccase *c);
114 static void clear_case (struct ccase *c);
115 static bool close_active_file (void);
117 /* Public functions. */
119 /* Returns the last time the data was read. */
121 time_of_last_procedure (void)
123 if (last_vfm_invocation == 0)
124 update_last_vfm_invocation ();
125 return last_vfm_invocation;
128 /* Reads the data from the input program and writes it to a new
129 active file. For each case we read from the input program, we
132 1. Execute permanent transformations. If these drop the case,
133 start the next case from step 1.
135 2. N OF CASES. If we have already written N cases, start the
136 next case from step 1.
138 3. Write case to replacement active file.
140 4. Execute temporary transformations. If these drop the case,
141 start the next case from step 1.
143 5. FILTER, PROCESS IF. If these drop the case, start the next
146 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
147 cases, start the next case from step 1.
149 7. Pass case to PROC_FUNC, passing AUX as auxiliary data.
151 Returns true if successful, false if an I/O error occurred. */
153 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
155 if (proc_func == NULL
156 && case_source_is_class (vfm_source, &storage_source_class)
162 update_last_vfm_invocation ();
170 ok = internal_procedure (proc_func, aux);
171 if (!close_active_file ())
178 /* Executes a procedure, as procedure(), except that the caller
179 is responsible for calling open_active_file() and
181 Returns true if successful, false if an I/O error occurred. */
183 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
185 struct write_case_data wc_data;
188 wc_data.proc_func = proc_func;
190 create_trns_case (&wc_data.trns_case, default_dict);
191 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
192 wc_data.cases_written = 0;
194 update_last_vfm_invocation ();
196 ok = (vfm_source == NULL
197 || vfm_source->class->read (vfm_source,
199 write_case, &wc_data));
201 case_destroy (&wc_data.sink_case);
202 case_destroy (&wc_data.trns_case);
207 /* Updates last_vfm_invocation. */
209 update_last_vfm_invocation (void)
211 last_vfm_invocation = time (NULL);
214 /* Initializes TRNS_CASE as a new case for DICT: numeric variables
215 start at 0 if they are LEAVE variables and at SYSMIS otherwise,
216 and string variables are filled with spaces. */
218 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
220 size_t var_cnt = dict_get_var_cnt (dict);
223 case_create (trns_case, dict_get_next_value_idx (dict));
224 for (i = 0; i < var_cnt; i++)
226 struct variable *v = dict_get_var (dict, i);
227 union value *value = case_data_rw (trns_case, v->fv);
229 if (v->type == NUMERIC)
230 value->f = v->leave ? 0.0 : SYSMIS;
232 memset (value->s, ' ', v->width);
236 /* Makes all preparations for reading from the data source and writing
239 open_active_file (void)
241 assert (!in_procedure);
244 /* Make temp_dict refer to the dictionary right before data
249 temp_dict = default_dict;
252 /* Figure out compaction. */
253 compactor = (dict_needs_compaction (temp_dict)
254 ? dict_make_compactor (temp_dict)
258 if (vfm_sink == NULL)
259 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
260 if (vfm_sink->class->open != NULL)
261 vfm_sink->class->open (vfm_sink);
263 /* Allocate memory for lag queue. */
270 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
271 for (i = 0; i < n_lag; i++)
272 case_nullify (&lag_queue[i]);
275 /* Close any unclosed DO IF or LOOP constructs. */
279 /* Transforms trns_case and writes it to the replacement active
280 file if advisable. Returns true if more cases can be
281 accepted, false otherwise. Do not call this function again
282 after it has returned false once. */
284 write_case (struct write_case_data *wc_data)
288 /* Execute permanent transformations. */
289 retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
290 temp_trns, wc_data->cases_written + 1);
295 if (dict_get_case_limit (default_dict)
296 && wc_data->cases_written >= dict_get_case_limit (default_dict))
298 wc_data->cases_written++;
300 /* Write case to LAG queue. */
302 lag_case (&wc_data->trns_case);
304 /* Write case to replacement active file. */
305 if (vfm_sink->class->write != NULL)
307 if (compactor != NULL)
309 dict_compactor_compact (compactor, &wc_data->sink_case,
310 &wc_data->trns_case);
311 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
314 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
317 /* Execute temporary transformations. */
318 retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
319 n_trns, wc_data->cases_written);
323 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
324 if (filter_case (&wc_data->trns_case, wc_data->cases_written)
325 || (dict_get_case_limit (temp_dict)
326 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
328 wc_data->cases_analyzed++;
330 /* Pass case to procedure. */
331 if (wc_data->proc_func != NULL)
332 if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
336 clear_case (&wc_data->trns_case);
340 /* Transforms case C using the transformations in TRNS[] with
341 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
342 become case CASE_NUM (1-based) in the output file. Returns 1
343 if the case was successfully transformed, 0 if it was filtered
344 out by one of the transformations, or -1 if the procedure
345 should be abandoned due to a fatal error. */
347 execute_transformations (struct ccase *c,
348 struct transformation *trns,
349 int first_idx, int last_idx,
354 for (idx = first_idx; idx != last_idx; )
356 struct transformation *t = &trns[idx];
357 int retval = t->proc (t->private, c, case_num);
385 /* Returns nonzero if case C with case number CASE_IDX should be
386 excluded as specified on FILTER or PROCESS IF, otherwise
389 filter_case (const struct ccase *c, int case_idx)
392 struct variable *filter_var = dict_get_filter (default_dict);
393 if (filter_var != NULL)
395 double f = case_num (c, filter_var->fv);
396 if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
401 if (process_if_expr != NULL
402 && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
408 /* Add C to the lag queue. */
410 lag_case (const struct ccase *c)
412 if (lag_count < n_lag)
414 case_destroy (&lag_queue[lag_head]);
415 case_clone (&lag_queue[lag_head], c);
416 if (++lag_head >= n_lag)
420 /* Clears the variables in C that need to be cleared between
423 clear_case (struct ccase *c)
425 size_t var_cnt = dict_get_var_cnt (default_dict);
428 for (i = 0; i < var_cnt; i++)
430 struct variable *v = dict_get_var (default_dict, i);
433 if (v->type == NUMERIC)
434 case_data_rw (c, v->fv)->f = SYSMIS;
436 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
441 /* Closes the active file. */
443 close_active_file (void)
445 /* Free memory for lag queue, and turn off lagging. */
450 for (i = 0; i < n_lag; i++)
451 case_destroy (&lag_queue[i]);
456 /* Dictionary from before TEMPORARY becomes permanent. */
459 dict_destroy (default_dict);
460 default_dict = temp_dict;
464 /* Finish compaction. */
465 if (compactor != NULL)
467 dict_compactor_destroy (compactor);
468 dict_compact_values (default_dict);
471 /* Free data source. */
472 free_case_source (vfm_source);
475 /* Old data sink becomes new data source. */
476 if (vfm_sink->class->make_source != NULL)
477 vfm_source = vfm_sink->class->make_source (vfm_sink);
478 free_case_sink (vfm_sink);
481 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
482 and get rid of all the transformations. */
484 expr_free (process_if_expr);
485 process_if_expr = NULL;
486 dict_set_case_limit (default_dict, 0);
487 dict_clear_vectors (default_dict);
489 assert (in_procedure);
490 in_procedure = false;
492 return cancel_transformations ();
495 /* Returns a pointer to the lagged case from N_BEFORE cases before the
496 current one, or NULL if there haven't been that many cases yet. */
498 lagged_case (int n_before)
500 assert (n_before >= 1 );
501 assert (n_before <= n_lag);
503 if (n_before <= lag_count)
505 int index = lag_head - n_before;
508 return &lag_queue[index];
514 /* Appends TRNS to t_trns[], the list of all transformations to be
515 performed on data as it is read from the active file. */
517 add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
519 struct transformation *trns;
521 assert (!in_procedure);
523 if (n_trns >= m_trns)
524 t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
525 trns = &t_trns[n_trns++];
528 trns->private = private;
531 /* Returns the index number that the next transformation added by
532 add_transformation() will receive. A trns_proc_func that
533 returns this index causes control flow to jump to it. */
535 next_transformation (void)
540 /* Cancels all active transformations, including any transformations
541 created by the input program.
542 Returns true if successful, false if an I/O error occurred. */
544 cancel_transformations (void)
548 for (i = 0; i < n_trns; i++)
550 struct transformation *t = &t_trns[i];
553 if (!t->free (t->private))
564 /* Represents auxiliary data for handling SPLIT FILE. */
565 struct split_aux_data
567 size_t case_count; /* Number of cases so far. */
568 struct ccase prev_case; /* Data in previous case. */
570 /* Functions to call... */
571 void (*begin_func) (void *); /* ...before data. */
572 bool (*proc_func) (struct ccase *, void *); /* ...with data. */
573 void (*end_func) (void *); /* ...after data. */
574 void *func_aux; /* Auxiliary data. */
577 static int equal_splits (const struct ccase *, const struct ccase *);
578 static bool procedure_with_splits_callback (struct ccase *, void *);
579 static void dump_splits (struct ccase *);
581 /* Like procedure(), but it automatically breaks the case stream
582 into SPLIT FILE break groups. Before each group of cases with
583 identical SPLIT FILE variable values, BEGIN_FUNC is called.
584 Then PROC_FUNC is called with each case in the group.
585 END_FUNC is called when the group is finished. FUNC_AUX is
586 passed to each of the functions as auxiliary data.
588 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
589 and END_FUNC will be called at all.
591 If SPLIT FILE is not in effect, then there is one break group
592 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
595 Returns true if successful, false if an I/O error occurred. */
597 procedure_with_splits (void (*begin_func) (void *aux),
598 bool (*proc_func) (struct ccase *, void *aux),
599 void (*end_func) (void *aux),
602 struct split_aux_data split_aux;
605 split_aux.case_count = 0;
606 case_nullify (&split_aux.prev_case);
607 split_aux.begin_func = begin_func;
608 split_aux.proc_func = proc_func;
609 split_aux.end_func = end_func;
610 split_aux.func_aux = func_aux;
613 ok = internal_procedure (procedure_with_splits_callback, &split_aux);
614 if (split_aux.case_count > 0 && end_func != NULL)
616 if (!close_active_file ())
619 case_destroy (&split_aux.prev_case);
624 /* procedure() callback used by procedure_with_splits(). */
626 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
628 struct split_aux_data *split_aux = split_aux_;
630 /* Start a new series if needed. */
631 if (split_aux->case_count == 0
632 || !equal_splits (c, &split_aux->prev_case))
634 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
635 split_aux->end_func (split_aux->func_aux);
638 case_destroy (&split_aux->prev_case);
639 case_clone (&split_aux->prev_case, c);
641 if (split_aux->begin_func != NULL)
642 split_aux->begin_func (split_aux->func_aux);
645 split_aux->case_count++;
646 if (split_aux->proc_func != NULL)
647 return split_aux->proc_func (c, split_aux->func_aux);
652 /* Compares the SPLIT FILE variables in cases A and B and returns
653 nonzero only if they are equal. */
655 equal_splits (const struct ccase *a, const struct ccase *b)
657 return case_compare (a, b,
658 dict_get_split_vars (default_dict),
659 dict_get_split_cnt (default_dict)) == 0;
662 /* Dumps out the values of all the split variables for the case C. */
664 dump_splits (struct ccase *c)
666 struct variable *const *split;
671 split_cnt = dict_get_split_cnt (default_dict);
675 t = tab_create (3, split_cnt + 1, 0);
676 tab_dim (t, tab_natural_dimensions);
677 tab_vline (t, TAL_GAP, 1, 0, split_cnt);
678 tab_vline (t, TAL_GAP, 2, 0, split_cnt);
679 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
680 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
681 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
682 split = dict_get_split_vars (default_dict);
683 for (i = 0; i < split_cnt; i++)
685 struct variable *v = split[i];
689 assert (v->type == NUMERIC || v->type == ALPHA);
690 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
692 data_out (temp_buf, &v->print, case_data (c, v->fv));
694 temp_buf[v->print.w] = 0;
695 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
697 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
699 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
701 tab_flags (t, SOMF_NO_TITLE);
705 /* Represents auxiliary data for handling SPLIT FILE in a
706 multipass procedure. */
707 struct multipass_split_aux_data
709 struct ccase prev_case; /* Data in previous case. */
710 struct casefile *casefile; /* Accumulates data for a split. */
712 /* Function to call with the accumulated data. */
713 bool (*split_func) (const struct casefile *, void *);
714 void *func_aux; /* Auxiliary data. */
717 static bool multipass_split_callback (struct ccase *c, void *aux_);
718 static void multipass_split_output (struct multipass_split_aux_data *);
720 /* Returns true if successful, false if an I/O error occurred. */
722 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
726 struct multipass_split_aux_data aux;
729 assert (split_func != NULL);
733 case_nullify (&aux.prev_case);
735 aux.split_func = split_func;
736 aux.func_aux = func_aux;
738 ok = internal_procedure (multipass_split_callback, &aux);
739 if (aux.casefile != NULL)
740 multipass_split_output (&aux);
741 case_destroy (&aux.prev_case);
743 if (!close_active_file ())
749 /* procedure() callback used by multipass_procedure_with_splits(). */
751 multipass_split_callback (struct ccase *c, void *aux_)
753 struct multipass_split_aux_data *aux = aux_;
755 /* Start a new series if needed. */
756 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
758 /* Pass any cases to split_func. */
759 if (aux->casefile != NULL)
760 multipass_split_output (aux);
762 /* Start a new casefile. */
763 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
765 /* Record split values. */
767 case_destroy (&aux->prev_case);
768 case_clone (&aux->prev_case, c);
771 return casefile_append (aux->casefile, c);
775 multipass_split_output (struct multipass_split_aux_data *aux)
777 assert (aux->casefile != NULL);
778 aux->split_func (aux->casefile, aux->func_aux);
779 casefile_destroy (aux->casefile);
780 aux->casefile = NULL;
784 /* Discards all the current state in preparation for a data-input
785 command like DATA LIST or GET. */
787 discard_variables (void)
789 dict_clear (default_dict);
790 fh_set_default_handle (NULL);
794 if (vfm_source != NULL)
796 free_case_source (vfm_source);
800 cancel_transformations ();
804 expr_free (process_if_expr);
805 process_if_expr = NULL;