/* PSPP - computes sample statistics.
   Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
   Written by Ben Pfaff <blp@gnu.org>.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License as
   published by the Free Software Foundation; either version 2 of the
   License, or (at your option) any later version.

   This program is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/fastfile.h>
32 #include <data/dictionary.h>
33 #include <data/file-handle-def.h>
34 #include <data/procedure.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/variable.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/misc.h>
40 #include <libpspp/str.h>
42 /* Procedure execution data. */
43 struct write_case_data
45 /* Function to call for each case. */
46 bool (*case_func) (const struct ccase *, void *);
49 struct ccase trns_case; /* Case used for transformations. */
50 struct ccase sink_case; /* Case written to sink, if
51 compacting is necessary. */
52 size_t cases_written; /* Cases output so far. */
/* Data flow for one procedure.  Cases are
     - read from proc_source,
     - run through permanent_trns_chain (which puts them in the
       format described by permanent_dict),
     - written to proc_sink,
     - run through temporary_trns_chain (which puts them in the
       format described by default_dict),
     - and finally handed to the procedure. */
static struct case_source *proc_source;
static struct trns_chain *permanent_trns_chain;
static struct dictionary *permanent_dict;
static struct case_sink *proc_sink;
static struct trns_chain *temporary_trns_chain;
struct dictionary *default_dict;

/* The transformation chain that the next transformation will be
   added to. */
static struct trns_chain *cur_trns_chain;

/* The compactor used to compact a case, if necessary;
   otherwise a null pointer. */
static struct dict_compactor *compactor;

/* Time at which proc was last invoked. */
static time_t last_proc_invocation;

/* Lag queue. */
int n_lag;                      /* Number of cases to lag. */
static int lag_count;           /* Number of cases in lag_queue so far. */
static int lag_head;            /* Index where next case will be added. */
static struct ccase *lag_queue; /* Array of n_lag cases. */
/* Implicit transformations added just before a procedure runs. */
static void add_case_limit_trns (void);
static void add_filter_trns (void);

/* Procedure machinery. */
static bool internal_procedure (bool (*case_func) (const struct ccase *,
                                                   void *),
                                bool (*end_func) (void *),
                                void *aux);
static void update_last_proc_invocation (void);
static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (void);
static bool write_case (struct write_case_data *wc_data);
static void lag_case (const struct ccase *c);
static void clear_case (struct ccase *c);
static bool close_active_file (void);
101 /* Public functions. */
103 /* Returns the last time the data was read. */
105 time_of_last_procedure (void)
107 if (last_proc_invocation == 0)
108 update_last_proc_invocation ();
109 return last_proc_invocation;
112 /* Regular procedure. */
/* Reads the data from the input program and writes it to a new
   active file.  Each case read from the input program goes
   through these steps:

   1. Permanent transformations are executed.  If they drop the
      case, processing restarts at step 1 with the next case.

   2. The case is written to the replacement active file.

   3. Temporary transformations are executed.  If they drop the
      case, processing restarts at step 1 with the next case.

   4. The case is passed to PROC_FUNC along with AUX.

   This is a thin wrapper around internal_procedure() with no
   end-of-data callback.

   Returns true if successful, false if an I/O error occurred. */
bool
procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
{
  bool ok = internal_procedure (proc_func, NULL, aux);
  return ok;
}
135 /* Multipass procedure. */
/* Auxiliary data for multipass_procedure(). */
struct multipass_aux_data
  {
    struct casefile *casefile;  /* Collects every case passed along. */

    /* Client callback invoked once with the collected casefile,
       and the auxiliary pointer passed to it. */
    bool (*proc_func) (const struct casefile *, void *aux);
    void *aux;
  };
145 /* Case processing function for multipass_procedure(). */
147 multipass_case_func (const struct ccase *c, void *aux_data_)
149 struct multipass_aux_data *aux_data = aux_data_;
150 return casefile_append (aux_data->casefile, c);
153 /* End-of-file function for multipass_procedure(). */
155 multipass_end_func (void *aux_data_)
157 struct multipass_aux_data *aux_data = aux_data_;
158 return (aux_data->proc_func == NULL
159 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
162 /* Procedure that allows multiple passes over the input data.
163 The entire active file is passed to PROC_FUNC, with the given
164 AUX as auxiliary data, as a unit. */
166 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
169 struct multipass_aux_data aux_data;
172 aux_data.casefile = fastfile_create (dict_get_next_value_idx (default_dict));
173 aux_data.proc_func = proc_func;
176 ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
177 ok = !casefile_error (aux_data.casefile) && ok;
179 casefile_destroy (aux_data.casefile);
184 /* Procedure implementation. */
186 /* Executes a procedure.
187 Passes each case to CASE_FUNC.
188 Calls END_FUNC after the last case.
189 Returns true if successful, false if an I/O error occurred (or
190 if CASE_FUNC or END_FUNC ever returned false). */
192 internal_procedure (bool (*case_func) (const struct ccase *, void *),
193 bool (*end_func) (void *),
196 struct write_case_data wc_data;
199 assert (proc_source != NULL);
201 update_last_proc_invocation ();
203 /* Optimize the trivial case where we're not going to do
204 anything with the data, by not reading the data at all. */
205 if (case_func == NULL && end_func == NULL
206 && case_source_is_class (proc_source, &storage_source_class)
208 && (temporary_trns_chain == NULL
209 || trns_chain_is_empty (temporary_trns_chain))
210 && trns_chain_is_empty (permanent_trns_chain))
213 dict_set_case_limit (default_dict, 0);
214 dict_clear_vectors (default_dict);
220 wc_data.case_func = case_func;
222 create_trns_case (&wc_data.trns_case, default_dict);
223 case_create (&wc_data.sink_case,
224 dict_get_compacted_value_cnt (default_dict));
225 wc_data.cases_written = 0;
227 ok = proc_source->class->read (proc_source,
229 write_case, &wc_data) && ok;
230 if (end_func != NULL)
231 ok = end_func (aux) && ok;
233 case_destroy (&wc_data.sink_case);
234 case_destroy (&wc_data.trns_case);
236 ok = close_active_file () && ok;
241 /* Updates last_proc_invocation. */
243 update_last_proc_invocation (void)
245 last_proc_invocation = time (NULL);
248 /* Creates and returns a case, initializing it from the vectors
249 that say which `value's need to be initialized just once, and
250 which ones need to be re-initialized before every case. */
252 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
254 size_t var_cnt = dict_get_var_cnt (dict);
257 case_create (trns_case, dict_get_next_value_idx (dict));
258 for (i = 0; i < var_cnt; i++)
260 struct variable *v = dict_get_var (dict, i);
261 union value *value = case_data_rw (trns_case, v->fv);
263 if (v->type == NUMERIC)
264 value->f = v->leave ? 0.0 : SYSMIS;
266 memset (value->s, ' ', v->width);
270 /* Makes all preparations for reading from the data source and writing
273 open_active_file (void)
275 add_case_limit_trns ();
278 /* Finalize transformations. */
279 trns_chain_finalize (cur_trns_chain);
281 /* Make permanent_dict refer to the dictionary right before
282 data reaches the sink. */
283 if (permanent_dict == NULL)
284 permanent_dict = default_dict;
286 /* Figure out whether to compact. */
287 compactor = (dict_compacting_would_shrink (permanent_dict)
288 ? dict_make_compactor (permanent_dict)
292 if (proc_sink == NULL)
293 proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
294 if (proc_sink->class->open != NULL)
295 proc_sink->class->open (proc_sink);
297 /* Allocate memory for lag queue. */
304 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
305 for (i = 0; i < n_lag; i++)
306 case_nullify (&lag_queue[i]);
310 /* Transforms trns_case and writes it to the replacement active
311 file if advisable. Returns true if more cases can be
312 accepted, false otherwise. Do not call this function again
313 after it has returned false once. */
315 write_case (struct write_case_data *wc_data)
317 enum trns_result retval;
320 /* Execute permanent transformations. */
321 case_nr = wc_data->cases_written + 1;
322 retval = trns_chain_execute (permanent_trns_chain,
323 &wc_data->trns_case, &case_nr);
324 if (retval != TRNS_CONTINUE)
327 /* Write case to LAG queue. */
329 lag_case (&wc_data->trns_case);
331 /* Write case to replacement active file. */
332 wc_data->cases_written++;
333 if (proc_sink->class->write != NULL)
335 if (compactor != NULL)
337 dict_compactor_compact (compactor, &wc_data->sink_case,
338 &wc_data->trns_case);
339 proc_sink->class->write (proc_sink, &wc_data->sink_case);
342 proc_sink->class->write (proc_sink, &wc_data->trns_case);
345 /* Execute temporary transformations. */
346 if (temporary_trns_chain != NULL)
348 retval = trns_chain_execute (temporary_trns_chain,
350 &wc_data->cases_written);
351 if (retval != TRNS_CONTINUE)
355 /* Pass case to procedure. */
356 if (wc_data->case_func != NULL)
357 if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
361 clear_case (&wc_data->trns_case);
362 return retval != TRNS_ERROR;
365 /* Add C to the lag queue. */
367 lag_case (const struct ccase *c)
369 if (lag_count < n_lag)
371 case_destroy (&lag_queue[lag_head]);
372 case_clone (&lag_queue[lag_head], c);
373 if (++lag_head >= n_lag)
377 /* Clears the variables in C that need to be cleared between
380 clear_case (struct ccase *c)
382 size_t var_cnt = dict_get_var_cnt (default_dict);
385 for (i = 0; i < var_cnt; i++)
387 struct variable *v = dict_get_var (default_dict, i);
390 if (v->type == NUMERIC)
391 case_data_rw (c, v->fv)->f = SYSMIS;
393 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
398 /* Closes the active file. */
400 close_active_file (void)
402 /* Free memory for lag queue, and turn off lagging. */
407 for (i = 0; i < n_lag; i++)
408 case_destroy (&lag_queue[i]);
413 /* Dictionary from before TEMPORARY becomes permanent. */
414 proc_cancel_temporary_transformations ();
416 /* Finish compacting. */
417 if (compactor != NULL)
419 dict_compactor_destroy (compactor);
420 dict_compact_values (default_dict);
424 /* Free data source. */
425 free_case_source (proc_source);
428 /* Old data sink becomes new data source. */
429 if (proc_sink->class->make_source != NULL)
430 proc_source = proc_sink->class->make_source (proc_sink);
431 free_case_sink (proc_sink);
434 dict_clear_vectors (default_dict);
435 permanent_dict = NULL;
436 return proc_cancel_all_transformations ();
439 /* Returns a pointer to the lagged case from N_BEFORE cases before the
440 current one, or NULL if there haven't been that many cases yet. */
442 lagged_case (int n_before)
444 assert (n_before >= 1 );
445 assert (n_before <= n_lag);
447 if (n_before <= lag_count)
449 int index = lag_head - n_before;
452 return &lag_queue[index];
458 /* Procedure that separates the data into SPLIT FILE groups. */
460 /* Represents auxiliary data for handling SPLIT FILE. */
461 struct split_aux_data
463 struct ccase prev_case; /* Data in previous case. */
465 /* Callback functions. */
466 void (*begin_func) (const struct ccase *, void *);
467 bool (*proc_func) (const struct ccase *, void *);
468 void (*end_func) (void *);
/* Helpers for procedure_with_splits(). */
static int equal_splits (const struct ccase *a, const struct ccase *b);
static bool split_procedure_case_func (const struct ccase *c,
                                       void *split_aux_);
static bool split_procedure_end_func (void *split_aux_);
476 /* Like procedure(), but it automatically breaks the case stream
477 into SPLIT FILE break groups. Before each group of cases with
478 identical SPLIT FILE variable values, BEGIN_FUNC is called
479 with the first case in the group.
480 Then PROC_FUNC is called for each case in the group (including
482 END_FUNC is called when the group is finished. FUNC_AUX is
483 passed to each of the functions as auxiliary data.
485 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
486 and END_FUNC will be called at all.
488 If SPLIT FILE is not in effect, then there is one break group
489 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
492 Returns true if successful, false if an I/O error occurred. */
494 procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
495 bool (*proc_func) (const struct ccase *, void *aux),
496 void (*end_func) (void *aux),
499 struct split_aux_data split_aux;
502 case_nullify (&split_aux.prev_case);
503 split_aux.begin_func = begin_func;
504 split_aux.proc_func = proc_func;
505 split_aux.end_func = end_func;
506 split_aux.func_aux = func_aux;
508 ok = internal_procedure (split_procedure_case_func,
509 split_procedure_end_func, &split_aux);
511 case_destroy (&split_aux.prev_case);
516 /* Case callback used by procedure_with_splits(). */
518 split_procedure_case_func (const struct ccase *c, void *split_aux_)
520 struct split_aux_data *split_aux = split_aux_;
522 /* Start a new series if needed. */
523 if (case_is_null (&split_aux->prev_case)
524 || !equal_splits (c, &split_aux->prev_case))
526 if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
527 split_aux->end_func (split_aux->func_aux);
529 case_destroy (&split_aux->prev_case);
530 case_clone (&split_aux->prev_case, c);
532 if (split_aux->begin_func != NULL)
533 split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
536 return (split_aux->proc_func == NULL
537 || split_aux->proc_func (c, split_aux->func_aux));
540 /* End-of-file callback used by procedure_with_splits(). */
542 split_procedure_end_func (void *split_aux_)
544 struct split_aux_data *split_aux = split_aux_;
546 if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
547 split_aux->end_func (split_aux->func_aux);
551 /* Compares the SPLIT FILE variables in cases A and B and returns
552 nonzero only if they differ. */
554 equal_splits (const struct ccase *a, const struct ccase *b)
556 return case_compare (a, b,
557 dict_get_split_vars (default_dict),
558 dict_get_split_cnt (default_dict)) == 0;
/* Multipass procedure that separates the data into SPLIT FILE
   groups. */
564 /* Represents auxiliary data for handling SPLIT FILE in a
565 multipass procedure. */
566 struct multipass_split_aux_data
568 struct ccase prev_case; /* Data in previous case. */
569 struct casefile *casefile; /* Accumulates data for a split. */
571 /* Function to call with the accumulated data. */
572 bool (*split_func) (const struct ccase *first, const struct casefile *,
574 void *func_aux; /* Auxiliary data. */
/* Helpers for multipass_procedure_with_splits(). */
static bool multipass_split_case_func (const struct ccase *c, void *aux_);
static bool multipass_split_end_func (void *aux_);
static bool multipass_split_output (struct multipass_split_aux_data *aux);
581 /* Returns true if successful, false if an I/O error occurred. */
583 multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
584 const struct casefile *,
588 struct multipass_split_aux_data aux;
591 case_nullify (&aux.prev_case);
593 aux.split_func = split_func;
594 aux.func_aux = func_aux;
596 ok = internal_procedure (multipass_split_case_func,
597 multipass_split_end_func, &aux);
598 case_destroy (&aux.prev_case);
603 /* Case callback used by multipass_procedure_with_splits(). */
605 multipass_split_case_func (const struct ccase *c, void *aux_)
607 struct multipass_split_aux_data *aux = aux_;
610 /* Start a new series if needed. */
611 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
613 /* Record split values. */
614 case_destroy (&aux->prev_case);
615 case_clone (&aux->prev_case, c);
617 /* Pass any cases to split_func. */
618 if (aux->casefile != NULL)
619 ok = multipass_split_output (aux);
621 /* Start a new casefile. */
622 aux->casefile = fastfile_create (dict_get_next_value_idx (default_dict));
625 return casefile_append (aux->casefile, c) && ok;
628 /* End-of-file callback used by multipass_procedure_with_splits(). */
630 multipass_split_end_func (void *aux_)
632 struct multipass_split_aux_data *aux = aux_;
633 return (aux->casefile == NULL || multipass_split_output (aux));
637 multipass_split_output (struct multipass_split_aux_data *aux)
641 assert (aux->casefile != NULL);
642 ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
643 casefile_destroy (aux->casefile);
644 aux->casefile = NULL;
649 /* Discards all the current state in preparation for a data-input
650 command like DATA LIST or GET. */
652 discard_variables (void)
654 dict_clear (default_dict);
655 fh_set_default_handle (NULL);
659 free_case_source (proc_source);
662 proc_cancel_all_transformations ();
665 /* Returns the current set of permanent transformations,
666 and clears the permanent transformations.
667 For use by INPUT PROGRAM. */
669 proc_capture_transformations (void)
671 struct trns_chain *chain;
673 assert (temporary_trns_chain == NULL);
674 chain = permanent_trns_chain;
675 cur_trns_chain = permanent_trns_chain = trns_chain_create ();
679 /* Adds a transformation that processes a case with PROC and
680 frees itself with FREE to the current set of transformations.
681 The functions are passed AUX as auxiliary data. */
683 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
685 trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
688 /* Adds a transformation that processes a case with PROC and
689 frees itself with FREE to the current set of transformations.
690 When parsing of the block of transformations is complete,
691 FINALIZE will be called.
692 The functions are passed AUX as auxiliary data. */
694 add_transformation_with_finalizer (trns_finalize_func *finalize,
695 trns_proc_func *proc,
696 trns_free_func *free, void *aux)
698 trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
701 /* Returns the index of the next transformation.
702 This value can be returned by a transformation procedure
703 function to indicate a "jump" to that transformation. */
705 next_transformation (void)
707 return trns_chain_next (cur_trns_chain);
710 /* Returns true if the next call to add_transformation() will add
711 a temporary transformation, false if it will add a permanent
714 proc_in_temporary_transformations (void)
716 return temporary_trns_chain != NULL;
719 /* Marks the start of temporary transformations.
720 Further calls to add_transformation() will add temporary
723 proc_start_temporary_transformations (void)
725 if (!proc_in_temporary_transformations ())
727 add_case_limit_trns ();
729 permanent_dict = dict_clone (default_dict);
730 trns_chain_finalize (permanent_trns_chain);
731 temporary_trns_chain = cur_trns_chain = trns_chain_create ();
735 /* Converts all the temporary transformations, if any, to
736 permanent transformations. Further transformations will be
738 Returns true if anything changed, false otherwise. */
740 proc_make_temporary_transformations_permanent (void)
742 if (proc_in_temporary_transformations ())
744 trns_chain_finalize (temporary_trns_chain);
745 trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
746 temporary_trns_chain = NULL;
748 dict_destroy (permanent_dict);
749 permanent_dict = NULL;
757 /* Cancels all temporary transformations, if any. Further
758 transformations will be permanent.
759 Returns true if anything changed, false otherwise. */
761 proc_cancel_temporary_transformations (void)
763 if (proc_in_temporary_transformations ())
765 dict_destroy (default_dict);
766 default_dict = permanent_dict;
767 permanent_dict = NULL;
769 trns_chain_destroy (temporary_trns_chain);
770 temporary_trns_chain = NULL;
778 /* Cancels all transformations, if any.
779 Returns true if successful, false on I/O error. */
781 proc_cancel_all_transformations (void)
784 ok = trns_chain_destroy (permanent_trns_chain);
785 ok = trns_chain_destroy (temporary_trns_chain) && ok;
786 permanent_trns_chain = cur_trns_chain = trns_chain_create ();
787 temporary_trns_chain = NULL;
791 /* Initializes procedure handling. */
795 default_dict = dict_create ();
796 proc_cancel_all_transformations ();
799 /* Finishes up procedure handling. */
803 discard_variables ();
804 dict_destroy (default_dict);
807 /* Sets SINK as the destination for procedure output from the
810 proc_set_sink (struct case_sink *sink)
812 assert (proc_sink == NULL);
816 /* Sets SOURCE as the source for procedure input for the next
819 proc_set_source (struct case_source *source)
821 assert (proc_source == NULL);
822 proc_source = source;
825 /* Returns true if a source for the next procedure has been
826 configured, false otherwise. */
828 proc_has_source (void)
830 return proc_source != NULL;
833 /* Returns the output from the previous procedure.
834 For use only immediately after executing a procedure.
835 The returned casefile is owned by the caller; it will not be
836 automatically used for the next procedure's input. */
838 proc_capture_output (void)
840 struct casefile *casefile;
842 /* Try to make sure that this function is called immediately
843 after procedure() or a similar function. */
844 assert (proc_source != NULL);
845 assert (case_source_is_class (proc_source, &storage_source_class));
846 assert (trns_chain_is_empty (permanent_trns_chain));
847 assert (!proc_in_temporary_transformations ());
849 casefile = storage_source_decapsulate (proc_source);
855 static trns_proc_func case_limit_trns_proc;
856 static trns_free_func case_limit_trns_free;
858 /* Adds a transformation that limits the number of cases that may
859 pass through, if default_dict has a case limit. */
861 add_case_limit_trns (void)
863 size_t case_limit = dict_get_case_limit (default_dict);
866 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
867 *cases_remaining = case_limit;
868 add_transformation (case_limit_trns_proc, case_limit_trns_free,
870 dict_set_case_limit (default_dict, 0);
874 /* Limits the maximum number of cases processed to
877 case_limit_trns_proc (void *cases_remaining_,
878 struct ccase *c UNUSED, casenum_t case_nr UNUSED)
880 size_t *cases_remaining = cases_remaining_;
881 if (*cases_remaining > 0)
883 (*cases_remaining)--;
884 return TRNS_CONTINUE;
887 return TRNS_DROP_CASE;
/* Frees the counter associated with a case limit
   transformation.  Always returns true. */
static bool
case_limit_trns_free (void *cases_remaining_)
{
  free (cases_remaining_);
  return true;
}
899 static trns_proc_func filter_trns_proc;
901 /* Adds a temporary transformation to filter data according to
902 the variable specified on FILTER, if any. */
904 add_filter_trns (void)
906 struct variable *filter_var = dict_get_filter (default_dict);
907 if (filter_var != NULL)
909 proc_start_temporary_transformations ();
910 add_transformation (filter_trns_proc, NULL, filter_var);
914 /* FILTER transformation. */
916 filter_trns_proc (void *filter_var_,
917 struct ccase *c UNUSED, casenum_t case_nr UNUSED)
920 struct variable *filter_var = filter_var_;
921 double f = case_num (c, filter_var->fv);
922 return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
923 ? TRNS_CONTINUE : TRNS_DROP_CASE);