1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
42 /* Virtual File Manager (vfm):
44 vfm is used to process data files. It uses the model that
45 data is read from one stream (the data source), processed,
46 then written to another (the data sink). The data source is
47 then deleted and the data sink becomes the data source for the next procedure. */
50 /* Procedure execution data: the per-run state that
   internal_procedure() fills in and hands to write_case(). */
51 struct write_case_data
53 /* Function to call for each case.  May be null; write_case()
   checks before calling it. */
54 bool (*case_func) (const struct ccase *, void *);
/* NOTE(review): write_case() also passes wc_data->aux to case_func;
   the declaration of that member is not visible in this excerpt. */
57 struct ccase trns_case; /* Case used for transformations. */
58 struct ccase sink_case; /* Case written to sink, if
59 compaction is necessary. */
60 size_t cases_written; /* Cases output so far. */
63 /* Cases are read from vfm_source,
64 pass through permanent_trns_chain (which transforms them into
65 the format described by permanent_dict),
66 are written to vfm_sink,
67 pass through temporary_trns_chain (which transforms them into
68 the format described by default_dict),
69 and are finally passed to the procedure. */
70 static struct case_source *vfm_source;
71 static struct trns_chain *permanent_trns_chain;
/* Set by proc_start_temporary_transformations() (a clone of
   default_dict) or by open_active_file() (default_dict itself);
   reset to null by close_active_file(). */
72 static struct dictionary *permanent_dict;
73 static struct case_sink *vfm_sink;
/* Null whenever no temporary transformations are in effect; see
   proc_in_temporary_transformations(). */
74 static struct trns_chain *temporary_trns_chain;
/* Unlike the variables above, default_dict is not static. */
75 struct dictionary *default_dict;
77 /* The transformation chain that the next transformation will be added to. */
79 static struct trns_chain *cur_trns_chain;
81 /* The compactor used to compact a case, if necessary;
82 otherwise a null pointer. */
83 static struct dict_compactor *compactor;
85 /* Time at which vfm was last invoked.  Remains 0 until
   update_last_vfm_invocation() is first called. */
86 static time_t last_vfm_invocation;
/* Lag queue state; see lag_case() and lagged_case().  n_lag is
   not static. */
89 int n_lag; /* Number of cases to lag. */
90 static int lag_count; /* Number of cases in lag_queue so far. */
91 static int lag_head; /* Index where next case will be added. */
92 static struct ccase *lag_queue; /* Array of n_lag `struct ccase' elements. */
/* Forward declarations of file-local helpers defined later in
   this file.
   NOTE(review): the internal_procedure() prototype below is only
   partly visible in this excerpt. */
94 static void add_case_limit_trns (void);
95 static void add_filter_trns (void);
97 static bool internal_procedure (bool (*case_func) (const struct ccase *,
99 bool (*end_func) (void *),
101 static void update_last_vfm_invocation (void);
102 static void create_trns_case (struct ccase *, struct dictionary *);
103 static void open_active_file (void);
104 static bool write_case (struct write_case_data *wc_data);
105 static void lag_case (const struct ccase *c);
106 static void clear_case (struct ccase *c);
107 static bool close_active_file (void);
109 /* Public functions. */
111 /* Returns the last time the data was read.
   If no time has been recorded yet (last_vfm_invocation is still
   0), the current time is recorded first and then returned. */
113 time_of_last_procedure (void)
115 if (last_vfm_invocation == 0)
116 update_last_vfm_invocation ();
117 return last_vfm_invocation;
120 /* Regular procedure. */
122 /* Reads the data from the input program and writes it to a new
123 active file. For each case we read from the input program, we
   do the following:
126 1. Execute permanent transformations. If these drop the case,
127 start the next case from step 1.
129 2. Write case to replacement active file.
131 3. Execute temporary transformations. If these drop the case,
132 start the next case from step 1.
134 4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
   This is a thin wrapper: it calls internal_procedure() with
   PROC_FUNC as the per-case callback and no end-of-data callback.
136 Returns true if successful, false if an I/O error occurred. */
138 procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
140 return internal_procedure (proc_func, NULL, aux);
143 /* Multipass procedure. */
/* Auxiliary data that multipass_procedure() passes through
   internal_procedure() to multipass_case_func() and
   multipass_end_func(). */
145 struct multipass_aux_data
/* Receives every case; see multipass_case_func(). */
147 struct casefile *casefile;
/* Called by multipass_end_func() with the casefile; may be null. */
149 bool (*proc_func) (const struct casefile *, void *aux);
/* NOTE(review): multipass_end_func() also reads an `aux' member
   whose declaration is not visible in this excerpt. */
153 /* Case processing function for multipass_procedure().
   Appends C to the casefile held in AUX_DATA_ (a struct
   multipass_aux_data) and returns casefile_append()'s result. */
155 multipass_case_func (const struct ccase *c, void *aux_data_)
157 struct multipass_aux_data *aux_data = aux_data_;
158 return casefile_append (aux_data->casefile, c);
161 /* End-of-file function for multipass_procedure().
   Passes the accumulated casefile and the auxiliary data to
   proc_func and returns its result.  If proc_func is null, does
   nothing and returns true. */
163 multipass_end_func (void *aux_data_)
165 struct multipass_aux_data *aux_data = aux_data_;
166 return (aux_data->proc_func == NULL
167 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
170 /* Procedure that allows multiple passes over the input data.
171 The entire active file is passed to PROC_FUNC, with the given
172 AUX as auxiliary data, as a unit.
   The cases are first collected into a casefile, which is
   destroyed again before this function finishes. */
174 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
177 struct multipass_aux_data aux_data;
/* The casefile is sized from default_dict's next value index. */
180 aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
181 aux_data.proc_func = proc_func;
184 ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
/* An error recorded on the casefile also counts as failure. */
185 ok = !casefile_error (aux_data.casefile) && ok;
187 casefile_destroy (aux_data.casefile);
192 /* Procedure implementation. */
194 /* Executes a procedure.
195 Passes each case to CASE_FUNC.
196 Calls END_FUNC after the last case.
   Either callback may be null.  A data source must already be
   configured (this is asserted).
197 Returns true if successful, false if an I/O error occurred (or
198 if CASE_FUNC or END_FUNC ever returned false). */
200 internal_procedure (bool (*case_func) (const struct ccase *, void *),
201 bool (*end_func) (void *),
204 struct write_case_data wc_data;
207 assert (vfm_source != NULL);
209 update_last_vfm_invocation ();
211 /* Optimize the trivial case where we're not going to do
212 anything with the data, by not reading the data at all. */
213 if (case_func == NULL && end_func == NULL
214 && case_source_is_class (vfm_source, &storage_source_class)
216 && (temporary_trns_chain == NULL
217 || trns_chain_is_empty (temporary_trns_chain))
218 && trns_chain_is_empty (permanent_trns_chain))
221 dict_set_case_limit (default_dict, 0);
222 dict_clear_vectors (default_dict);
/* General case: set up the per-run state handed to write_case(). */
228 wc_data.case_func = case_func;
230 create_trns_case (&wc_data.trns_case, default_dict);
231 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
232 wc_data.cases_written = 0;
/* write_case and &wc_data are handed to the source's read
   function as callback and callback data (presumably invoked
   once per case read -- confirm against the source class). */
234 ok = vfm_source->class->read (vfm_source,
236 write_case, &wc_data) && ok;
/* END_FUNC is called whether or not reading succeeded; its
   result is ANDed into OK. */
237 if (end_func != NULL)
238 ok = end_func (aux) && ok;
240 case_destroy (&wc_data.sink_case);
241 case_destroy (&wc_data.trns_case);
/* The active file is closed whether or not OK is still true; a
   failure there also fails the procedure. */
243 ok = close_active_file () && ok;
248 /* Updates last_vfm_invocation to the current calendar time, as
   returned by time (NULL). */
250 update_last_vfm_invocation (void)
252 last_vfm_invocation = time (NULL);
255 /* Creates a case in TRNS_CASE (nothing is returned), sized from
   DICT's next value index, and gives the value of each variable
   in DICT an initial setting: a numeric value becomes 0.0 if the
   variable's `leave' flag is set and SYSMIS otherwise, and a
   string value is filled with spaces across the variable's width.
   NOTE(review): the `else' joining the numeric and string
   branches is not visible in this excerpt. */
259 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
261 size_t var_cnt = dict_get_var_cnt (dict);
264 case_create (trns_case, dict_get_next_value_idx (dict));
265 for (i = 0; i < var_cnt; i++)
267 struct variable *v = dict_get_var (dict, i);
/* v->fv indexes the variable's value within the case. */
268 union value *value = case_data_rw (trns_case, v->fv);
270 if (v->type == NUMERIC)
271 value->f = v->leave ? 0.0 : SYSMIS;
273 memset (value->s, ' ', v->width);
277 /* Makes all preparations for reading from the data source and writing to the data sink. */
280 open_active_file (void)
/* Turn any case limit set on default_dict into a transformation. */
282 add_case_limit_trns ();
285 /* Finalize transformations. */
286 trns_chain_finalize (cur_trns_chain);
288 /* Make permanent_dict refer to the dictionary right before
289 data reaches the sink.  It is already non-null if
   proc_start_temporary_transformations() stored a clone of
   default_dict in it. */
290 if (permanent_dict == NULL)
291 permanent_dict = default_dict;
293 /* Figure out compaction. */
294 compactor = (dict_needs_compaction (permanent_dict)
295 ? dict_make_compactor (permanent_dict)
/* Fall back to a storage sink if no sink has been configured,
   then call the sink's open function if it has one. */
299 if (vfm_sink == NULL)
300 vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
301 if (vfm_sink->class->open != NULL)
302 vfm_sink->class->open (vfm_sink);
304 /* Allocate memory for lag queue.  Every slot starts out as a
   null case. */
311 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
312 for (i = 0; i < n_lag; i++)
313 case_nullify (&lag_queue[i]);
317 /* Transforms trns_case and writes it to the replacement active
318 file if advisable. Returns true if more cases can be
319 accepted, false otherwise. Do not call this function again
320 after it has returned false once.
   False is returned only when the final transformation result is
   TRNS_ERROR.
   NOTE(review): the statements that follow each early-exit test
   below are not visible in this excerpt. */
322 write_case (struct write_case_data *wc_data)
324 enum trns_result retval;
327 /* Execute permanent transformations.  case_nr is one more than
   the number of cases written so far. */
328 case_nr = wc_data->cases_written + 1;
329 retval = trns_chain_execute (permanent_trns_chain,
330 &wc_data->trns_case, &case_nr);
331 if (retval != TRNS_CONTINUE)
334 /* Write case to LAG queue. */
336 lag_case (&wc_data->trns_case);
338 /* Write case to replacement active file.  With a compactor, the
   sink is given the compacted copy in sink_case; otherwise it is
   given trns_case directly.  Any result of the sink's write
   function is not examined here. */
339 wc_data->cases_written++;
340 if (vfm_sink->class->write != NULL)
342 if (compactor != NULL)
344 dict_compactor_compact (compactor, &wc_data->sink_case,
345 &wc_data->trns_case);
346 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
349 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
352 /* Execute temporary transformations.  cases_written was just
   incremented, so it now equals the case_nr value computed
   above. */
353 if (temporary_trns_chain != NULL)
355 retval = trns_chain_execute (temporary_trns_chain,
357 &wc_data->cases_written);
358 if (retval != TRNS_CONTINUE)
362 /* Pass case to procedure. */
363 if (wc_data->case_func != NULL)
364 if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
/* Reset trns_case for the next case; see clear_case(). */
368 clear_case (&wc_data->trns_case);
369 return retval != TRNS_ERROR;
372 /* Add C to the lag queue.
   The case previously held in slot lag_head is destroyed and
   replaced by a clone of C, then lag_head is advanced and tested
   against n_lag.
   NOTE(review): the statements controlled by the two `if' tests
   are not visible in this excerpt; presumably they count the case
   and wrap lag_head around -- confirm. */
374 lag_case (const struct ccase *c)
376 if (lag_count < n_lag)
378 case_destroy (&lag_queue[lag_head]);
379 case_clone (&lag_queue[lag_head], c);
380 if (++lag_head >= n_lag)
384 /* Clears the variables in C that need to be cleared between processing cases. */
387 clear_case (struct ccase *c)
/* The variables are taken from default_dict. */
389 size_t var_cnt = dict_get_var_cnt (default_dict);
392 for (i = 0; i < var_cnt; i++)
394 struct variable *v = dict_get_var (default_dict, i);
/* Numeric values are reset to SYSMIS; string values are filled
   with spaces across the variable's width.
   NOTE(review): any test that selects which variables are reset,
   and the `else' between the two branches, are not visible in
   this excerpt. */
397 if (v->type == NUMERIC)
398 case_data_rw (c, v->fv)->f = SYSMIS;
400 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
405 /* Closes the active file: destroys the cases in the lag queue,
   cancels temporary transformations, finishes compaction, frees
   the data source, makes a new source from the sink where the
   sink supports that, and cancels all transformations.
   Returns the result of proc_cancel_all_transformations(). */
407 close_active_file (void)
409 /* Free memory for lag queue, and turn off lagging. */
414 for (i = 0; i < n_lag; i++)
415 case_destroy (&lag_queue[i]);
420 /* Dictionary from before TEMPORARY becomes permanent. */
421 proc_cancel_temporary_transformations ();
423 /* Finish compaction. */
424 if (compactor != NULL)
426 dict_compactor_destroy (compactor);
427 dict_compact_values (default_dict);
431 /* Free data source. */
432 free_case_source (vfm_source);
435 /* Old data sink becomes new data source.  A sink class with no
   make_source function produces no new source. */
436 if (vfm_sink->class->make_source != NULL)
437 vfm_source = vfm_sink->class->make_source (vfm_sink);
438 free_case_sink (vfm_sink);
441 dict_clear_vectors (default_dict);
442 permanent_dict = NULL;
443 return proc_cancel_all_transformations ();
446 /* Returns a pointer to the lagged case from N_BEFORE cases before the
447 current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be between 1 and n_lag inclusive (asserted). */
449 lagged_case (int n_before)
451 assert (n_before >= 1 );
452 assert (n_before <= n_lag);
/* Only look back as far as cases have been queued so far. */
454 if (n_before <= lag_count)
/* lag_head is where the next case will be added, so count
   back from it. */
456 int index = lag_head - n_before;
459 return &lag_queue[index];
465 /* Procedure that separates the data into SPLIT FILE groups. */
467 /* Represents auxiliary data for handling SPLIT FILE.
   Filled in by procedure_with_splits() and used by
   split_procedure_case_func() and split_procedure_end_func(). */
468 struct split_aux_data
470 size_t case_count; /* Number of cases so far. */
/* Clone of the case that started the current group; later cases
   are compared against it with equal_splits(). */
471 struct ccase prev_case; /* Data in previous case. */
473 /* Callback functions.  Each may be null. */
474 void (*begin_func) (const struct ccase *, void *);
475 bool (*proc_func) (const struct ccase *, void *);
476 void (*end_func) (void *);
/* NOTE(review): a `func_aux' member is assigned and read elsewhere
   in this file; its declaration is not visible in this excerpt. */
/* Forward declarations for the SPLIT FILE helpers defined below. */
480 static int equal_splits (const struct ccase *, const struct ccase *);
481 static bool split_procedure_case_func (const struct ccase *c, void *);
482 static bool split_procedure_end_func (void *);
484 /* Like procedure(), but it automatically breaks the case stream
485 into SPLIT FILE break groups. Before each group of cases with
486 identical SPLIT FILE variable values, BEGIN_FUNC is called
487 with the first case in the group.
488 Then PROC_FUNC is called for each case in the group (including
   the first).
490 END_FUNC is called when the group is finished. FUNC_AUX is
491 passed to each of the functions as auxiliary data.  Any of the
   three callbacks may be null.
493 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
494 and END_FUNC will be called at all.
496 If SPLIT FILE is not in effect, then there is one break group
497 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
   will each be called once.
500 Returns true if successful, false if an I/O error occurred. */
502 procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
503 bool (*proc_func) (const struct ccase *, void *aux),
504 void (*end_func) (void *aux),
507 struct split_aux_data split_aux;
/* Start with no cases seen and no previous case recorded. */
510 split_aux.case_count = 0;
511 case_nullify (&split_aux.prev_case);
512 split_aux.begin_func = begin_func;
513 split_aux.proc_func = proc_func;
514 split_aux.end_func = end_func;
515 split_aux.func_aux = func_aux;
517 ok = internal_procedure (split_procedure_case_func,
518 split_procedure_end_func, &split_aux);
520 case_destroy (&split_aux.prev_case);
525 /* Case callback used by procedure_with_splits().
   Starts a new group when C is the first case or differs from the
   recorded case in its SPLIT FILE variables, then passes C to
   proc_func.  Returns proc_func's result, or true if proc_func is
   null. */
527 split_procedure_case_func (const struct ccase *c, void *split_aux_)
529 struct split_aux_data *split_aux = split_aux_;
531 /* Start a new series if needed. */
532 if (split_aux->case_count == 0
533 || !equal_splits (c, &split_aux->prev_case))
/* Finish the previous group, if there was one. */
535 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
536 split_aux->end_func (split_aux->func_aux);
/* Remember C as the case that started this group. */
538 case_destroy (&split_aux->prev_case);
539 case_clone (&split_aux->prev_case, c);
/* begin_func is given the stored clone rather than C itself. */
541 if (split_aux->begin_func != NULL)
542 split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
545 split_aux->case_count++;
546 return (split_aux->proc_func == NULL
547 || split_aux->proc_func (c, split_aux->func_aux));
550 /* End-of-file callback used by procedure_with_splits().
   Calls end_func for the final group, provided at least one case
   was seen and end_func is not null. */
552 split_procedure_end_func (void *split_aux_)
554 struct split_aux_data *split_aux = split_aux_;
556 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
557 split_aux->end_func (split_aux->func_aux);
561 /* Compares the SPLIT FILE variables in cases A and B and returns
562 nonzero only if they are equal, that is, only if case_compare()
   over default_dict's split variables returns 0. */
564 equal_splits (const struct ccase *a, const struct ccase *b)
566 return case_compare (a, b,
567 dict_get_split_vars (default_dict),
568 dict_get_split_cnt (default_dict)) == 0;
571 /* Multipass procedure that separates the data into SPLIT FILE groups. */
574 /* Represents auxiliary data for handling SPLIT FILE in a
575 multipass procedure.  Filled in by
   multipass_procedure_with_splits() and used by its callbacks. */
576 struct multipass_split_aux_data
578 struct ccase prev_case; /* Data in previous case. */
/* Null when no group is being accumulated; see
   multipass_split_output(). */
579 struct casefile *casefile; /* Accumulates data for a split. */
581 /* Function to call with the accumulated data.
   NOTE(review): the end of this declaration is not visible in
   this excerpt. */
582 bool (*split_func) (const struct ccase *first, const struct casefile *,
584 void *func_aux; /* Auxiliary data. */
/* Forward declarations for the multipass SPLIT FILE helpers
   defined below. */
587 static bool multipass_split_case_func (const struct ccase *c, void *aux_);
588 static bool multipass_split_end_func (void *aux_);
589 static bool multipass_split_output (struct multipass_split_aux_data *);
/* Runs a procedure in which the cases of each SPLIT FILE group are
   accumulated in a casefile and then handed to SPLIT_FUNC, along
   with a recorded case and FUNC_AUX; see multipass_split_output().
591 Returns true if successful, false if an I/O error occurred. */
593 multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
594 const struct casefile *,
598 struct multipass_split_aux_data aux;
/* NOTE(review): the initialization of aux.casefile is not visible
   in this excerpt; the callbacks rely on it starting out null. */
601 case_nullify (&aux.prev_case);
603 aux.split_func = split_func;
604 aux.func_aux = func_aux;
606 ok = internal_procedure (multipass_split_case_func,
607 multipass_split_end_func, &aux);
608 case_destroy (&aux.prev_case);
613 /* Case callback used by multipass_procedure_with_splits(). */
615 multipass_split_case_func (const struct ccase *c, void *aux_)
617 struct multipass_split_aux_data *aux = aux_;
620 /* Start a new series if needed. */
621 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
623 /* Record split values. */
624 case_destroy (&aux->prev_case);
625 case_clone (&aux->prev_case, c);
627 /* Pass any cases to split_func. */
628 if (aux->casefile != NULL)
629 ok = multipass_split_output (aux);
631 /* Start a new casefile. */
632 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
635 return casefile_append (aux->casefile, c) && ok;
638 /* End-of-file callback used by multipass_procedure_with_splits().
   Outputs the group still being accumulated, if any, and returns
   the result of doing so; returns true if there is none. */
640 multipass_split_end_func (void *aux_)
642 struct multipass_split_aux_data *aux = aux_;
643 return (aux->casefile == NULL || multipass_split_output (aux));
/* Passes aux->prev_case, the accumulated casefile, and the
   auxiliary data to split_func, then destroys the casefile and
   resets aux->casefile to null.  The casefile must not be null on
   entry (asserted). */
647 multipass_split_output (struct multipass_split_aux_data *aux)
651 assert (aux->casefile != NULL);
652 ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
653 casefile_destroy (aux->casefile);
654 aux->casefile = NULL;
659 /* Discards all the current state in preparation for a data-input
660 command like DATA LIST or GET.
   Clears default_dict, sets the default file handle to null, frees
   the current case source, and cancels all transformations.  The
   result of proc_cancel_all_transformations() is not examined. */
662 discard_variables (void)
664 dict_clear (default_dict);
665 fh_set_default_handle (NULL);
669 free_case_source (vfm_source);
672 proc_cancel_all_transformations ();
675 /* Returns the current set of permanent transformations,
676 and clears the permanent transformations.
677 For use by INPUT PROGRAM.
   Must not be called while temporary transformations are in
   effect (asserted). */
679 proc_capture_transformations (void)
681 struct trns_chain *chain;
683 assert (temporary_trns_chain == NULL);
/* Keep the old chain in CHAIN and install a newly created chain
   as both the permanent and the current chain. */
684 chain = permanent_trns_chain;
685 cur_trns_chain = permanent_trns_chain = trns_chain_create ();
689 /* Adds a transformation that processes a case with PROC and
690 frees itself with FREE to the current set of transformations.
691 The functions are passed AUX as auxiliary data.
   The transformation is appended to cur_trns_chain with no
   finalizer. */
693 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
695 trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
698 /* Adds a transformation that processes a case with PROC and
699 frees itself with FREE to the current set of transformations.
700 When parsing of the block of transformations is complete,
701 FINALIZE will be called.
702 The functions are passed AUX as auxiliary data.
   Same as add_transformation() except that FINALIZE is passed to
   trns_chain_append() in place of a null finalizer. */
704 add_transformation_with_finalizer (trns_finalize_func *finalize,
705 trns_proc_func *proc,
706 trns_free_func *free, void *aux)
708 trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
711 /* Returns the index of the next transformation.
712 This value can be returned by a transformation procedure
713 function to indicate a "jump" to that transformation.
   The index is obtained from trns_chain_next() on cur_trns_chain. */
715 next_transformation (void)
717 return trns_chain_next (cur_trns_chain);
720 /* Returns true if the next call to add_transformation() will add
721 a temporary transformation, false if it will add a permanent transformation. */
724 proc_in_temporary_transformations (void)
/* A non-null temporary chain is what marks temporary mode. */
726 return temporary_trns_chain != NULL;
729 /* Marks the start of temporary transformations.
730 Further calls to add_transformation() will add temporary transformations. */
733 proc_start_temporary_transformations (void)
/* Does nothing if temporary transformations are already in effect. */
735 if (!proc_in_temporary_transformations ())
737 add_case_limit_trns ();
/* Keep a copy of the dictionary as it stands before any temporary
   transformation is added. */
739 permanent_dict = dict_clone (default_dict);
740 trns_chain_finalize (permanent_trns_chain);
/* From here on, add_transformation() appends to the new chain. */
741 temporary_trns_chain = cur_trns_chain = trns_chain_create ();
745 /* Converts all the temporary transformations, if any, to
746 permanent transformations. Further transformations will be
   permanent.
748 Returns true if anything changed, false otherwise. */
750 proc_make_temporary_transformations_permanent (void)
752 if (proc_in_temporary_transformations ())
/* Finalize the temporary chain and splice it into the permanent
   chain. */
754 trns_chain_finalize (temporary_trns_chain);
755 trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
756 temporary_trns_chain = NULL;
/* The dictionary copy made when temporary transformations began
   is destroyed; default_dict is left as it is. */
758 dict_destroy (permanent_dict);
759 permanent_dict = NULL;
767 /* Cancels all temporary transformations, if any. Further
768 transformations will be permanent.
769 Returns true if anything changed, false otherwise. */
771 proc_cancel_temporary_transformations (void)
773 if (proc_in_temporary_transformations ())
/* Replace default_dict with the dictionary held in permanent_dict
   (the copy made by proc_start_temporary_transformations()). */
775 dict_destroy (default_dict);
776 default_dict = permanent_dict;
777 permanent_dict = NULL;
779 trns_chain_destroy (temporary_trns_chain);
780 temporary_trns_chain = NULL;
788 /* Cancels all transformations, if any.
   Destroys both the permanent and the temporary chain, then
   installs a newly created chain as the permanent and current
   chain and leaves no temporary chain.
789 Returns true if successful, false on I/O error. */
791 proc_cancel_all_transformations (void)
/* Both chains are destroyed even if destroying the first one
   fails; OK ends up true only if both calls returned true. */
794 ok = trns_chain_destroy (permanent_trns_chain);
795 ok = trns_chain_destroy (temporary_trns_chain) && ok;
796 permanent_trns_chain = cur_trns_chain = trns_chain_create ();
797 temporary_trns_chain = NULL;
801 /* Initializes procedure handling: creates default_dict and sets
   up the transformation chains through
   proc_cancel_all_transformations().
   NOTE(review): this function's signature line is not visible in
   this excerpt. */
805 default_dict = dict_create ();
806 proc_cancel_all_transformations ();
809 /* Finishes up procedure handling by calling discard_variables().
   NOTE(review): this function's signature line is not visible in
   this excerpt. */
813 discard_variables ();
816 /* Sets SINK as the destination for procedure output from the next procedure. */
819 proc_set_sink (struct case_sink *sink)
/* A sink must not already be set.
   NOTE(review): the statement that stores SINK is not visible in
   this excerpt. */
821 assert (vfm_sink == NULL);
825 /* Sets SOURCE as the source for procedure input for the next procedure. */
828 proc_set_source (struct case_source *source)
/* A source must not already be set.
   NOTE(review): the statement that stores SOURCE is not visible in
   this excerpt. */
830 assert (vfm_source == NULL);
834 /* Returns true if a source for the next procedure has been
835 configured, false otherwise.  "Configured" means that vfm_source
   is not null. */
837 proc_has_source (void)
839 return vfm_source != NULL;
842 /* Returns the output from the previous procedure.
843 For use only immediately after executing a procedure.
844 The returned casefile is owned by the caller; it will not be
845 automatically used for the next procedure's input. */
847 proc_capture_output (void)
849 struct casefile *casefile;
851 /* Try to make sure that this function is called immediately
852 after procedure() or a similar function: there must be a
   storage-class source, no permanent transformations, and no
   temporary transformations in effect. */
853 assert (vfm_source != NULL);
854 assert (case_source_is_class (vfm_source, &storage_source_class));
855 assert (trns_chain_is_empty (permanent_trns_chain));
856 assert (!proc_in_temporary_transformations ());
/* Take the casefile out of the storage source. */
858 casefile = storage_source_decapsulate (vfm_source);
864 static trns_proc_func case_limit_trns_proc;
865 static trns_free_func case_limit_trns_free;
867 /* Adds a transformation that limits the number of cases that may
868 pass through, if default_dict has a case limit.
   NOTE(review): the test on the case limit is not visible in this
   excerpt. */
870 add_case_limit_trns (void)
872 size_t case_limit = dict_get_case_limit (default_dict);
/* The counter is heap-allocated and handed to the transformation
   as its auxiliary data; case_limit_trns_free() releases it. */
875 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
876 *cases_remaining = case_limit;
877 add_transformation (case_limit_trns_proc, case_limit_trns_free,
/* The limit on default_dict itself is then reset to 0. */
879 dict_set_case_limit (default_dict, 0);
883 /* Limits the maximum number of cases processed to *CASES_REMAINING. */
886 case_limit_trns_proc (void *cases_remaining_,
887 struct ccase *c UNUSED, int case_nr UNUSED)
889 size_t *cases_remaining = cases_remaining_;
/* Let the case through while the counter is positive; drop it
   otherwise.
   NOTE(review): the statement that decrements the counter is not
   visible in this excerpt. */
890 if (*cases_remaining > 0)
893 return TRNS_CONTINUE;
896 return TRNS_DROP_CASE;
899 /* Frees the data associated with a case limit transformation,
   namely the counter allocated by add_case_limit_trns(). */
901 case_limit_trns_free (void *cases_remaining_)
903 size_t *cases_remaining = cases_remaining_;
904 free (cases_remaining);
908 static trns_proc_func filter_trns_proc;
910 /* Adds a temporary transformation to filter data according to
911 the variable specified on FILTER, if any. */
913 add_filter_trns (void)
915 struct variable *filter_var = dict_get_filter (default_dict);
916 if (filter_var != NULL)
/* Switch to temporary transformations first so that the filter is
   appended to the temporary chain.  No free function is supplied
   for the filter variable. */
918 proc_start_temporary_transformations ();
919 add_transformation (filter_trns_proc, NULL, filter_var);
923 /* FILTER transformation.
   Returns TRNS_CONTINUE if the filter variable's value in C is
   nonzero and not a missing value for that variable, and
   TRNS_DROP_CASE otherwise.
   NOTE(review): C is marked UNUSED in the parameter list although
   the body reads it. */
925 filter_trns_proc (void *filter_var_,
926 struct ccase *c UNUSED, int case_nr UNUSED)
929 struct variable *filter_var = filter_var_;
930 double f = case_num (c, filter_var->fv);
931 return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
932 ? TRNS_CONTINUE : TRNS_DROP_CASE);