/* PSPP - computes sample statistics.
   Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License as
   published by the Free Software Foundation; either version 2 of the
   License, or (at your option) any later version.

   This program is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <data/case-source.h>
#include <data/case-sink.h>
#include <data/case.h>
#include <data/casefile.h>
#include <data/fastfile.h>
#include <data/dictionary.h>
#include <data/file-handle-def.h>
#include <data/procedure.h>
#include <data/storage-stream.h>
#include <data/transformations.h>
#include <data/variable.h>
#include <libpspp/alloc.h>
#include <libpspp/deque.h>
#include <libpspp/misc.h>
#include <libpspp/str.h>
44 /* An abstract factory which creates casefiles */
45 struct casefile_factory *cf_factory;
47 /* Callback which occurs when a procedure provides a new source for
49 replace_source_callback *replace_source ;
51 /* Callback which occurs whenever the DICT is replaced by a new one */
52 replace_dictionary_callback *replace_dict;
54 /* Cases are read from proc_source,
55 pass through permanent_trns_chain (which transforms them into
56 the format described by permanent_dict),
57 are written to proc_sink,
58 pass through temporary_trns_chain (which transforms them into
59 the format described by dict),
60 and are finally passed to the procedure. */
61 struct case_source *proc_source;
62 struct trns_chain *permanent_trns_chain;
63 struct dictionary *permanent_dict;
64 struct case_sink *proc_sink;
65 struct trns_chain *temporary_trns_chain;
66 struct dictionary *dict;
68 /* The transformation chain that the next transformation will be
70 struct trns_chain *cur_trns_chain;
72 /* The compactor used to compact a case, if necessary;
73 otherwise a null pointer. */
74 struct dict_compactor *compactor;
76 /* Time at which proc was last invoked. */
77 time_t last_proc_invocation;
79 /* Cases just before ("lagging") the current one. */
80 int n_lag; /* Number of cases to lag. */
81 struct deque lag; /* Deque of lagged cases. */
82 struct ccase *lag_cases; /* Lagged cases managed by deque. */
85 bool is_open; /* Procedure open? */
86 struct ccase trns_case; /* Case used for transformations. */
87 struct ccase sink_case; /* Case written to sink, if
88 compacting is necessary. */
89 size_t cases_written; /* Cases output so far. */
91 }; /* struct dataset */
94 static void add_case_limit_trns (struct dataset *ds);
95 static void add_filter_trns (struct dataset *ds);
97 static bool internal_procedure (struct dataset *ds, case_func *,
100 static void update_last_proc_invocation (struct dataset *ds);
101 static void create_trns_case (struct ccase *, struct dictionary *);
102 static void open_active_file (struct dataset *ds);
103 static void clear_case (const struct dataset *ds, struct ccase *c);
104 static bool close_active_file (struct dataset *ds);
/* Public functions. */
108 /* Returns the last time the data was read. */
110 time_of_last_procedure (struct dataset *ds)
112 if (ds->last_proc_invocation == 0)
113 update_last_proc_invocation (ds);
114 return ds->last_proc_invocation;
/* Regular procedure. */
121 /* Reads the data from the input program and writes it to a new
122 active file. For each case we read from the input program, we
125 1. Execute permanent transformations. If these drop the case,
126 start the next case from step 1.
128 2. Write case to replacement active file.
130 3. Execute temporary transformations. If these drop the case,
131 start the next case from step 1.
133 4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
135 Returns true if successful, false if an I/O error occurred. */
137 procedure (struct dataset *ds, case_func *cf, void *aux)
139 update_last_proc_invocation (ds);
141 /* Optimize the trivial case where we're not going to do
142 anything with the data, by not reading the data at all. */
144 && case_source_is_class (ds->proc_source, &storage_source_class)
145 && ds->proc_sink == NULL
146 && (ds->temporary_trns_chain == NULL
147 || trns_chain_is_empty (ds->temporary_trns_chain))
148 && trns_chain_is_empty (ds->permanent_trns_chain))
151 dict_set_case_limit (ds->dict, 0);
152 dict_clear_vectors (ds->dict);
156 return internal_procedure (ds, cf, NULL, aux);
/* Multipass procedure. */
/* Auxiliary data for multipass_procedure(). */
struct multipass_aux_data
  {
    struct casefile *casefile;  /* Accumulates every case read. */

    /* Function called with the complete casefile, or null. */
    bool (*proc_func) (const struct casefile *, void *aux);
    void *aux;                  /* Auxiliary data for PROC_FUNC. */
  };
169 /* Case processing function for multipass_procedure(). */
171 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
173 struct multipass_aux_data *aux_data = aux_data_;
174 return casefile_append (aux_data->casefile, c);
177 /* End-of-file function for multipass_procedure(). */
179 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
181 struct multipass_aux_data *aux_data = aux_data_;
182 return (aux_data->proc_func == NULL
183 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
186 /* Procedure that allows multiple passes over the input data.
187 The entire active file is passed to PROC_FUNC, with the given
188 AUX as auxiliary data, as a unit. */
190 multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
192 struct multipass_aux_data aux_data;
196 ds->cf_factory->create_casefile (ds->cf_factory,
197 dict_get_next_value_idx (ds->dict));
199 aux_data.proc_func = proc_func;
202 ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
203 ok = !casefile_error (aux_data.casefile) && ok;
205 casefile_destroy (aux_data.casefile);
/* Procedure implementation. */
213 /* Executes a procedure.
214 Passes each case to CASE_FUNC.
215 Calls END_FUNC after the last case.
216 Returns true if successful, false if an I/O error occurred (or
217 if CASE_FUNC or END_FUNC ever returned false). */
219 internal_procedure (struct dataset *ds, case_func *proc,
227 while (ok && proc_read (ds, &c))
229 ok = proc (c, aux, ds) && ok;
231 ok = end (aux, ds) && ok;
233 if ( proc_close (ds) && ok )
242 /* Opens dataset DS for reading cases with proc_read.
243 proc_close must be called when done. */
245 proc_open (struct dataset *ds)
247 assert (ds->proc_source != NULL);
248 assert (!ds->is_open);
250 update_last_proc_invocation (ds);
252 open_active_file (ds);
255 create_trns_case (&ds->trns_case, ds->dict);
256 case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
257 ds->cases_written = 0;
261 /* Reads the next case from dataset DS, which must have been
262 opened for reading with proc_open.
263 Returns true if successful, in which case a pointer to the
264 case is stored in *C.
265 Return false at end of file or if a read error occurs. In
266 this case a null pointer is stored in *C. */
268 proc_read (struct dataset *ds, struct ccase **c)
270 enum trns_result retval = TRNS_DROP_CASE;
272 assert (ds->is_open);
278 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
279 if (retval == TRNS_ERROR)
284 /* Read a case from proc_source. */
285 clear_case (ds, &ds->trns_case);
286 if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
289 /* Execute permanent transformations. */
290 case_nr = ds->cases_written + 1;
291 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
292 &ds->trns_case, &case_nr);
293 if (retval != TRNS_CONTINUE)
296 /* Write case to collection of lagged cases. */
299 while (deque_count (&ds->lag) >= ds->n_lag)
300 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
301 case_clone (&ds->lag_cases[deque_push_front (&ds->lag)],
305 /* Write case to replacement active file. */
307 if (ds->proc_sink->class->write != NULL)
309 if (ds->compactor != NULL)
311 dict_compactor_compact (ds->compactor, &ds->sink_case,
313 ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
316 ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
319 /* Execute temporary transformations. */
320 if (ds->temporary_trns_chain != NULL)
322 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
323 &ds->trns_case, &ds->cases_written);
324 if (retval != TRNS_CONTINUE)
333 /* Closes dataset DS for reading.
334 Returns true if successful, false if an I/O error occurred
335 while reading or closing the data set.
336 If DS has not been opened, returns true without doing
339 proc_close (struct dataset *ds)
344 /* Drain any remaining cases. */
348 if (!proc_read (ds, &c))
351 ds->ok = free_case_source (ds->proc_source) && ds->ok;
352 proc_set_source (ds, NULL);
354 case_destroy (&ds->sink_case);
355 case_destroy (&ds->trns_case);
357 ds->ok = close_active_file (ds) && ds->ok;
363 /* Updates last_proc_invocation. */
365 update_last_proc_invocation (struct dataset *ds)
367 ds->last_proc_invocation = time (NULL);
370 /* Creates and returns a case, initializing it from the vectors
371 that say which `value's need to be initialized just once, and
372 which ones need to be re-initialized before every case. */
374 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
376 size_t var_cnt = dict_get_var_cnt (dict);
379 case_create (trns_case, dict_get_next_value_idx (dict));
380 for (i = 0; i < var_cnt; i++)
382 struct variable *v = dict_get_var (dict, i);
383 union value *value = case_data_rw (trns_case, v);
385 if (var_is_numeric (v))
386 value->f = var_get_leave (v) ? 0.0 : SYSMIS;
388 memset (value->s, ' ', var_get_width (v));
392 /* Makes all preparations for reading from the data source and writing
395 open_active_file (struct dataset *ds)
397 add_case_limit_trns (ds);
398 add_filter_trns (ds);
400 /* Finalize transformations. */
401 trns_chain_finalize (ds->cur_trns_chain);
403 /* Make permanent_dict refer to the dictionary right before
404 data reaches the sink. */
405 if (ds->permanent_dict == NULL)
406 ds->permanent_dict = ds->dict;
408 /* Figure out whether to compact. */
410 (dict_compacting_would_shrink (ds->permanent_dict)
411 ? dict_make_compactor (ds->permanent_dict)
415 if (ds->proc_sink == NULL)
416 ds->proc_sink = create_case_sink (&storage_sink_class,
420 if (ds->proc_sink->class->open != NULL)
421 ds->proc_sink->class->open (ds->proc_sink);
423 /* Allocate memory for lagged cases. */
424 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
427 /* Clears the variables in C that need to be cleared between
430 clear_case (const struct dataset *ds, struct ccase *c)
432 size_t var_cnt = dict_get_var_cnt (ds->dict);
435 for (i = 0; i < var_cnt; i++)
437 struct variable *v = dict_get_var (ds->dict, i);
438 if (!var_get_leave (v))
440 if (var_is_numeric (v))
441 case_data_rw (c, v)->f = SYSMIS;
443 memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
448 /* Closes the active file. */
450 close_active_file (struct dataset *ds)
452 /* Free memory for lagged cases. */
453 while (!deque_is_empty (&ds->lag))
454 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
455 free (ds->lag_cases);
457 /* Dictionary from before TEMPORARY becomes permanent. */
458 proc_cancel_temporary_transformations (ds);
460 /* Finish compacting. */
461 if (ds->compactor != NULL)
463 dict_compactor_destroy (ds->compactor);
464 dict_compact_values (ds->dict);
465 ds->compactor = NULL;
468 /* Old data sink becomes new data source. */
469 if (ds->proc_sink->class->make_source != NULL)
470 proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
471 free_case_sink (ds->proc_sink);
472 ds->proc_sink = NULL;
474 dict_clear_vectors (ds->dict);
475 ds->permanent_dict = NULL;
476 return proc_cancel_all_transformations (ds);
479 /* Returns a pointer to the lagged case from N_BEFORE cases before the
480 current one, or NULL if there haven't been that many cases yet. */
482 lagged_case (const struct dataset *ds, int n_before)
484 assert (n_before >= 1);
485 assert (n_before <= ds->n_lag);
487 if (n_before <= deque_count (&ds->lag))
488 return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
/* Procedure that separates the data into SPLIT FILE groups. */
495 /* Represents auxiliary data for handling SPLIT FILE. */
496 struct split_aux_data
498 struct dataset *dataset; /* The dataset */
499 struct ccase prev_case; /* Data in previous case. */
501 /* Callback functions. */
/* Helpers for procedure_with_splits(), defined below. */
static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
static bool split_procedure_end_func (void *, const struct dataset *);
512 /* Like procedure(), but it automatically breaks the case stream
513 into SPLIT FILE break groups. Before each group of cases with
514 identical SPLIT FILE variable values, BEGIN_FUNC is called
515 with the first case in the group.
516 Then PROC_FUNC is called for each case in the group (including
518 END_FUNC is called when the group is finished. FUNC_AUX is
519 passed to each of the functions as auxiliary data.
521 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
522 and END_FUNC will be called at all.
524 If SPLIT FILE is not in effect, then there is one break group
525 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
528 Returns true if successful, false if an I/O error occurred. */
530 procedure_with_splits (struct dataset *ds,
536 struct split_aux_data split_aux;
539 case_nullify (&split_aux.prev_case);
540 split_aux.begin = begin;
541 split_aux.proc = proc;
543 split_aux.func_aux = func_aux;
544 split_aux.dataset = ds;
546 ok = internal_procedure (ds, split_procedure_case_func,
547 split_procedure_end_func, &split_aux);
549 case_destroy (&split_aux.prev_case);
554 /* Case callback used by procedure_with_splits(). */
556 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
558 struct split_aux_data *split_aux = split_aux_;
560 /* Start a new series if needed. */
561 if (case_is_null (&split_aux->prev_case)
562 || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
564 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
565 split_aux->end (split_aux->func_aux, ds);
567 case_destroy (&split_aux->prev_case);
568 case_clone (&split_aux->prev_case, c);
570 if (split_aux->begin != NULL)
571 split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
574 return (split_aux->proc == NULL
575 || split_aux->proc (c, split_aux->func_aux, ds));
578 /* End-of-file callback used by procedure_with_splits(). */
580 split_procedure_end_func (void *split_aux_, const struct dataset *ds)
582 struct split_aux_data *split_aux = split_aux_;
584 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
585 split_aux->end (split_aux->func_aux, ds);
589 /* Compares the SPLIT FILE variables in cases A and B and returns
590 nonzero only if they differ. */
592 equal_splits (const struct ccase *a, const struct ccase *b,
593 const struct dataset *ds)
595 return case_compare (a, b,
596 dict_get_split_vars (ds->dict),
597 dict_get_split_cnt (ds->dict)) == 0;
/* Multipass procedure that separates the data into SPLIT FILE
   groups. */
603 /* Represents auxiliary data for handling SPLIT FILE in a
604 multipass procedure. */
605 struct multipass_split_aux_data
607 struct dataset *dataset; /* The dataset of the split */
608 struct ccase prev_case; /* Data in previous case. */
609 struct casefile *casefile; /* Accumulates data for a split. */
610 split_func *split; /* Function to call with the accumulated
612 void *func_aux; /* Auxiliary data. */
/* Helpers for multipass_procedure_with_splits(), defined below. */
static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
619 /* Returns true if successful, false if an I/O error occurred. */
621 multipass_procedure_with_splits (struct dataset *ds,
625 struct multipass_split_aux_data aux;
628 case_nullify (&aux.prev_case);
631 aux.func_aux = func_aux;
634 ok = internal_procedure (ds, multipass_split_case_func,
635 multipass_split_end_func, &aux);
636 case_destroy (&aux.prev_case);
641 /* Case callback used by multipass_procedure_with_splits(). */
643 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
645 struct multipass_split_aux_data *aux = aux_;
648 /* Start a new series if needed. */
649 if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
651 /* Record split values. */
652 case_destroy (&aux->prev_case);
653 case_clone (&aux->prev_case, c);
655 /* Pass any cases to split_func. */
656 if (aux->casefile != NULL)
657 ok = multipass_split_output (aux, ds);
659 /* Start a new casefile. */
661 ds->cf_factory->create_casefile (ds->cf_factory,
662 dict_get_next_value_idx (ds->dict));
665 return casefile_append (aux->casefile, c) && ok;
668 /* End-of-file callback used by multipass_procedure_with_splits(). */
670 multipass_split_end_func (void *aux_, const struct dataset *ds)
672 struct multipass_split_aux_data *aux = aux_;
673 return (aux->casefile == NULL || multipass_split_output (aux, ds));
677 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
681 assert (aux->casefile != NULL);
682 ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
683 casefile_destroy (aux->casefile);
684 aux->casefile = NULL;
689 /* Discards all the current state in preparation for a data-input
690 command like DATA LIST or GET. */
692 discard_variables (struct dataset *ds)
694 dict_clear (ds->dict);
695 fh_set_default_handle (NULL);
699 free_case_source (ds->proc_source);
700 proc_set_source (ds, NULL);
702 proc_cancel_all_transformations (ds);
705 /* Returns the current set of permanent transformations,
706 and clears the permanent transformations.
707 For use by INPUT PROGRAM. */
709 proc_capture_transformations (struct dataset *ds)
711 struct trns_chain *chain;
713 assert (ds->temporary_trns_chain == NULL);
714 chain = ds->permanent_trns_chain;
715 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
719 /* Adds a transformation that processes a case with PROC and
720 frees itself with FREE to the current set of transformations.
721 The functions are passed AUX as auxiliary data. */
723 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
725 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
728 /* Adds a transformation that processes a case with PROC and
729 frees itself with FREE to the current set of transformations.
730 When parsing of the block of transformations is complete,
731 FINALIZE will be called.
732 The functions are passed AUX as auxiliary data. */
734 add_transformation_with_finalizer (struct dataset *ds,
735 trns_finalize_func *finalize,
736 trns_proc_func *proc,
737 trns_free_func *free, void *aux)
739 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
742 /* Returns the index of the next transformation.
743 This value can be returned by a transformation procedure
744 function to indicate a "jump" to that transformation. */
746 next_transformation (const struct dataset *ds)
748 return trns_chain_next (ds->cur_trns_chain);
751 /* Returns true if the next call to add_transformation() will add
752 a temporary transformation, false if it will add a permanent
755 proc_in_temporary_transformations (const struct dataset *ds)
757 return ds->temporary_trns_chain != NULL;
760 /* Marks the start of temporary transformations.
761 Further calls to add_transformation() will add temporary
764 proc_start_temporary_transformations (struct dataset *ds)
766 if (!proc_in_temporary_transformations (ds))
768 add_case_limit_trns (ds);
770 ds->permanent_dict = dict_clone (ds->dict);
772 trns_chain_finalize (ds->permanent_trns_chain);
773 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
777 /* Converts all the temporary transformations, if any, to
778 permanent transformations. Further transformations will be
780 Returns true if anything changed, false otherwise. */
782 proc_make_temporary_transformations_permanent (struct dataset *ds)
784 if (proc_in_temporary_transformations (ds))
786 trns_chain_finalize (ds->temporary_trns_chain);
787 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
788 ds->temporary_trns_chain = NULL;
790 dict_destroy (ds->permanent_dict);
791 ds->permanent_dict = NULL;
799 /* Cancels all temporary transformations, if any. Further
800 transformations will be permanent.
801 Returns true if anything changed, false otherwise. */
803 proc_cancel_temporary_transformations (struct dataset *ds)
805 if (proc_in_temporary_transformations (ds))
807 dataset_set_dict (ds, ds->permanent_dict);
808 ds->permanent_dict = NULL;
810 trns_chain_destroy (ds->temporary_trns_chain);
811 ds->temporary_trns_chain = NULL;
819 /* Cancels all transformations, if any.
820 Returns true if successful, false on I/O error. */
822 proc_cancel_all_transformations (struct dataset *ds)
825 ok = trns_chain_destroy (ds->permanent_trns_chain);
826 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
827 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
828 ds->temporary_trns_chain = NULL;
832 /* Initializes procedure handling. */
834 create_dataset (struct casefile_factory *fact,
835 replace_source_callback *rps,
836 replace_dictionary_callback *rds
839 struct dataset *ds = xzalloc (sizeof(*ds));
840 ds->dict = dict_create ();
841 ds->cf_factory = fact;
842 ds->replace_source = rps;
843 ds->replace_dict = rds;
844 proc_cancel_all_transformations (ds);
848 /* Finishes up procedure handling. */
850 destroy_dataset (struct dataset *ds)
852 discard_variables (ds);
853 dict_destroy (ds->dict);
854 trns_chain_destroy (ds->permanent_trns_chain);
858 /* Sets SINK as the destination for procedure output from the
861 proc_set_sink (struct dataset *ds, struct case_sink *sink)
863 assert (ds->proc_sink == NULL);
864 ds->proc_sink = sink;
867 /* Sets SOURCE as the source for procedure input for the next
870 proc_set_source (struct dataset *ds, struct case_source *source)
872 ds->proc_source = source;
874 if ( ds->replace_source )
875 ds->replace_source (ds->proc_source);
878 /* Returns true if a source for the next procedure has been
879 configured, false otherwise. */
881 proc_has_source (const struct dataset *ds)
883 return ds->proc_source != NULL;
886 /* Returns the output from the previous procedure.
887 For use only immediately after executing a procedure.
888 The returned casefile is owned by the caller; it will not be
889 automatically used for the next procedure's input. */
891 proc_capture_output (struct dataset *ds)
893 struct casefile *casefile;
895 /* Try to make sure that this function is called immediately
896 after procedure() or a similar function. */
897 assert (ds->proc_source != NULL);
898 assert (case_source_is_class (ds->proc_source, &storage_source_class));
899 assert (trns_chain_is_empty (ds->permanent_trns_chain));
900 assert (!proc_in_temporary_transformations (ds));
902 casefile = storage_source_decapsulate (ds->proc_source);
903 proc_set_source (ds, NULL);
908 static trns_proc_func case_limit_trns_proc;
909 static trns_free_func case_limit_trns_free;
911 /* Adds a transformation that limits the number of cases that may
912 pass through, if DS->DICT has a case limit. */
914 add_case_limit_trns (struct dataset *ds)
916 size_t case_limit = dict_get_case_limit (ds->dict);
919 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
920 *cases_remaining = case_limit;
921 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
923 dict_set_case_limit (ds->dict, 0);
927 /* Limits the maximum number of cases processed to
930 case_limit_trns_proc (void *cases_remaining_,
931 struct ccase *c UNUSED, casenumber case_nr UNUSED)
933 size_t *cases_remaining = cases_remaining_;
934 if (*cases_remaining > 0)
936 (*cases_remaining)--;
937 return TRNS_CONTINUE;
940 return TRNS_DROP_CASE;
/* Frees the data associated with a case limit transformation.
   Always returns true. */
static bool
case_limit_trns_free (void *cases_remaining_)
{
  size_t *cases_remaining = cases_remaining_;
  free (cases_remaining);
  return true;
}
952 static trns_proc_func filter_trns_proc;
954 /* Adds a temporary transformation to filter data according to
955 the variable specified on FILTER, if any. */
957 add_filter_trns (struct dataset *ds)
959 struct variable *filter_var = dict_get_filter (ds->dict);
960 if (filter_var != NULL)
962 proc_start_temporary_transformations (ds);
963 add_transformation (ds, filter_trns_proc, NULL, filter_var);
967 /* FILTER transformation. */
969 filter_trns_proc (void *filter_var_,
970 struct ccase *c UNUSED, casenumber case_nr UNUSED)
973 struct variable *filter_var = filter_var_;
974 double f = case_num (c, filter_var);
975 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
976 ? TRNS_CONTINUE : TRNS_DROP_CASE);
981 dataset_dict (const struct dataset *ds)
987 /* Set or replace dataset DS's dictionary with DICT.
988 The old dictionary is destroyed */
990 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
992 struct dictionary *old_dict = ds->dict;
994 dict_copy_callbacks (dict, ds->dict);
997 if ( ds->replace_dict )
998 ds->replace_dict (dict);
1000 dict_destroy (old_dict);
1004 dataset_need_lag (struct dataset *ds, int n_before)
1006 ds->n_lag = MAX (ds->n_lag, n_before);
1009 struct casefile_factory *
1010 dataset_get_casefile_factory (const struct dataset *ds)
1012 return ds->cf_factory;