/* PSPP - computes sample statistics.
   Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
   Written by Ben Pfaff <blp@gnu.org>.
   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License as
   published by the Free Software Foundation; either version 2 of the
   License, or (at your option) any later version.
   This program is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
41 /* Procedure execution data. */
42 struct write_case_data
44 /* Function to call for each case. */
45 bool (*case_func) (const struct ccase *, void *);
48 struct ccase trns_case; /* Case used for transformations. */
49 struct ccase sink_case; /* Case written to sink, if
50 compacting is necessary. */
51 size_t cases_written; /* Cases output so far. */
/* Cases are read from proc_source,
   pass through permanent_trns_chain (which transforms them into
   the format described by permanent_dict),
   are written to proc_sink,
   pass through temporary_trns_chain (which transforms them into
   the format described by default_dict),
   and are finally passed to the procedure. */
static struct case_source *proc_source;
static struct trns_chain *permanent_trns_chain;
static struct dictionary *permanent_dict;
static struct case_sink *proc_sink;
static struct trns_chain *temporary_trns_chain;
struct dictionary *default_dict;

/* The transformation chain that the next transformation will be
   added to. */
static struct trns_chain *cur_trns_chain;

/* The compactor used to compact a case, if necessary;
   otherwise a null pointer. */
static struct dict_compactor *compactor;

/* Time at which proc was last invoked. */
static time_t last_proc_invocation;

/* Lag queue: a ring buffer of the most recent cases. */
int n_lag;                      /* Number of cases to lag. */
static int lag_count;           /* Number of cases in lag_queue so far. */
static int lag_head;            /* Index where next case will be added. */
static struct ccase *lag_queue; /* Array of N_LAG cases. */
/* Transformations added automatically when a procedure starts. */
static void add_case_limit_trns (void);
static void add_filter_trns (void);

/* Procedure execution helpers. */
static bool internal_procedure (bool (*case_func) (const struct ccase *,
                                                   void *),
                                bool (*end_func) (void *),
                                void *aux);
static void update_last_proc_invocation (void);
static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (void);
static bool write_case (struct write_case_data *wc_data);
static void lag_case (const struct ccase *c);
static void clear_case (struct ccase *c);
static bool close_active_file (void);
100 /* Public functions. */
102 /* Returns the last time the data was read. */
104 time_of_last_procedure (void)
106 if (last_proc_invocation == 0)
107 update_last_proc_invocation ();
108 return last_proc_invocation;
/* Regular procedure. */

/* Reads the data from the input program and writes it to a new
   active file.  Every case read from the input program goes
   through these steps:

   1. Execute permanent transformations.  If these drop the case,
      start the next case from step 1.

   2. Write case to replacement active file.

   3. Execute temporary transformations.  If these drop the case,
      start the next case from step 1.

   4. Pass case to PROC_FUNC, passing AUX as auxiliary data.

   Returns true if successful, false if an I/O error occurred or
   PROC_FUNC returned false. */
bool
procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
{
  /* A plain procedure has no end-of-data hook. */
  bool (*no_end_func) (void *) = NULL;

  return internal_procedure (proc_func, no_end_func, aux);
}
/* Multipass procedure. */

/* Auxiliary data for multipass_procedure(): every case is
   accumulated into CASEFILE, which is then passed to PROC_FUNC
   together with AUX. */
struct multipass_aux_data
  {
    struct casefile *casefile;  /* Accumulates the active file. */

    bool (*proc_func) (const struct casefile *, void *aux);
    void *aux;                  /* Auxiliary data for PROC_FUNC. */
  };
144 /* Case processing function for multipass_procedure(). */
146 multipass_case_func (const struct ccase *c, void *aux_data_)
148 struct multipass_aux_data *aux_data = aux_data_;
149 return casefile_append (aux_data->casefile, c);
152 /* End-of-file function for multipass_procedure(). */
154 multipass_end_func (void *aux_data_)
156 struct multipass_aux_data *aux_data = aux_data_;
157 return (aux_data->proc_func == NULL
158 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
161 /* Procedure that allows multiple passes over the input data.
162 The entire active file is passed to PROC_FUNC, with the given
163 AUX as auxiliary data, as a unit. */
165 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
168 struct multipass_aux_data aux_data;
171 aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
172 aux_data.proc_func = proc_func;
175 ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
176 ok = !casefile_error (aux_data.casefile) && ok;
178 casefile_destroy (aux_data.casefile);
183 /* Procedure implementation. */
185 /* Executes a procedure.
186 Passes each case to CASE_FUNC.
187 Calls END_FUNC after the last case.
188 Returns true if successful, false if an I/O error occurred (or
189 if CASE_FUNC or END_FUNC ever returned false). */
191 internal_procedure (bool (*case_func) (const struct ccase *, void *),
192 bool (*end_func) (void *),
195 struct write_case_data wc_data;
198 assert (proc_source != NULL);
200 update_last_proc_invocation ();
202 /* Optimize the trivial case where we're not going to do
203 anything with the data, by not reading the data at all. */
204 if (case_func == NULL && end_func == NULL
205 && case_source_is_class (proc_source, &storage_source_class)
207 && (temporary_trns_chain == NULL
208 || trns_chain_is_empty (temporary_trns_chain))
209 && trns_chain_is_empty (permanent_trns_chain))
212 dict_set_case_limit (default_dict, 0);
213 dict_clear_vectors (default_dict);
219 wc_data.case_func = case_func;
221 create_trns_case (&wc_data.trns_case, default_dict);
222 case_create (&wc_data.sink_case,
223 dict_get_compacted_value_cnt (default_dict));
224 wc_data.cases_written = 0;
226 ok = proc_source->class->read (proc_source,
228 write_case, &wc_data) && ok;
229 if (end_func != NULL)
230 ok = end_func (aux) && ok;
232 case_destroy (&wc_data.sink_case);
233 case_destroy (&wc_data.trns_case);
235 ok = close_active_file () && ok;
240 /* Updates last_proc_invocation. */
242 update_last_proc_invocation (void)
244 last_proc_invocation = time (NULL);
247 /* Creates and returns a case, initializing it from the vectors
248 that say which `value's need to be initialized just once, and
249 which ones need to be re-initialized before every case. */
251 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
253 size_t var_cnt = dict_get_var_cnt (dict);
256 case_create (trns_case, dict_get_next_value_idx (dict));
257 for (i = 0; i < var_cnt; i++)
259 struct variable *v = dict_get_var (dict, i);
260 union value *value = case_data_rw (trns_case, v->fv);
262 if (v->type == NUMERIC)
263 value->f = v->leave ? 0.0 : SYSMIS;
265 memset (value->s, ' ', v->width);
269 /* Makes all preparations for reading from the data source and writing
272 open_active_file (void)
274 add_case_limit_trns ();
277 /* Finalize transformations. */
278 trns_chain_finalize (cur_trns_chain);
280 /* Make permanent_dict refer to the dictionary right before
281 data reaches the sink. */
282 if (permanent_dict == NULL)
283 permanent_dict = default_dict;
285 /* Figure out whether to compact. */
286 compactor = (dict_compacting_would_shrink (permanent_dict)
287 ? dict_make_compactor (permanent_dict)
291 if (proc_sink == NULL)
292 proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
293 if (proc_sink->class->open != NULL)
294 proc_sink->class->open (proc_sink);
296 /* Allocate memory for lag queue. */
303 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
304 for (i = 0; i < n_lag; i++)
305 case_nullify (&lag_queue[i]);
309 /* Transforms trns_case and writes it to the replacement active
310 file if advisable. Returns true if more cases can be
311 accepted, false otherwise. Do not call this function again
312 after it has returned false once. */
314 write_case (struct write_case_data *wc_data)
316 enum trns_result retval;
319 /* Execute permanent transformations. */
320 case_nr = wc_data->cases_written + 1;
321 retval = trns_chain_execute (permanent_trns_chain,
322 &wc_data->trns_case, &case_nr);
323 if (retval != TRNS_CONTINUE)
326 /* Write case to LAG queue. */
328 lag_case (&wc_data->trns_case);
330 /* Write case to replacement active file. */
331 wc_data->cases_written++;
332 if (proc_sink->class->write != NULL)
334 if (compactor != NULL)
336 dict_compactor_compact (compactor, &wc_data->sink_case,
337 &wc_data->trns_case);
338 proc_sink->class->write (proc_sink, &wc_data->sink_case);
341 proc_sink->class->write (proc_sink, &wc_data->trns_case);
344 /* Execute temporary transformations. */
345 if (temporary_trns_chain != NULL)
347 retval = trns_chain_execute (temporary_trns_chain,
349 &wc_data->cases_written);
350 if (retval != TRNS_CONTINUE)
354 /* Pass case to procedure. */
355 if (wc_data->case_func != NULL)
356 if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
360 clear_case (&wc_data->trns_case);
361 return retval != TRNS_ERROR;
364 /* Add C to the lag queue. */
366 lag_case (const struct ccase *c)
368 if (lag_count < n_lag)
370 case_destroy (&lag_queue[lag_head]);
371 case_clone (&lag_queue[lag_head], c);
372 if (++lag_head >= n_lag)
376 /* Clears the variables in C that need to be cleared between
379 clear_case (struct ccase *c)
381 size_t var_cnt = dict_get_var_cnt (default_dict);
384 for (i = 0; i < var_cnt; i++)
386 struct variable *v = dict_get_var (default_dict, i);
389 if (v->type == NUMERIC)
390 case_data_rw (c, v->fv)->f = SYSMIS;
392 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
397 /* Closes the active file. */
399 close_active_file (void)
401 /* Free memory for lag queue, and turn off lagging. */
406 for (i = 0; i < n_lag; i++)
407 case_destroy (&lag_queue[i]);
412 /* Dictionary from before TEMPORARY becomes permanent. */
413 proc_cancel_temporary_transformations ();
415 /* Finish compacting. */
416 if (compactor != NULL)
418 dict_compactor_destroy (compactor);
419 dict_compact_values (default_dict);
423 /* Free data source. */
424 free_case_source (proc_source);
427 /* Old data sink becomes new data source. */
428 if (proc_sink->class->make_source != NULL)
429 proc_source = proc_sink->class->make_source (proc_sink);
430 free_case_sink (proc_sink);
433 dict_clear_vectors (default_dict);
434 permanent_dict = NULL;
435 return proc_cancel_all_transformations ();
438 /* Returns a pointer to the lagged case from N_BEFORE cases before the
439 current one, or NULL if there haven't been that many cases yet. */
441 lagged_case (int n_before)
443 assert (n_before >= 1 );
444 assert (n_before <= n_lag);
446 if (n_before <= lag_count)
448 int index = lag_head - n_before;
451 return &lag_queue[index];
457 /* Procedure that separates the data into SPLIT FILE groups. */
459 /* Represents auxiliary data for handling SPLIT FILE. */
460 struct split_aux_data
462 struct ccase prev_case; /* Data in previous case. */
464 /* Callback functions. */
465 void (*begin_func) (const struct ccase *, void *);
466 bool (*proc_func) (const struct ccase *, void *);
467 void (*end_func) (void *);
471 static int equal_splits (const struct ccase *, const struct ccase *);
472 static bool split_procedure_case_func (const struct ccase *c, void *);
473 static bool split_procedure_end_func (void *);
475 /* Like procedure(), but it automatically breaks the case stream
476 into SPLIT FILE break groups. Before each group of cases with
477 identical SPLIT FILE variable values, BEGIN_FUNC is called
478 with the first case in the group.
479 Then PROC_FUNC is called for each case in the group (including
481 END_FUNC is called when the group is finished. FUNC_AUX is
482 passed to each of the functions as auxiliary data.
484 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
485 and END_FUNC will be called at all.
487 If SPLIT FILE is not in effect, then there is one break group
488 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
491 Returns true if successful, false if an I/O error occurred. */
493 procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
494 bool (*proc_func) (const struct ccase *, void *aux),
495 void (*end_func) (void *aux),
498 struct split_aux_data split_aux;
501 case_nullify (&split_aux.prev_case);
502 split_aux.begin_func = begin_func;
503 split_aux.proc_func = proc_func;
504 split_aux.end_func = end_func;
505 split_aux.func_aux = func_aux;
507 ok = internal_procedure (split_procedure_case_func,
508 split_procedure_end_func, &split_aux);
510 case_destroy (&split_aux.prev_case);
515 /* Case callback used by procedure_with_splits(). */
517 split_procedure_case_func (const struct ccase *c, void *split_aux_)
519 struct split_aux_data *split_aux = split_aux_;
521 /* Start a new series if needed. */
522 if (case_is_null (&split_aux->prev_case)
523 || !equal_splits (c, &split_aux->prev_case))
525 if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
526 split_aux->end_func (split_aux->func_aux);
528 case_destroy (&split_aux->prev_case);
529 case_clone (&split_aux->prev_case, c);
531 if (split_aux->begin_func != NULL)
532 split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
535 return (split_aux->proc_func == NULL
536 || split_aux->proc_func (c, split_aux->func_aux));
539 /* End-of-file callback used by procedure_with_splits(). */
541 split_procedure_end_func (void *split_aux_)
543 struct split_aux_data *split_aux = split_aux_;
545 if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
546 split_aux->end_func (split_aux->func_aux);
550 /* Compares the SPLIT FILE variables in cases A and B and returns
551 nonzero only if they differ. */
553 equal_splits (const struct ccase *a, const struct ccase *b)
555 return case_compare (a, b,
556 dict_get_split_vars (default_dict),
557 dict_get_split_cnt (default_dict)) == 0;
560 /* Multipass procedure that separates the data into SPLIT FILE
563 /* Represents auxiliary data for handling SPLIT FILE in a
564 multipass procedure. */
565 struct multipass_split_aux_data
567 struct ccase prev_case; /* Data in previous case. */
568 struct casefile *casefile; /* Accumulates data for a split. */
570 /* Function to call with the accumulated data. */
571 bool (*split_func) (const struct ccase *first, const struct casefile *,
573 void *func_aux; /* Auxiliary data. */
576 static bool multipass_split_case_func (const struct ccase *c, void *aux_);
577 static bool multipass_split_end_func (void *aux_);
578 static bool multipass_split_output (struct multipass_split_aux_data *);
580 /* Returns true if successful, false if an I/O error occurred. */
582 multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
583 const struct casefile *,
587 struct multipass_split_aux_data aux;
590 case_nullify (&aux.prev_case);
592 aux.split_func = split_func;
593 aux.func_aux = func_aux;
595 ok = internal_procedure (multipass_split_case_func,
596 multipass_split_end_func, &aux);
597 case_destroy (&aux.prev_case);
602 /* Case callback used by multipass_procedure_with_splits(). */
604 multipass_split_case_func (const struct ccase *c, void *aux_)
606 struct multipass_split_aux_data *aux = aux_;
609 /* Start a new series if needed. */
610 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
612 /* Record split values. */
613 case_destroy (&aux->prev_case);
614 case_clone (&aux->prev_case, c);
616 /* Pass any cases to split_func. */
617 if (aux->casefile != NULL)
618 ok = multipass_split_output (aux);
620 /* Start a new casefile. */
621 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
624 return casefile_append (aux->casefile, c) && ok;
627 /* End-of-file callback used by multipass_procedure_with_splits(). */
629 multipass_split_end_func (void *aux_)
631 struct multipass_split_aux_data *aux = aux_;
632 return (aux->casefile == NULL || multipass_split_output (aux));
636 multipass_split_output (struct multipass_split_aux_data *aux)
640 assert (aux->casefile != NULL);
641 ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
642 casefile_destroy (aux->casefile);
643 aux->casefile = NULL;
648 /* Discards all the current state in preparation for a data-input
649 command like DATA LIST or GET. */
651 discard_variables (void)
653 dict_clear (default_dict);
654 fh_set_default_handle (NULL);
658 free_case_source (proc_source);
661 proc_cancel_all_transformations ();
664 /* Returns the current set of permanent transformations,
665 and clears the permanent transformations.
666 For use by INPUT PROGRAM. */
668 proc_capture_transformations (void)
670 struct trns_chain *chain;
672 assert (temporary_trns_chain == NULL);
673 chain = permanent_trns_chain;
674 cur_trns_chain = permanent_trns_chain = trns_chain_create ();
678 /* Adds a transformation that processes a case with PROC and
679 frees itself with FREE to the current set of transformations.
680 The functions are passed AUX as auxiliary data. */
682 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
684 trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
687 /* Adds a transformation that processes a case with PROC and
688 frees itself with FREE to the current set of transformations.
689 When parsing of the block of transformations is complete,
690 FINALIZE will be called.
691 The functions are passed AUX as auxiliary data. */
693 add_transformation_with_finalizer (trns_finalize_func *finalize,
694 trns_proc_func *proc,
695 trns_free_func *free, void *aux)
697 trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
700 /* Returns the index of the next transformation.
701 This value can be returned by a transformation procedure
702 function to indicate a "jump" to that transformation. */
704 next_transformation (void)
706 return trns_chain_next (cur_trns_chain);
709 /* Returns true if the next call to add_transformation() will add
710 a temporary transformation, false if it will add a permanent
713 proc_in_temporary_transformations (void)
715 return temporary_trns_chain != NULL;
718 /* Marks the start of temporary transformations.
719 Further calls to add_transformation() will add temporary
722 proc_start_temporary_transformations (void)
724 if (!proc_in_temporary_transformations ())
726 add_case_limit_trns ();
728 permanent_dict = dict_clone (default_dict);
729 trns_chain_finalize (permanent_trns_chain);
730 temporary_trns_chain = cur_trns_chain = trns_chain_create ();
734 /* Converts all the temporary transformations, if any, to
735 permanent transformations. Further transformations will be
737 Returns true if anything changed, false otherwise. */
739 proc_make_temporary_transformations_permanent (void)
741 if (proc_in_temporary_transformations ())
743 trns_chain_finalize (temporary_trns_chain);
744 trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
745 temporary_trns_chain = NULL;
747 dict_destroy (permanent_dict);
748 permanent_dict = NULL;
756 /* Cancels all temporary transformations, if any. Further
757 transformations will be permanent.
758 Returns true if anything changed, false otherwise. */
760 proc_cancel_temporary_transformations (void)
762 if (proc_in_temporary_transformations ())
764 dict_destroy (default_dict);
765 default_dict = permanent_dict;
766 permanent_dict = NULL;
768 trns_chain_destroy (temporary_trns_chain);
769 temporary_trns_chain = NULL;
777 /* Cancels all transformations, if any.
778 Returns true if successful, false on I/O error. */
780 proc_cancel_all_transformations (void)
783 ok = trns_chain_destroy (permanent_trns_chain);
784 ok = trns_chain_destroy (temporary_trns_chain) && ok;
785 permanent_trns_chain = cur_trns_chain = trns_chain_create ();
786 temporary_trns_chain = NULL;
790 /* Initializes procedure handling. */
/* NOTE(review): the return type, name, and braces of this
   definition are not visible in this extract. */
/* Create the default dictionary. */
794 default_dict = dict_create ();
/* Start with a fresh, empty permanent transformation chain and no
   temporary chain. */
795 proc_cancel_all_transformations ();
798 /* Finishes up procedure handling. */
/* NOTE(review): the return type, name, and braces of this
   definition are not visible in this extract either. */
/* Release the procedure state (dictionary contents, data source,
   transformations) first, then the default dictionary itself. */
802 discard_variables ();
803 dict_destroy (default_dict);
806 /* Sets SINK as the destination for procedure output from the
809 proc_set_sink (struct case_sink *sink)
811 assert (proc_sink == NULL);
815 /* Sets SOURCE as the source for procedure input for the next
818 proc_set_source (struct case_source *source)
820 assert (proc_source == NULL);
821 proc_source = source;
824 /* Returns true if a source for the next procedure has been
825 configured, false otherwise. */
827 proc_has_source (void)
829 return proc_source != NULL;
832 /* Returns the output from the previous procedure.
833 For use only immediately after executing a procedure.
834 The returned casefile is owned by the caller; it will not be
835 automatically used for the next procedure's input. */
837 proc_capture_output (void)
839 struct casefile *casefile;
841 /* Try to make sure that this function is called immediately
842 after procedure() or a similar function. */
843 assert (proc_source != NULL);
844 assert (case_source_is_class (proc_source, &storage_source_class));
845 assert (trns_chain_is_empty (permanent_trns_chain));
846 assert (!proc_in_temporary_transformations ());
848 casefile = storage_source_decapsulate (proc_source);
854 static trns_proc_func case_limit_trns_proc;
855 static trns_free_func case_limit_trns_free;
857 /* Adds a transformation that limits the number of cases that may
858 pass through, if default_dict has a case limit. */
860 add_case_limit_trns (void)
862 size_t case_limit = dict_get_case_limit (default_dict);
865 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
866 *cases_remaining = case_limit;
867 add_transformation (case_limit_trns_proc, case_limit_trns_free,
869 dict_set_case_limit (default_dict, 0);
873 /* Limits the maximum number of cases processed to
876 case_limit_trns_proc (void *cases_remaining_,
877 struct ccase *c UNUSED, int case_nr UNUSED)
879 size_t *cases_remaining = cases_remaining_;
880 if (*cases_remaining > 0)
883 return TRNS_CONTINUE;
886 return TRNS_DROP_CASE;
/* Frees the case counter owned by a case limit transformation.
   Cannot fail, so always returns true. */
static bool
case_limit_trns_free (void *cases_remaining_)
{
  free (cases_remaining_);
  return true;
}
898 static trns_proc_func filter_trns_proc;
900 /* Adds a temporary transformation to filter data according to
901 the variable specified on FILTER, if any. */
903 add_filter_trns (void)
905 struct variable *filter_var = dict_get_filter (default_dict);
906 if (filter_var != NULL)
908 proc_start_temporary_transformations ();
909 add_transformation (filter_trns_proc, NULL, filter_var);
913 /* FILTER transformation. */
915 filter_trns_proc (void *filter_var_,
916 struct ccase *c UNUSED, int case_nr UNUSED)
919 struct variable *filter_var = filter_var_;
920 double f = case_num (c, filter_var->fv);
921 return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
922 ? TRNS_CONTINUE : TRNS_DROP_CASE);