/* PSPP - computes sample statistics.
   Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
   Written by Ben Pfaff <blp@gnu.org>.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License as
   published by the Free Software Foundation; either version 2 of the
   License, or (at your option) any later version.

   This program is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
41 /* Procedure execution data. */
42 struct write_case_data
44 /* Function to call for each case. */
45 bool (*case_func) (const struct ccase *, void *);
48 struct ccase trns_case; /* Case used for transformations. */
49 struct ccase sink_case; /* Case written to sink, if
50 compacting is necessary. */
51 size_t cases_written; /* Cases output so far. */
/* Cases are read from proc_source,
   pass through permanent_trns_chain (which transforms them into
   the format described by permanent_dict),
   are written to proc_sink,
   pass through temporary_trns_chain (which transforms them into
   the format described by default_dict),
   and are finally passed to the procedure. */
static struct case_source *proc_source;
static struct trns_chain *permanent_trns_chain;
static struct dictionary *permanent_dict;
static struct case_sink *proc_sink;
static struct trns_chain *temporary_trns_chain;
struct dictionary *default_dict;

/* The transformation chain that the next transformation will be
   added to. */
static struct trns_chain *cur_trns_chain;

/* The compactor used to compact a case, if necessary;
   otherwise a null pointer. */
static struct dict_compactor *compactor;

/* Time at which proc was last invoked. */
static time_t last_proc_invocation;

/* Lag queue: a ring buffer of the most recent cases. */
int n_lag;                      /* Number of cases to lag. */
static int lag_count;           /* Number of cases in lag_queue so far. */
static int lag_head;            /* Index where next case will be added. */
static struct ccase *lag_queue; /* Array of n_lag struct ccase elements. */
static void add_case_limit_trns (void);
static void add_filter_trns (void);

/* Runs a procedure; see the definition for details. */
static bool internal_procedure (bool (*case_func) (const struct ccase *,
                                                   void *),
                                bool (*end_func) (void *),
                                void *aux);
static void update_last_proc_invocation (void);
static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (void);
static bool write_case (struct write_case_data *wc_data);
static void lag_case (const struct ccase *c);
static void clear_case (struct ccase *c);
static bool close_active_file (void);
/* Public functions. */
102 /* Returns the last time the data was read. */
104 time_of_last_procedure (void)
106 if (last_proc_invocation == 0)
107 update_last_proc_invocation ();
108 return last_proc_invocation;
/* Regular procedure. */
/* Reads the data from the input program and writes it to a new
   active file.  For each case we read from the input program, we
   do the following:

   1. Execute permanent transformations.  If these drop the case,
      start the next case from step 1.

   2. Write case to replacement active file.

   3. Execute temporary transformations.  If these drop the case,
      start the next case from step 1.

   4. Pass case to PROC_FUNC, passing AUX as auxiliary data.

   Returns true if successful, false if an I/O error occurred (or
   if PROC_FUNC returned false for some case). */
bool
procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
{
  return internal_procedure (proc_func, NULL, aux);
}
/* Multipass procedure. */
/* Auxiliary data threaded through internal_procedure() by
   multipass_procedure(). */
struct multipass_aux_data
  {
    struct casefile *casefile;  /* Accumulates every case read. */

    /* Function to call with the complete casefile, and the
       auxiliary data to pass to it. */
    bool (*proc_func) (const struct casefile *, void *aux);
    void *aux;
  };
144 /* Case processing function for multipass_procedure(). */
146 multipass_case_func (const struct ccase *c, void *aux_data_)
148 struct multipass_aux_data *aux_data = aux_data_;
149 return casefile_append (aux_data->casefile, c);
152 /* End-of-file function for multipass_procedure(). */
154 multipass_end_func (void *aux_data_)
156 struct multipass_aux_data *aux_data = aux_data_;
157 return (aux_data->proc_func == NULL
158 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
161 /* Procedure that allows multiple passes over the input data.
162 The entire active file is passed to PROC_FUNC, with the given
163 AUX as auxiliary data, as a unit. */
165 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
168 struct multipass_aux_data aux_data;
171 aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
172 aux_data.proc_func = proc_func;
175 ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
176 ok = !casefile_error (aux_data.casefile) && ok;
178 casefile_destroy (aux_data.casefile);
/* Procedure implementation. */
185 /* Executes a procedure.
186 Passes each case to CASE_FUNC.
187 Calls END_FUNC after the last case.
188 Returns true if successful, false if an I/O error occurred (or
189 if CASE_FUNC or END_FUNC ever returned false). */
191 internal_procedure (bool (*case_func) (const struct ccase *, void *),
192 bool (*end_func) (void *),
195 struct write_case_data wc_data;
198 assert (proc_source != NULL);
200 update_last_proc_invocation ();
202 /* Optimize the trivial case where we're not going to do
203 anything with the data, by not reading the data at all. */
204 if (case_func == NULL && end_func == NULL
205 && case_source_is_class (proc_source, &storage_source_class)
207 && (temporary_trns_chain == NULL
208 || trns_chain_is_empty (temporary_trns_chain))
209 && trns_chain_is_empty (permanent_trns_chain))
212 dict_set_case_limit (default_dict, 0);
213 dict_clear_vectors (default_dict);
219 wc_data.case_func = case_func;
221 create_trns_case (&wc_data.trns_case, default_dict);
222 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
223 wc_data.cases_written = 0;
225 ok = proc_source->class->read (proc_source,
227 write_case, &wc_data) && ok;
228 if (end_func != NULL)
229 ok = end_func (aux) && ok;
231 case_destroy (&wc_data.sink_case);
232 case_destroy (&wc_data.trns_case);
234 ok = close_active_file () && ok;
239 /* Updates last_proc_invocation. */
241 update_last_proc_invocation (void)
243 last_proc_invocation = time (NULL);
246 /* Creates and returns a case, initializing it from the vectors
247 that say which `value's need to be initialized just once, and
248 which ones need to be re-initialized before every case. */
250 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
252 size_t var_cnt = dict_get_var_cnt (dict);
255 case_create (trns_case, dict_get_next_value_idx (dict));
256 for (i = 0; i < var_cnt; i++)
258 struct variable *v = dict_get_var (dict, i);
259 union value *value = case_data_rw (trns_case, v->fv);
261 if (v->type == NUMERIC)
262 value->f = v->leave ? 0.0 : SYSMIS;
264 memset (value->s, ' ', v->width);
268 /* Makes all preparations for reading from the data source and writing
271 open_active_file (void)
273 add_case_limit_trns ();
276 /* Finalize transformations. */
277 trns_chain_finalize (cur_trns_chain);
279 /* Make permanent_dict refer to the dictionary right before
280 data reaches the sink. */
281 if (permanent_dict == NULL)
282 permanent_dict = default_dict;
284 /* Figure out whether to compact. */
285 compactor = (dict_compacting_would_shrink (permanent_dict)
286 ? dict_make_compactor (permanent_dict)
290 if (proc_sink == NULL)
291 proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
292 if (proc_sink->class->open != NULL)
293 proc_sink->class->open (proc_sink);
295 /* Allocate memory for lag queue. */
302 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
303 for (i = 0; i < n_lag; i++)
304 case_nullify (&lag_queue[i]);
308 /* Transforms trns_case and writes it to the replacement active
309 file if advisable. Returns true if more cases can be
310 accepted, false otherwise. Do not call this function again
311 after it has returned false once. */
313 write_case (struct write_case_data *wc_data)
315 enum trns_result retval;
318 /* Execute permanent transformations. */
319 case_nr = wc_data->cases_written + 1;
320 retval = trns_chain_execute (permanent_trns_chain,
321 &wc_data->trns_case, &case_nr);
322 if (retval != TRNS_CONTINUE)
325 /* Write case to LAG queue. */
327 lag_case (&wc_data->trns_case);
329 /* Write case to replacement active file. */
330 wc_data->cases_written++;
331 if (proc_sink->class->write != NULL)
333 if (compactor != NULL)
335 dict_compactor_compact (compactor, &wc_data->sink_case,
336 &wc_data->trns_case);
337 proc_sink->class->write (proc_sink, &wc_data->sink_case);
340 proc_sink->class->write (proc_sink, &wc_data->trns_case);
343 /* Execute temporary transformations. */
344 if (temporary_trns_chain != NULL)
346 retval = trns_chain_execute (temporary_trns_chain,
348 &wc_data->cases_written);
349 if (retval != TRNS_CONTINUE)
353 /* Pass case to procedure. */
354 if (wc_data->case_func != NULL)
355 if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
359 clear_case (&wc_data->trns_case);
360 return retval != TRNS_ERROR;
363 /* Add C to the lag queue. */
365 lag_case (const struct ccase *c)
367 if (lag_count < n_lag)
369 case_destroy (&lag_queue[lag_head]);
370 case_clone (&lag_queue[lag_head], c);
371 if (++lag_head >= n_lag)
375 /* Clears the variables in C that need to be cleared between
378 clear_case (struct ccase *c)
380 size_t var_cnt = dict_get_var_cnt (default_dict);
383 for (i = 0; i < var_cnt; i++)
385 struct variable *v = dict_get_var (default_dict, i);
388 if (v->type == NUMERIC)
389 case_data_rw (c, v->fv)->f = SYSMIS;
391 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
396 /* Closes the active file. */
398 close_active_file (void)
400 /* Free memory for lag queue, and turn off lagging. */
405 for (i = 0; i < n_lag; i++)
406 case_destroy (&lag_queue[i]);
411 /* Dictionary from before TEMPORARY becomes permanent. */
412 proc_cancel_temporary_transformations ();
414 /* Finish compacting. */
415 if (compactor != NULL)
417 dict_compactor_destroy (compactor);
418 dict_compact_values (default_dict);
422 /* Free data source. */
423 free_case_source (proc_source);
426 /* Old data sink becomes new data source. */
427 if (proc_sink->class->make_source != NULL)
428 proc_source = proc_sink->class->make_source (proc_sink);
429 free_case_sink (proc_sink);
432 dict_clear_vectors (default_dict);
433 permanent_dict = NULL;
434 return proc_cancel_all_transformations ();
437 /* Returns a pointer to the lagged case from N_BEFORE cases before the
438 current one, or NULL if there haven't been that many cases yet. */
440 lagged_case (int n_before)
442 assert (n_before >= 1 );
443 assert (n_before <= n_lag);
445 if (n_before <= lag_count)
447 int index = lag_head - n_before;
450 return &lag_queue[index];
/* Procedure that separates the data into SPLIT FILE groups. */
458 /* Represents auxiliary data for handling SPLIT FILE. */
459 struct split_aux_data
461 size_t case_count; /* Number of cases so far. */
462 struct ccase prev_case; /* Data in previous case. */
464 /* Callback functions. */
465 void (*begin_func) (const struct ccase *, void *);
466 bool (*proc_func) (const struct ccase *, void *);
467 void (*end_func) (void *);
471 static int equal_splits (const struct ccase *, const struct ccase *);
472 static bool split_procedure_case_func (const struct ccase *c, void *);
473 static bool split_procedure_end_func (void *);
475 /* Like procedure(), but it automatically breaks the case stream
476 into SPLIT FILE break groups. Before each group of cases with
477 identical SPLIT FILE variable values, BEGIN_FUNC is called
478 with the first case in the group.
479 Then PROC_FUNC is called for each case in the group (including
481 END_FUNC is called when the group is finished. FUNC_AUX is
482 passed to each of the functions as auxiliary data.
484 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
485 and END_FUNC will be called at all.
487 If SPLIT FILE is not in effect, then there is one break group
488 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
491 Returns true if successful, false if an I/O error occurred. */
493 procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
494 bool (*proc_func) (const struct ccase *, void *aux),
495 void (*end_func) (void *aux),
498 struct split_aux_data split_aux;
501 split_aux.case_count = 0;
502 case_nullify (&split_aux.prev_case);
503 split_aux.begin_func = begin_func;
504 split_aux.proc_func = proc_func;
505 split_aux.end_func = end_func;
506 split_aux.func_aux = func_aux;
508 ok = internal_procedure (split_procedure_case_func,
509 split_procedure_end_func, &split_aux);
511 case_destroy (&split_aux.prev_case);
516 /* Case callback used by procedure_with_splits(). */
518 split_procedure_case_func (const struct ccase *c, void *split_aux_)
520 struct split_aux_data *split_aux = split_aux_;
522 /* Start a new series if needed. */
523 if (split_aux->case_count == 0
524 || !equal_splits (c, &split_aux->prev_case))
526 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
527 split_aux->end_func (split_aux->func_aux);
529 case_destroy (&split_aux->prev_case);
530 case_clone (&split_aux->prev_case, c);
532 if (split_aux->begin_func != NULL)
533 split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
536 split_aux->case_count++;
537 return (split_aux->proc_func == NULL
538 || split_aux->proc_func (c, split_aux->func_aux));
541 /* End-of-file callback used by procedure_with_splits(). */
543 split_procedure_end_func (void *split_aux_)
545 struct split_aux_data *split_aux = split_aux_;
547 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
548 split_aux->end_func (split_aux->func_aux);
552 /* Compares the SPLIT FILE variables in cases A and B and returns
553 nonzero only if they differ. */
555 equal_splits (const struct ccase *a, const struct ccase *b)
557 return case_compare (a, b,
558 dict_get_split_vars (default_dict),
559 dict_get_split_cnt (default_dict)) == 0;
/* Multipass procedure that separates the data into SPLIT FILE
   groups. */
565 /* Represents auxiliary data for handling SPLIT FILE in a
566 multipass procedure. */
567 struct multipass_split_aux_data
569 struct ccase prev_case; /* Data in previous case. */
570 struct casefile *casefile; /* Accumulates data for a split. */
572 /* Function to call with the accumulated data. */
573 bool (*split_func) (const struct ccase *first, const struct casefile *,
575 void *func_aux; /* Auxiliary data. */
578 static bool multipass_split_case_func (const struct ccase *c, void *aux_);
579 static bool multipass_split_end_func (void *aux_);
580 static bool multipass_split_output (struct multipass_split_aux_data *);
582 /* Returns true if successful, false if an I/O error occurred. */
584 multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
585 const struct casefile *,
589 struct multipass_split_aux_data aux;
592 case_nullify (&aux.prev_case);
594 aux.split_func = split_func;
595 aux.func_aux = func_aux;
597 ok = internal_procedure (multipass_split_case_func,
598 multipass_split_end_func, &aux);
599 case_destroy (&aux.prev_case);
604 /* Case callback used by multipass_procedure_with_splits(). */
606 multipass_split_case_func (const struct ccase *c, void *aux_)
608 struct multipass_split_aux_data *aux = aux_;
611 /* Start a new series if needed. */
612 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
614 /* Record split values. */
615 case_destroy (&aux->prev_case);
616 case_clone (&aux->prev_case, c);
618 /* Pass any cases to split_func. */
619 if (aux->casefile != NULL)
620 ok = multipass_split_output (aux);
622 /* Start a new casefile. */
623 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
626 return casefile_append (aux->casefile, c) && ok;
629 /* End-of-file callback used by multipass_procedure_with_splits(). */
631 multipass_split_end_func (void *aux_)
633 struct multipass_split_aux_data *aux = aux_;
634 return (aux->casefile == NULL || multipass_split_output (aux));
638 multipass_split_output (struct multipass_split_aux_data *aux)
642 assert (aux->casefile != NULL);
643 ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
644 casefile_destroy (aux->casefile);
645 aux->casefile = NULL;
650 /* Discards all the current state in preparation for a data-input
651 command like DATA LIST or GET. */
653 discard_variables (void)
655 dict_clear (default_dict);
656 fh_set_default_handle (NULL);
660 free_case_source (proc_source);
663 proc_cancel_all_transformations ();
666 /* Returns the current set of permanent transformations,
667 and clears the permanent transformations.
668 For use by INPUT PROGRAM. */
670 proc_capture_transformations (void)
672 struct trns_chain *chain;
674 assert (temporary_trns_chain == NULL);
675 chain = permanent_trns_chain;
676 cur_trns_chain = permanent_trns_chain = trns_chain_create ();
680 /* Adds a transformation that processes a case with PROC and
681 frees itself with FREE to the current set of transformations.
682 The functions are passed AUX as auxiliary data. */
684 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
686 trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
689 /* Adds a transformation that processes a case with PROC and
690 frees itself with FREE to the current set of transformations.
691 When parsing of the block of transformations is complete,
692 FINALIZE will be called.
693 The functions are passed AUX as auxiliary data. */
695 add_transformation_with_finalizer (trns_finalize_func *finalize,
696 trns_proc_func *proc,
697 trns_free_func *free, void *aux)
699 trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
702 /* Returns the index of the next transformation.
703 This value can be returned by a transformation procedure
704 function to indicate a "jump" to that transformation. */
706 next_transformation (void)
708 return trns_chain_next (cur_trns_chain);
711 /* Returns true if the next call to add_transformation() will add
712 a temporary transformation, false if it will add a permanent
715 proc_in_temporary_transformations (void)
717 return temporary_trns_chain != NULL;
720 /* Marks the start of temporary transformations.
721 Further calls to add_transformation() will add temporary
724 proc_start_temporary_transformations (void)
726 if (!proc_in_temporary_transformations ())
728 add_case_limit_trns ();
730 permanent_dict = dict_clone (default_dict);
731 trns_chain_finalize (permanent_trns_chain);
732 temporary_trns_chain = cur_trns_chain = trns_chain_create ();
736 /* Converts all the temporary transformations, if any, to
737 permanent transformations. Further transformations will be
739 Returns true if anything changed, false otherwise. */
741 proc_make_temporary_transformations_permanent (void)
743 if (proc_in_temporary_transformations ())
745 trns_chain_finalize (temporary_trns_chain);
746 trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
747 temporary_trns_chain = NULL;
749 dict_destroy (permanent_dict);
750 permanent_dict = NULL;
758 /* Cancels all temporary transformations, if any. Further
759 transformations will be permanent.
760 Returns true if anything changed, false otherwise. */
762 proc_cancel_temporary_transformations (void)
764 if (proc_in_temporary_transformations ())
766 dict_destroy (default_dict);
767 default_dict = permanent_dict;
768 permanent_dict = NULL;
770 trns_chain_destroy (temporary_trns_chain);
771 temporary_trns_chain = NULL;
779 /* Cancels all transformations, if any.
780 Returns true if successful, false on I/O error. */
782 proc_cancel_all_transformations (void)
785 ok = trns_chain_destroy (permanent_trns_chain);
786 ok = trns_chain_destroy (temporary_trns_chain) && ok;
787 permanent_trns_chain = cur_trns_chain = trns_chain_create ();
788 temporary_trns_chain = NULL;
792 /* Initializes procedure handling: creates an empty default
   dictionary, then calls proc_cancel_all_transformations() to set
   up an empty permanent transformation chain.
   NOTE(review): this listing has lost the function's return type,
   name, and braces; restore them from the original source. */
796 default_dict = dict_create ();
797 proc_cancel_all_transformations ();
800 /* Finishes up procedure handling by calling discard_variables().
   That function clears the default dictionary, frees the data
   source, and cancels all transformations.
   NOTE(review): this listing has lost the function's return type,
   name, and braces; restore them from the original source. */
804 discard_variables ();
807 /* Sets SINK as the destination for procedure output from the
810 proc_set_sink (struct case_sink *sink)
812 assert (proc_sink == NULL);
816 /* Sets SOURCE as the source for procedure input for the next
819 proc_set_source (struct case_source *source)
821 assert (proc_source == NULL);
822 proc_source = source;
825 /* Returns true if a source for the next procedure has been
826 configured, false otherwise. */
828 proc_has_source (void)
830 return proc_source != NULL;
833 /* Returns the output from the previous procedure.
834 For use only immediately after executing a procedure.
835 The returned casefile is owned by the caller; it will not be
836 automatically used for the next procedure's input. */
838 proc_capture_output (void)
840 struct casefile *casefile;
842 /* Try to make sure that this function is called immediately
843 after procedure() or a similar function. */
844 assert (proc_source != NULL);
845 assert (case_source_is_class (proc_source, &storage_source_class));
846 assert (trns_chain_is_empty (permanent_trns_chain));
847 assert (!proc_in_temporary_transformations ());
849 casefile = storage_source_decapsulate (proc_source);
855 static trns_proc_func case_limit_trns_proc;
856 static trns_free_func case_limit_trns_free;
858 /* Adds a transformation that limits the number of cases that may
859 pass through, if default_dict has a case limit. */
861 add_case_limit_trns (void)
863 size_t case_limit = dict_get_case_limit (default_dict);
866 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
867 *cases_remaining = case_limit;
868 add_transformation (case_limit_trns_proc, case_limit_trns_free,
870 dict_set_case_limit (default_dict, 0);
874 /* Limits the maximum number of cases processed to
877 case_limit_trns_proc (void *cases_remaining_,
878 struct ccase *c UNUSED, int case_nr UNUSED)
880 size_t *cases_remaining = cases_remaining_;
881 if (*cases_remaining > 0)
884 return TRNS_CONTINUE;
887 return TRNS_DROP_CASE;
/* Frees the data associated with a case limit transformation.
   Always returns true. */
static bool
case_limit_trns_free (void *cases_remaining_)
{
  size_t *cases_remaining = cases_remaining_;
  free (cases_remaining);
  return true;
}
899 static trns_proc_func filter_trns_proc;
901 /* Adds a temporary transformation to filter data according to
902 the variable specified on FILTER, if any. */
904 add_filter_trns (void)
906 struct variable *filter_var = dict_get_filter (default_dict);
907 if (filter_var != NULL)
909 proc_start_temporary_transformations ();
910 add_transformation (filter_trns_proc, NULL, filter_var);
914 /* FILTER transformation. */
916 filter_trns_proc (void *filter_var_,
917 struct ccase *c UNUSED, int case_nr UNUSED)
920 struct variable *filter_var = filter_var_;
921 double f = case_num (c, filter_var->fv);
922 return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
923 ? TRNS_CONTINUE : TRNS_DROP_CASE);