1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/value-labels.h>
38 #include <data/variable.h>
39 #include <language/expressions/public.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/str.h>
44 #include <output/manager.h>
45 #include <output/table.h>
48 #define _(msgid) gettext (msgid)
51 Virtual File Manager (vfm):
53 vfm is used to process data files. It uses the model that
54 data is read from one stream (the data source), processed,
55 then written to another (the data sink). The data source is
56 then deleted and the data sink becomes the data source for the
59 /* Procedure execution data. */
60 struct write_case_data
62 /* Function to call for each case. */
63 bool (*case_func) (struct ccase *, void *); /* Function. */
64 void *aux; /* Auxiliary data. */
66 struct ccase trns_case; /* Case used for transformations. */
67 struct ccase sink_case; /* Case written to sink, if
68 compaction is necessary. */
69 size_t cases_written; /* Cases output so far. */
72 /* Cases are read from vfm_source,
73 pass through permanent_trns_chain (which transforms them into
74 the format described by permanent_dict),
75 are written to vfm_sink,
76 pass through temporary_trns_chain (which transforms them into
77 the format described by default_dict),
78 and are finally passed to the procedure. */
79 static struct case_source *vfm_source;
80 static struct trns_chain *permanent_trns_chain;
81 static struct dictionary *permanent_dict;
82 static struct case_sink *vfm_sink;
83 static struct trns_chain *temporary_trns_chain;
84 struct dictionary *default_dict;
86 /* The transformation chain that the next transformation will be
88 static struct trns_chain *cur_trns_chain;
90 /* The compactor used to compact a case, if necessary;
91 otherwise a null pointer. */
92 static struct dict_compactor *compactor;
94 /* Time at which vfm was last invoked. */
95 static time_t last_vfm_invocation;
98 int n_lag; /* Number of cases to lag. */
99 static int lag_count; /* Number of cases in lag_queue so far. */
100 static int lag_head; /* Index where next case will be added. */
101 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
103 static void add_case_limit_trns (void);
104 static void add_filter_trns (void);
105 static void add_process_if_trns (void);
107 static bool internal_procedure (bool (*case_func) (struct ccase *, void *),
108 bool (*end_func) (void *),
110 static void update_last_vfm_invocation (void);
111 static void create_trns_case (struct ccase *, struct dictionary *);
112 static void open_active_file (void);
113 static bool write_case (struct write_case_data *wc_data);
114 static void lag_case (const struct ccase *c);
115 static void clear_case (struct ccase *c);
116 static bool close_active_file (void);
118 /* Public functions. */
120 /* Returns the last time the data was read. */
122 time_of_last_procedure (void)
124 if (last_vfm_invocation == 0)
125 update_last_vfm_invocation ();
126 return last_vfm_invocation;
129 /* Regular procedure. */
131 /* Reads the data from the input program and writes it to a new
132 active file. For each case we read from the input program, we
135 1. Execute permanent transformations. If these drop the case,
136 start the next case from step 1.
138 2. Write case to replacement active file.
140 3. Execute temporary transformations. If these drop the case,
141 start the next case from step 1.
143 4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
145 Returns true if successful, false if an I/O error occurred. */
147 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
149 return internal_procedure (proc_func, NULL, aux);
152 /* Multipass procedure. */
154 struct multipass_aux_data
156 struct casefile *casefile;
158 bool (*proc_func) (const struct casefile *, void *aux);
162 /* Case processing function for multipass_procedure(). */
164 multipass_case_func (struct ccase *c, void *aux_data_)
166 struct multipass_aux_data *aux_data = aux_data_;
167 return casefile_append (aux_data->casefile, c);
170 /* End-of-file function for multipass_procedure(). */
172 multipass_end_func (void *aux_data_)
174 struct multipass_aux_data *aux_data = aux_data_;
175 return (aux_data->proc_func == NULL
176 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
179 /* Procedure that allows multiple passes over the input data.
180 The entire active file is passed to PROC_FUNC, with the given
181 AUX as auxiliary data, as a unit. */
183 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
186 struct multipass_aux_data aux_data;
189 aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
190 aux_data.proc_func = proc_func;
193 ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
194 ok = !casefile_error (aux_data.casefile) && ok;
196 casefile_destroy (aux_data.casefile);
201 /* Procedure implementation. */
203 /* Executes a procedure.
204 Passes each case to CASE_FUNC.
205 Calls END_FUNC after the last case.
206 Returns true if successful, false if an I/O error occurred (or
207 if CASE_FUNC or END_FUNC ever returned false). */
209 internal_procedure (bool (*case_func) (struct ccase *, void *),
210 bool (*end_func) (void *),
213 struct write_case_data wc_data;
216 assert (vfm_source != NULL);
218 update_last_vfm_invocation ();
220 /* Optimize the trivial case where we're not going to do
221 anything with the data, by not reading the data at all. */
222 if (case_func == NULL && end_func == NULL
223 && case_source_is_class (vfm_source, &storage_source_class)
225 && (temporary_trns_chain == NULL
226 || trns_chain_is_empty (temporary_trns_chain))
227 && trns_chain_is_empty (permanent_trns_chain))
230 expr_free (process_if_expr);
231 process_if_expr = NULL;
232 dict_set_case_limit (default_dict, 0);
233 dict_clear_vectors (default_dict);
239 wc_data.case_func = case_func;
241 create_trns_case (&wc_data.trns_case, default_dict);
242 case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
243 wc_data.cases_written = 0;
245 ok = vfm_source->class->read (vfm_source,
247 write_case, &wc_data) && ok;
248 if (end_func != NULL)
249 ok = end_func (aux) && ok;
251 case_destroy (&wc_data.sink_case);
252 case_destroy (&wc_data.trns_case);
254 ok = close_active_file () && ok;
259 /* Updates last_vfm_invocation. */
261 update_last_vfm_invocation (void)
263 last_vfm_invocation = time (NULL);
266 /* Creates and returns a case, initializing it from the vectors
267 that say which `value's need to be initialized just once, and
268 which ones need to be re-initialized before every case. */
270 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
272 size_t var_cnt = dict_get_var_cnt (dict);
275 case_create (trns_case, dict_get_next_value_idx (dict));
276 for (i = 0; i < var_cnt; i++)
278 struct variable *v = dict_get_var (dict, i);
279 union value *value = case_data_rw (trns_case, v->fv);
281 if (v->type == NUMERIC)
282 value->f = v->leave ? 0.0 : SYSMIS;
284 memset (value->s, ' ', v->width);
288 /* Makes all preparations for reading from the data source and writing
291 open_active_file (void)
293 add_case_limit_trns ();
295 add_process_if_trns ();
297 /* Finalize transformations. */
298 trns_chain_finalize (cur_trns_chain);
300 /* Make permanent_dict refer to the dictionary right before
301 data reaches the sink. */
302 if (permanent_dict == NULL)
303 permanent_dict = default_dict;
305 /* Figure out compaction. */
306 compactor = (dict_needs_compaction (permanent_dict)
307 ? dict_make_compactor (permanent_dict)
311 if (vfm_sink == NULL)
312 vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
313 if (vfm_sink->class->open != NULL)
314 vfm_sink->class->open (vfm_sink);
316 /* Allocate memory for lag queue. */
323 lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
324 for (i = 0; i < n_lag; i++)
325 case_nullify (&lag_queue[i]);
329 /* Transforms trns_case and writes it to the replacement active
330 file if advisable. Returns true if more cases can be
331 accepted, false otherwise. Do not call this function again
332 after it has returned false once. */
334 write_case (struct write_case_data *wc_data)
336 enum trns_result retval;
339 /* Execute permanent transformations. */
340 case_nr = wc_data->cases_written + 1;
341 retval = trns_chain_execute (permanent_trns_chain,
342 &wc_data->trns_case, &case_nr);
343 if (retval != TRNS_CONTINUE)
346 /* Write case to LAG queue. */
348 lag_case (&wc_data->trns_case);
350 /* Write case to replacement active file. */
351 wc_data->cases_written++;
352 if (vfm_sink->class->write != NULL)
354 if (compactor != NULL)
356 dict_compactor_compact (compactor, &wc_data->sink_case,
357 &wc_data->trns_case);
358 vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
361 vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
364 /* Execute temporary transformations. */
365 if (temporary_trns_chain != NULL)
367 retval = trns_chain_execute (temporary_trns_chain,
369 &wc_data->cases_written);
370 if (retval != TRNS_CONTINUE)
374 /* Pass case to procedure. */
375 if (wc_data->case_func != NULL)
376 if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
380 clear_case (&wc_data->trns_case);
381 return retval != TRNS_ERROR;
384 /* Add C to the lag queue. */
386 lag_case (const struct ccase *c)
388 if (lag_count < n_lag)
390 case_destroy (&lag_queue[lag_head]);
391 case_clone (&lag_queue[lag_head], c);
392 if (++lag_head >= n_lag)
396 /* Clears the variables in C that need to be cleared between
399 clear_case (struct ccase *c)
401 size_t var_cnt = dict_get_var_cnt (default_dict);
404 for (i = 0; i < var_cnt; i++)
406 struct variable *v = dict_get_var (default_dict, i);
409 if (v->type == NUMERIC)
410 case_data_rw (c, v->fv)->f = SYSMIS;
412 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
417 /* Closes the active file. */
419 close_active_file (void)
421 /* Free memory for lag queue, and turn off lagging. */
426 for (i = 0; i < n_lag; i++)
427 case_destroy (&lag_queue[i]);
432 /* Dictionary from before TEMPORARY becomes permanent. */
433 proc_cancel_temporary_transformations ();
435 /* Finish compaction. */
436 if (compactor != NULL)
438 dict_compactor_destroy (compactor);
439 dict_compact_values (default_dict);
443 /* Free data source. */
444 free_case_source (vfm_source);
447 /* Old data sink becomes new data source. */
448 if (vfm_sink->class->make_source != NULL)
449 vfm_source = vfm_sink->class->make_source (vfm_sink);
450 free_case_sink (vfm_sink);
453 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
454 and get rid of all the transformations. */
455 dict_clear_vectors (default_dict);
456 permanent_dict = NULL;
457 return proc_cancel_all_transformations ();
460 /* Returns a pointer to the lagged case from N_BEFORE cases before the
461 current one, or NULL if there haven't been that many cases yet. */
463 lagged_case (int n_before)
465 assert (n_before >= 1 );
466 assert (n_before <= n_lag);
468 if (n_before <= lag_count)
470 int index = lag_head - n_before;
473 return &lag_queue[index];
479 /* Procedure that separates the data into SPLIT FILE groups. */
481 /* Represents auxiliary data for handling SPLIT FILE. */
482 struct split_aux_data
484 size_t case_count; /* Number of cases so far. */
485 struct ccase prev_case; /* Data in previous case. */
487 /* Functions to call... */
488 void (*begin_func) (void *); /* ...before data. */
489 bool (*proc_func) (struct ccase *, void *); /* ...with data. */
490 void (*end_func) (void *); /* ...after data. */
491 void *func_aux; /* Auxiliary data. */
494 static int equal_splits (const struct ccase *, const struct ccase *);
495 static bool split_procedure_case_func (struct ccase *c, void *split_aux_);
496 static bool split_procedure_end_func (void *split_aux_);
497 static void dump_splits (struct ccase *);
499 /* Like procedure(), but it automatically breaks the case stream
500 into SPLIT FILE break groups. Before each group of cases with
501 identical SPLIT FILE variable values, BEGIN_FUNC is called.
502 Then PROC_FUNC is called with each case in the group.
503 END_FUNC is called when the group is finished. FUNC_AUX is
504 passed to each of the functions as auxiliary data.
506 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
507 and END_FUNC will be called at all.
509 If SPLIT FILE is not in effect, then there is one break group
510 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
513 Returns true if successful, false if an I/O error occurred. */
515 procedure_with_splits (void (*begin_func) (void *aux),
516 bool (*proc_func) (struct ccase *, void *aux),
517 void (*end_func) (void *aux),
520 struct split_aux_data split_aux;
523 split_aux.case_count = 0;
524 case_nullify (&split_aux.prev_case);
525 split_aux.begin_func = begin_func;
526 split_aux.proc_func = proc_func;
527 split_aux.end_func = end_func;
528 split_aux.func_aux = func_aux;
530 ok = internal_procedure (split_procedure_case_func,
531 split_procedure_end_func, &split_aux);
533 case_destroy (&split_aux.prev_case);
538 /* Case callback used by procedure_with_splits(). */
540 split_procedure_case_func (struct ccase *c, void *split_aux_)
542 struct split_aux_data *split_aux = split_aux_;
544 /* Start a new series if needed. */
545 if (split_aux->case_count == 0
546 || !equal_splits (c, &split_aux->prev_case))
548 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
549 split_aux->end_func (split_aux->func_aux);
552 case_destroy (&split_aux->prev_case);
553 case_clone (&split_aux->prev_case, c);
555 if (split_aux->begin_func != NULL)
556 split_aux->begin_func (split_aux->func_aux);
559 split_aux->case_count++;
560 return (split_aux->proc_func == NULL
561 || split_aux->proc_func (c, split_aux->func_aux));
564 /* End-of-file callback used by procedure_with_splits(). */
566 split_procedure_end_func (void *split_aux_)
568 struct split_aux_data *split_aux = split_aux_;
570 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
571 split_aux->end_func (split_aux->func_aux);
575 /* Compares the SPLIT FILE variables in cases A and B and returns
576 nonzero only if they differ. */
578 equal_splits (const struct ccase *a, const struct ccase *b)
580 return case_compare (a, b,
581 dict_get_split_vars (default_dict),
582 dict_get_split_cnt (default_dict)) == 0;
585 /* Dumps out the values of all the split variables for the case C. */
587 dump_splits (struct ccase *c)
589 struct variable *const *split;
594 split_cnt = dict_get_split_cnt (default_dict);
598 t = tab_create (3, split_cnt + 1, 0);
599 tab_dim (t, tab_natural_dimensions);
600 tab_vline (t, TAL_GAP, 1, 0, split_cnt);
601 tab_vline (t, TAL_GAP, 2, 0, split_cnt);
602 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
603 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
604 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
605 split = dict_get_split_vars (default_dict);
606 for (i = 0; i < split_cnt; i++)
608 struct variable *v = split[i];
612 assert (v->type == NUMERIC || v->type == ALPHA);
613 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
615 data_out (temp_buf, &v->print, case_data (c, v->fv));
617 temp_buf[v->print.w] = 0;
618 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
620 val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
622 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
624 tab_flags (t, SOMF_NO_TITLE);
628 /* Multipass procedure that separates the data into SPLIT FILE
631 /* Represents auxiliary data for handling SPLIT FILE in a
632 multipass procedure. */
633 struct multipass_split_aux_data
635 struct ccase prev_case; /* Data in previous case. */
636 struct casefile *casefile; /* Accumulates data for a split. */
638 /* Function to call with the accumulated data. */
639 bool (*split_func) (const struct casefile *, void *);
640 void *func_aux; /* Auxiliary data. */
643 static bool multipass_split_case_func (struct ccase *c, void *aux_);
644 static bool multipass_split_end_func (void *aux_);
645 static bool multipass_split_output (struct multipass_split_aux_data *);
647 /* Returns true if successful, false if an I/O error occurred. */
649 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
653 struct multipass_split_aux_data aux;
656 case_nullify (&aux.prev_case);
658 aux.split_func = split_func;
659 aux.func_aux = func_aux;
661 ok = internal_procedure (multipass_split_case_func,
662 multipass_split_end_func, &aux);
663 case_destroy (&aux.prev_case);
668 /* Case callback used by multipass_procedure_with_splits(). */
670 multipass_split_case_func (struct ccase *c, void *aux_)
672 struct multipass_split_aux_data *aux = aux_;
675 /* Start a new series if needed. */
676 if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
678 /* Pass any cases to split_func. */
679 if (aux->casefile != NULL)
680 ok = multipass_split_output (aux);
682 /* Start a new casefile. */
683 aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
685 /* Record split values. */
687 case_destroy (&aux->prev_case);
688 case_clone (&aux->prev_case, c);
691 return casefile_append (aux->casefile, c) && ok;
694 /* End-of-file callback used by multipass_procedure_with_splits(). */
696 multipass_split_end_func (void *aux_)
698 struct multipass_split_aux_data *aux = aux_;
699 return (aux->casefile == NULL || multipass_split_output (aux));
703 multipass_split_output (struct multipass_split_aux_data *aux)
707 assert (aux->casefile != NULL);
708 ok = aux->split_func (aux->casefile, aux->func_aux);
709 casefile_destroy (aux->casefile);
710 aux->casefile = NULL;
715 /* Discards all the current state in preparation for a data-input
716 command like DATA LIST or GET. */
718 discard_variables (void)
720 dict_clear (default_dict);
721 fh_set_default_handle (NULL);
725 free_case_source (vfm_source);
728 proc_cancel_all_transformations ();
730 expr_free (process_if_expr);
731 process_if_expr = NULL;
733 proc_cancel_temporary_transformations ();
736 /* Returns the current set of permanent transformations,
737 and clears the permanent transformations.
738 For use by INPUT PROGRAM. */
740 proc_capture_transformations (void)
742 struct trns_chain *chain;
744 assert (temporary_trns_chain == NULL);
745 chain = permanent_trns_chain;
746 cur_trns_chain = permanent_trns_chain = trns_chain_create ();
750 /* Adds a transformation that processes a case with PROC and
751 frees itself with FREE to the current set of transformations.
752 The functions are passed AUX as auxiliary data. */
754 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
756 trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
759 /* Adds a transformation that processes a case with PROC and
760 frees itself with FREE to the current set of transformations.
761 When parsing of the block of transformations is complete,
762 FINALIZE will be called.
763 The functions are passed AUX as auxiliary data. */
765 add_transformation_with_finalizer (trns_finalize_func *finalize,
766 trns_proc_func *proc,
767 trns_free_func *free, void *aux)
769 trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
772 /* Returns the index of the next transformation.
773 This value can be returned by a transformation procedure
774 function to indicate a "jump" to that transformation. */
776 next_transformation (void)
778 return trns_chain_next (cur_trns_chain);
781 /* Returns true if the next call to add_transformation() will add
782 a temporary transformation, false if it will add a permanent
785 proc_in_temporary_transformations (void)
787 return temporary_trns_chain != NULL;
790 /* Marks the start of temporary transformations.
791 Further calls to add_transformation() will add temporary
794 proc_start_temporary_transformations (void)
796 if (!proc_in_temporary_transformations ())
798 add_case_limit_trns ();
800 permanent_dict = dict_clone (default_dict);
801 trns_chain_finalize (permanent_trns_chain);
802 temporary_trns_chain = cur_trns_chain = trns_chain_create ();
806 /* Converts all the temporary transformations, if any, to
807 permanent transformations. Further transformations will be
809 Returns true if anything changed, false otherwise. */
811 proc_make_temporary_transformations_permanent (void)
813 if (proc_in_temporary_transformations ())
815 trns_chain_finalize (temporary_trns_chain);
816 trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
817 temporary_trns_chain = NULL;
819 dict_destroy (permanent_dict);
820 permanent_dict = NULL;
828 /* Cancels all temporary transformations, if any. Further
829 transformations will be permanent.
830 Returns true if anything changed, false otherwise. */
832 proc_cancel_temporary_transformations (void)
834 if (proc_in_temporary_transformations ())
836 dict_destroy (default_dict);
837 default_dict = permanent_dict;
838 permanent_dict = NULL;
840 trns_chain_destroy (temporary_trns_chain);
841 temporary_trns_chain = NULL;
849 /* Cancels all transformations, if any.
850 Returns true if successful, false on I/O error. */
852 proc_cancel_all_transformations (void)
855 ok = trns_chain_destroy (permanent_trns_chain);
856 ok = trns_chain_destroy (temporary_trns_chain) && ok;
857 permanent_trns_chain = cur_trns_chain = trns_chain_create ();
858 temporary_trns_chain = NULL;
862 /* Initializes procedure handling. */
866 default_dict = dict_create ();
867 proc_cancel_all_transformations ();
870 /* Finishes up procedure handling. */
874 discard_variables ();
877 /* Sets SINK as the destination for procedure output from the
880 proc_set_sink (struct case_sink *sink)
882 assert (vfm_sink == NULL);
886 /* Sets SOURCE as the source for procedure input for the next
889 proc_set_source (struct case_source *source)
891 assert (vfm_source == NULL);
895 /* Returns true if a source for the next procedure has been
896 configured, false otherwise. */
898 proc_has_source (void)
900 return vfm_source != NULL;
903 /* Returns the output from the previous procedure.
904 For use only immediately after executing a procedure.
905 The returned casefile is owned by the caller; it will not be
906 automatically used for the next procedure's input. */
908 proc_capture_output (void)
910 struct casefile *casefile;
912 /* Try to make sure that this function is called immediately
913 after procedure() or a similar function. */
914 assert (vfm_source != NULL);
915 assert (case_source_is_class (vfm_source, &storage_source_class));
916 assert (trns_chain_is_empty (permanent_trns_chain));
917 assert (!proc_in_temporary_transformations ());
919 casefile = storage_source_decapsulate (vfm_source);
925 static trns_proc_func case_limit_trns_proc;
926 static trns_free_func case_limit_trns_free;
928 /* Adds a transformation that limits the number of cases that may
929 pass through, if default_dict has a case limit. */
931 add_case_limit_trns (void)
933 size_t case_limit = dict_get_case_limit (default_dict);
936 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
937 *cases_remaining = case_limit;
938 add_transformation (case_limit_trns_proc, case_limit_trns_free,
940 dict_set_case_limit (default_dict, 0);
944 /* Limits the maximum number of cases processed to
947 case_limit_trns_proc (void *cases_remaining_,
948 struct ccase *c UNUSED, int case_nr UNUSED)
950 size_t *cases_remaining = cases_remaining_;
951 if (*cases_remaining > 0)
954 return TRNS_CONTINUE;
957 return TRNS_DROP_CASE;
960 /* Frees the data associated with a case limit transformation. */
962 case_limit_trns_free (void *cases_remaining_)
964 size_t *cases_remaining = cases_remaining_;
965 free (cases_remaining);
969 static trns_proc_func filter_trns_proc;
971 /* Adds a temporary transformation to filter data according to
972 the variable specified on FILTER, if any. */
974 add_filter_trns (void)
976 struct variable *filter_var = dict_get_filter (default_dict);
977 if (filter_var != NULL)
979 proc_start_temporary_transformations ();
980 add_transformation (filter_trns_proc, NULL, filter_var);
984 /* FILTER transformation. */
986 filter_trns_proc (void *filter_var_,
987 struct ccase *c UNUSED, int case_nr UNUSED)
990 struct variable *filter_var = filter_var_;
991 double f = case_num (c, filter_var->fv);
992 return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
993 ? TRNS_CONTINUE : TRNS_DROP_CASE);
996 static trns_proc_func process_if_trns_proc;
997 static trns_free_func process_if_trns_free;
999 /* Adds a temporary transformation to filter data according to
1000 the expression specified on PROCESS IF, if any. */
1002 add_process_if_trns (void)
1004 if (process_if_expr != NULL)
1006 proc_start_temporary_transformations ();
1007 add_transformation (process_if_trns_proc, process_if_trns_free,
1009 process_if_expr = NULL;
1013 /* PROCESS IF transformation. */
1015 process_if_trns_proc (void *expression_,
1016 struct ccase *c UNUSED, int case_nr UNUSED)
1019 struct expression *expression = expression_;
1020 return (expr_evaluate_num (expression, c, case_nr) == 1.0
1021 ? TRNS_CONTINUE : TRNS_DROP_CASE);
1024 /* Frees a PROCESS IF transformation. */
1026 process_if_trns_free (void *expression_)
1028 struct expression *expression = expression_;
1029 expr_free (expression);