1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License as
6 published by the Free Software Foundation; either version 2 of the
7 License, or (at your option) any later version.
9 This program is distributed in the hope that it will be useful, but
10 WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17 02110-1301, USA. */
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casedeque.h>
30 #include <data/casefile.h>
31 #include <data/fastfile.h>
32 #include <data/dictionary.h>
33 #include <data/file-handle-def.h>
34 #include <data/procedure.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/variable.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/misc.h>
40 #include <libpspp/str.h>
/* Members of struct dataset: the state shared by all the procedure
   machinery in this file.
   NOTE(review): the line that opens the struct (somewhere in original
   lines 41-43) is not visible in this excerpt; the tag is taken from the
   comment on the closing brace below. */
44 /* An abstract factory which creates casefiles */
45 struct casefile_factory *cf_factory;
/* NOTE(review): the comment that follows has lost its final line
   (original line 48) together with its closing delimiter, so as extracted
   it runs on into the next comment and hides the replace_source member.
   Restore the missing text before compiling. */
47 /* Callback which occurs when a procedure provides a new source for
49 replace_source_callback *replace_source ;
51 /* Callback which occurs whenever the DICT is replaced by a new one */
52 replace_dictionary_callback *replace_dict;
54 /* Cases are read from proc_source,
55 pass through permanent_trns_chain (which transforms them into
56 the format described by permanent_dict),
57 are written to proc_sink,
58 pass through temporary_trns_chain (which transforms them into
59 the format described by dict),
60 and are finally passed to the procedure. */
61 struct case_source *proc_source;
62 struct trns_chain *permanent_trns_chain;
63 struct dictionary *permanent_dict;
64 struct case_sink *proc_sink;
65 struct trns_chain *temporary_trns_chain;
66 struct dictionary *dict;
/* NOTE(review): same damage as above: the final line of the next comment
   (original line 69) and its closing delimiter are missing, which hides
   the cur_trns_chain member.  add_transformation() appends to
   cur_trns_chain. */
68 /* The transformation chain that the next transformation will be
70 struct trns_chain *cur_trns_chain;
72 /* The compactor used to compact a case, if necessary;
73 otherwise a null pointer. */
74 struct dict_compactor *compactor;
76 /* Time at which proc was last invoked. */
77 time_t last_proc_invocation;
79 /* Cases just before ("lagging") the current one. */
80 int n_lag; /* Number of cases to lag. */
81 struct casedeque lagged_cases; /* Lagged cases. */
/* NOTE(review): proc_close() reads and writes ds->ok, but no `ok' member
   is visible here; original lines 82-83 and 89 are absent from this
   excerpt. */
84 bool is_open; /* Procedure open? */
85 struct ccase trns_case; /* Case used for transformations. */
86 struct ccase sink_case; /* Case written to sink, if
87 compacting is necessary. */
88 size_t cases_written; /* Cases output so far. */
90 }; /* struct dataset */
/* Forward declarations of the file-local helpers defined further down. */
93 static void add_case_limit_trns (struct dataset *ds);
94 static void add_filter_trns (struct dataset *ds);
/* NOTE(review): the rest of this prototype (original lines 97-98) is not
   visible.  Callers in this file pass four arguments: the dataset, a
   per-case callback, an end-of-data callback and auxiliary data. */
96 static bool internal_procedure (struct dataset *ds, case_func *,
99 static void update_last_proc_invocation (struct dataset *ds);
100 static void create_trns_case (struct ccase *, struct dictionary *);
101 static void open_active_file (struct dataset *ds);
102 static void clear_case (const struct dataset *ds, struct ccase *c);
103 static bool close_active_file (struct dataset *ds);
105 /* Public functions. */
107 /* Returns the last time the data was read.
   If no time has been recorded yet (the stored value is still 0), the
   timestamp is first set to the current time, so the result is never the
   zero placeholder. */
109 time_of_last_procedure (struct dataset *ds)
/* Zero means "never recorded": initialize lazily. */
111 if (ds->last_proc_invocation == 0)
112 update_last_proc_invocation (ds);
113 return ds->last_proc_invocation;
116 /* Regular procedure. */
120 /* Reads the data from the input program and writes it to a new
121 active file. For each case we read from the input program, we
   do the following:
124 1. Execute permanent transformations. If these drop the case,
125 start the next case from step 1.
127 2. Write case to replacement active file.
129 3. Execute temporary transformations. If these drop the case,
130 start the next case from step 1.
132 4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
   (PROC_FUNC is the CF parameter below.)
134 Returns true if successful, false if an I/O error occurred. */
136 procedure (struct dataset *ds, case_func *cf, void *aux)
138 update_last_proc_invocation (ds);
140 /* Optimize the trivial case where we're not going to do
141 anything with the data, by not reading the data at all. */
/* NOTE(review): the first line of this condition (original line 142) and
   the statements on original lines 149 and 152 are not visible in this
   excerpt.  The visible part requires a storage-class source, no sink and
   empty permanent and temporary transformation chains; the visible
   shortcut body clears the case limit and the dictionary's vectors. */
143 && case_source_is_class (ds->proc_source, &storage_source_class)
144 && ds->proc_sink == NULL
145 && (ds->temporary_trns_chain == NULL
146 || trns_chain_is_empty (ds->temporary_trns_chain))
147 && trns_chain_is_empty (ds->permanent_trns_chain))
150 dict_set_case_limit (ds->dict, 0);
151 dict_clear_vectors (ds->dict);
/* General path: no end-of-data callback is needed, hence the NULL. */
155 return internal_procedure (ds, cf, NULL, aux);
158 /* Multipass procedure. */
/* Auxiliary data that multipass_procedure() threads through
   internal_procedure() to its two callbacks. */
160 struct multipass_aux_data
/* Casefile to which multipass_case_func() appends every case. */
162 struct casefile *casefile;
/* NOTE(review): multipass_end_func() also reads an `aux' member that is
   not visible here (original line 163 is absent). */
/* Callback given the complete casefile once all cases have been read;
   may be null. */
164 bool (*proc_func) (const struct casefile *, void *aux);
168 /* Case processing function for multipass_procedure().
   Appends case C to the casefile held in AUX_DATA_ (a struct
   multipass_aux_data) and returns the result of casefile_append().
   DS is not used. */
170 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
172 struct multipass_aux_data *aux_data = aux_data_;
173 return casefile_append (aux_data->casefile, c);
176 /* End-of-file function for multipass_procedure().
   Returns true when no proc_func was supplied; otherwise passes the
   accumulated casefile and the caller's auxiliary data to proc_func and
   returns its result.  DS is not used. */
178 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
180 struct multipass_aux_data *aux_data = aux_data_;
181 return (aux_data->proc_func == NULL
182 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
185 /* Procedure that allows multiple passes over the input data.
186 The entire active file is passed to PROC_FUNC, with the given
187 AUX as auxiliary data, as a unit.
   The cases are first collected into a casefile obtained from DS's
   casefile factory; that casefile is destroyed before returning. */
189 multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
191 struct multipass_aux_data aux_data;
/* NOTE(review): the left-hand side of the next statement (original line
   194) is not visible; presumably the new casefile is stored in
   aux_data.casefile, which is what the code below uses -- confirm. */
195 ds->cf_factory->create_casefile (ds->cf_factory,
196 dict_get_next_value_idx (ds->dict));
198 aux_data.proc_func = proc_func;
/* Collect every case; multipass_end_func() then invokes PROC_FUNC. */
201 ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
/* A casefile error fails the procedure even if the callbacks succeeded. */
202 ok = !casefile_error (aux_data.casefile) && ok;
204 casefile_destroy (aux_data.casefile);
210 /* Procedure implementation. */
212 /* Executes a procedure.
213 Passes each case to CASE_FUNC.
214 Calls END_FUNC after the last case.
215 Returns true if successful, false if an I/O error occurred (or
216 if CASE_FUNC or END_FUNC ever returned false).
   In the code below CASE_FUNC is called `proc' and END_FUNC `end'. */
/* NOTE(review): the remaining parameters and the local declarations
   (original lines 219-225), the guards on original lines 227 and 229, and
   the return statements after original line 232 are not visible in this
   excerpt. */
218 internal_procedure (struct dataset *ds, case_func *proc,
226 while (ok && proc_read (ds, &c))
228 ok = proc (c, aux, ds) && ok;
230 ok = end (aux, ds) && ok;
232 if ( proc_close (ds) && ok )
241 /* Opens dataset DS for reading cases with proc_read.
242 proc_close must be called when done.
   DS must have a source and must not already be open; both conditions
   are asserted. */
244 proc_open (struct dataset *ds)
246 assert (ds->proc_source != NULL);
247 assert (!ds->is_open);
249 update_last_proc_invocation (ds);
251 open_active_file (ds);
/* NOTE(review): original lines 252-253 are not visible; presumably
   is_open is set there, since proc_read() asserts it -- confirm. */
/* trns_case is sized with dict_get_next_value_idx() (inside
   create_trns_case), sink_case with dict_get_compacted_value_cnt(). */
254 create_trns_case (&ds->trns_case, ds->dict);
255 case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
256 ds->cases_written = 0;
260 /* Reads the next case from dataset DS, which must have been
261 opened for reading with proc_open.
262 Returns true if successful, in which case a pointer to the
263 case is stored in *C.
264 Return false at end of file or if a read error occurs. In
265 this case a null pointer is stored in *C.
   NOTE(review): the loop header, local declarations, braces and every
   return statement of this function fall on original lines that are
   absent from this excerpt (272-276, 279-282, 286-287, 293-294, 296-297,
   301-303, 305, 307, 309, 311, 313-314, 316-317, 320 and everything after
   323), so the comments below describe only the statements that are
   visible. */
267 proc_read (struct dataset *ds, struct ccase **c)
269 enum trns_result retval = TRNS_DROP_CASE;
271 assert (ds->is_open);
277 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
278 if (retval == TRNS_ERROR)
283 /* Read a case from proc_source. */
/* Reset the variables that must not carry over before reading into
   trns_case. */
284 clear_case (ds, &ds->trns_case);
285 if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
288 /* Execute permanent transformations. */
/* The case number given to the permanent chain is cases_written + 1. */
289 case_nr = ds->cases_written + 1;
290 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
291 &ds->trns_case, &case_nr);
292 if (retval != TRNS_CONTINUE)
295 /* Write case to collection of lagged cases. */
/* Drop the oldest entries from the back first so that, after the push at
   the front, the deque holds at most n_lag cases. */
298 while (casedeque_count (&ds->lagged_cases) >= ds->n_lag)
299 case_destroy (casedeque_pop_back (&ds->lagged_cases));
300 case_clone (casedeque_push_front (&ds->lagged_cases),
304 /* Write case to replacement active file. */
/* A sink class without a write hook is skipped.  With a compactor the
   case is compacted into sink_case and that copy is written; the second
   write call below passes trns_case unchanged (presumably the branch
   taken without a compactor -- the `else' line is not visible). */
306 if (ds->proc_sink->class->write != NULL)
308 if (ds->compactor != NULL)
310 dict_compactor_compact (ds->compactor, &ds->sink_case,
312 ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
315 ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
318 /* Execute temporary transformations. */
/* Unlike the permanent chain, the temporary chain is handed
   cases_written itself as the case number. */
319 if (ds->temporary_trns_chain != NULL)
321 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
322 &ds->trns_case, &ds->cases_written);
323 if (retval != TRNS_CONTINUE)
332 /* Closes dataset DS for reading.
333 Returns true if successful, false if an I/O error occurred
334 while reading or closing the data set.
335 If DS has not been opened, returns true without doing
336 anything else. */
/* proc_close: finishes reading from DS.  The visible statements drain the
   remaining cases, free the case source, destroy the two working cases
   and close the active file, folding each failure into ds->ok.
   NOTE(review): original lines 339-342, 344-346, 348-349 and 357-360 are
   not visible in this excerpt; they presumably hold the not-open early
   return, the drain loop header and the final return -- confirm. */
338 proc_close (struct dataset *ds)
343 /* Drain any remaining cases. */
347 if (!proc_read (ds, &c))
/* Release the source and detach it from DS. */
350 ds->ok = free_case_source (ds->proc_source) && ds->ok;
351 proc_set_source (ds, NULL);
353 case_destroy (&ds->sink_case);
354 case_destroy (&ds->trns_case);
/* close_active_file() installs the sink's output as the new source when
   the sink class provides make_source. */
356 ds->ok = close_active_file (ds) && ds->ok;
362 /* Updates last_proc_invocation: records the current wall-clock time,
   as returned by time(), in DS. */
364 update_last_proc_invocation (struct dataset *ds)
366 ds->last_proc_invocation = time (NULL);
369 /* Creates case TRNS_CASE in place (nothing is returned; see the
   prototype near the top of the file), sized with
   dict_get_next_value_idx (DICT), and gives every variable in DICT its
   initial value: a numeric variable gets 0.0 when var_get_leave() is
   true for it and SYSMIS otherwise; the other visible branch fills the
   value with spaces over the variable's width.
   NOTE(review): the line between the two assignments (original line 386,
   presumably `else') is not visible. */
373 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
375 size_t var_cnt = dict_get_var_cnt (dict);
378 case_create (trns_case, dict_get_next_value_idx (dict));
379 for (i = 0; i < var_cnt; i++)
381 struct variable *v = dict_get_var (dict, i);
382 union value *value = case_data_rw (trns_case, v);
384 if (var_is_numeric (v))
385 value->f = var_get_leave (v) ? 0.0 : SYSMIS;
387 memset (value->s, ' ', var_get_width (v));
391 /* Makes all preparations for reading from the data source and writing
392 to the data sink. */
/* open_active_file: prepares DS for a pass over the data.  Adds the
   case-limit and FILTER transformations, finalizes the current chain,
   settles permanent_dict, decides on compaction, makes sure a sink exists
   and is opened, and initializes the lagged-case deque. */
394 open_active_file (struct dataset *ds)
396 add_case_limit_trns (ds);
397 add_filter_trns (ds);
399 /* Finalize transformations. */
400 trns_chain_finalize (ds->cur_trns_chain);
402 /* Make permanent_dict refer to the dictionary right before
403 data reaches the sink. */
/* When no temporary transformations are active permanent_dict is still
   null, so it simply aliases the working dictionary. */
404 if (ds->permanent_dict == NULL)
405 ds->permanent_dict = ds->dict;
407 /* Figure out whether to compact. */
/* NOTE(review): the assignment target (original line 408) and the
   alternative arm of this conditional (original lines 411-412) are not
   visible; presumably ds->compactor receives either a new compactor or a
   null pointer -- confirm. */
409 (dict_compacting_would_shrink (ds->permanent_dict)
410 ? dict_make_compactor (ds->permanent_dict)
414 if (ds->proc_sink == NULL)
415 ds->proc_sink = create_case_sink (&storage_sink_class,
419 if (ds->proc_sink->class->open != NULL)
420 ds->proc_sink->class->open (ds->proc_sink);
422 /* Allocate memory for lagged cases. */
423 casedeque_init (&ds->lagged_cases, ds->n_lag);
426 /* Clears the variables in C that need to be cleared between
427 processing cases. */
/* clear_case: resets, in case C, every variable of DS's dictionary for
   which var_get_leave() is false.  Numeric variables are set to SYSMIS;
   the other visible branch fills the value with spaces over the
   variable's width.  Variables for which var_get_leave() is true are
   left untouched.
   NOTE(review): the line between the two assignments (original line 441,
   presumably `else') is not visible. */
429 clear_case (const struct dataset *ds, struct ccase *c)
431 size_t var_cnt = dict_get_var_cnt (ds->dict);
434 for (i = 0; i < var_cnt; i++)
436 struct variable *v = dict_get_var (ds->dict, i);
437 if (!var_get_leave (v))
439 if (var_is_numeric (v))
440 case_data_rw (c, v)->f = SYSMIS;
442 memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
447 /* Closes the active file.
   Frees the lagged cases, cancels temporary transformations, finishes
   compaction, turns the sink into the next source when the sink class
   supports that, and cancels all remaining transformations.
   Returns the result of proc_cancel_all_transformations(). */
449 close_active_file (struct dataset *ds)
451 /* Free memory for lagged cases. */
452 while (!casedeque_is_empty (&ds->lagged_cases))
453 case_destroy (casedeque_pop_back (&ds->lagged_cases));
454 casedeque_destroy (&ds->lagged_cases);
456 /* Dictionary from before TEMPORARY becomes permanent. */
457 proc_cancel_temporary_transformations (ds);
459 /* Finish compacting. */
460 if (ds->compactor != NULL)
462 dict_compactor_destroy (ds->compactor);
463 dict_compact_values (ds->dict);
464 ds->compactor = NULL;
467 /* Old data sink becomes new data source. */
/* Without a make_source hook the source is left as it is; proc_close()
   has already reset it to null before calling this function. */
468 if (ds->proc_sink->class->make_source != NULL)
469 proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
470 free_case_sink (ds->proc_sink);
471 ds->proc_sink = NULL;
473 dict_clear_vectors (ds->dict);
/* Not destroyed here: at this point permanent_dict is either null or an
   alias of ds->dict set up by open_active_file(). */
474 ds->permanent_dict = NULL;
475 return proc_cancel_all_transformations (ds);
478 /* Returns a pointer to the lagged case from N_BEFORE cases before the
479 current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be between 1 and DS's n_lag inclusive (asserted).
   NOTE(review): the fall-through return (original lines 488-489) is not
   visible in this excerpt. */
481 lagged_case (const struct dataset *ds, int n_before)
483 assert (n_before >= 1);
484 assert (n_before <= ds->n_lag);
/* proc_read() pushes new cases at the front, so index N_BEFORE - 1
   counted from the front is the case N_BEFORE steps back. */
486 if (n_before <= casedeque_count (&ds->lagged_cases))
487 return casedeque_front (&ds->lagged_cases, n_before - 1);
492 /* Procedure that separates the data into SPLIT FILE groups. */
494 /* Represents auxiliary data for handling SPLIT FILE. */
495 struct split_aux_data
497 struct dataset *dataset; /* The dataset */
498 struct ccase prev_case; /* Data in previous case. */
/* NOTE(review): the callback members themselves (original lines 501-505)
   are not visible; the code in this file uses members named begin, proc,
   end and func_aux. */
500 /* Callback functions. */
/* Forward declarations of the helpers behind procedure_with_splits(). */
507 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
508 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
509 static bool split_procedure_end_func (void *, const struct dataset *);
511 /* Like procedure(), but it automatically breaks the case stream
512 into SPLIT FILE break groups. Before each group of cases with
513 identical SPLIT FILE variable values, BEGIN_FUNC is called
514 with the first case in the group.
515 Then PROC_FUNC is called for each case in the group (including
   the first).
517 END_FUNC is called when the group is finished. FUNC_AUX is
518 passed to each of the functions as auxiliary data.
520 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
521 and END_FUNC will be called at all.
523 If SPLIT FILE is not in effect, then there is one break group
524 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
   are each called once.
527 Returns true if successful, false if an I/O error occurred. */
/* NOTE(review): the rest of the parameter list (original lines 530-534)
   and original line 541 are not visible; the callbacks read
   split_aux->end, so presumably line 541 stores the END_FUNC argument
   there -- confirm. */
529 procedure_with_splits (struct dataset *ds,
535 struct split_aux_data split_aux;
/* Start with no previous case so the first case always opens a group. */
538 case_nullify (&split_aux.prev_case);
539 split_aux.begin = begin;
540 split_aux.proc = proc;
542 split_aux.func_aux = func_aux;
543 split_aux.dataset = ds;
545 ok = internal_procedure (ds, split_procedure_case_func,
546 split_procedure_end_func, &split_aux);
548 case_destroy (&split_aux.prev_case);
553 /* Case callback used by procedure_with_splits().
   When C opens a new break group (there is no previous case yet, or its
   split values differ from the recorded case) the previous group, if
   any, is closed with the end callback, C is recorded as the group's
   first case and the begin callback is invoked with that copy.  The
   per-case callback then runs for every case, including the first of a
   group.  Returns true if there is no per-case callback, otherwise that
   callback's result; the results of the begin and end callbacks are not
   examined here. */
555 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
557 struct split_aux_data *split_aux = split_aux_;
559 /* Start a new series if needed. */
560 if (case_is_null (&split_aux->prev_case)
561 || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
/* Close the previous group before prev_case is replaced. */
563 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
564 split_aux->end (split_aux->func_aux, ds);
566 case_destroy (&split_aux->prev_case);
567 case_clone (&split_aux->prev_case, c);
569 if (split_aux->begin != NULL)
570 split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
573 return (split_aux->proc == NULL
574 || split_aux->proc (c, split_aux->func_aux, ds));
577 /* End-of-file callback used by procedure_with_splits().
   Closes the last break group by invoking the end callback, provided at
   least one case was seen (prev_case is not null) and an end callback was
   supplied.
   NOTE(review): the return statement (original line 585) is not visible
   in this excerpt. */
579 split_procedure_end_func (void *split_aux_, const struct dataset *ds)
581 struct split_aux_data *split_aux = split_aux_;
583 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
584 split_aux->end (split_aux->func_aux, ds);
588 /* Compares the SPLIT FILE variables in cases A and B and returns
589 nonzero only if case_compare() reports 0 for them, that is, only if
   the two cases agree on every split variable (presumably 0 means
   "equal", following the usual comparison convention -- confirm).
   Callers negate the result to detect the start of a new break group. */
591 equal_splits (const struct ccase *a, const struct ccase *b,
592 const struct dataset *ds)
594 return case_compare (a, b,
595 dict_get_split_vars (ds->dict),
596 dict_get_split_cnt (ds->dict)) == 0;
599 /* Multipass procedure that separates the data into SPLIT FILE
600 groups. */
602 /* Represents auxiliary data for handling SPLIT FILE in a
603 multipass procedure. */
604 struct multipass_split_aux_data
606 struct dataset *dataset; /* The dataset of the split */
607 struct ccase prev_case; /* Data in previous case. */
608 struct casefile *casefile; /* Accumulates data for a split. */
/* NOTE(review): the comment on the next member has lost its final line
   (original line 610) and its closing delimiter, so as extracted it runs
   on into the following line.  multipass_split_output() calls this member
   with the recorded case, the accumulated casefile, func_aux and the
   dataset. */
609 split_func *split; /* Function to call with the accumulated
611 void *func_aux; /* Auxiliary data. */
/* Forward declarations of the helpers behind
   multipass_procedure_with_splits(). */
614 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
615 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
616 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
/* Runs a procedure in which the cases of each SPLIT FILE break group are
   accumulated in a casefile (by multipass_split_case_func) and then
   handed as a unit to the split callback (by multipass_split_output),
   with FUNC_AUX as auxiliary data. */
618 /* Returns true if successful, false if an I/O error occurred. */
/* NOTE(review): the remaining parameters (original lines 621-623) and the
   statements on original lines 625-626, 628-629, 631-632 and 636-638 are
   not visible; presumably they initialize aux.casefile, aux.split and
   aux.dataset and return the result -- confirm. */
620 multipass_procedure_with_splits (struct dataset *ds,
624 struct multipass_split_aux_data aux;
627 case_nullify (&aux.prev_case);
630 aux.func_aux = func_aux;
633 ok = internal_procedure (ds, multipass_split_case_func,
634 multipass_split_end_func, &aux);
635 case_destroy (&aux.prev_case);
640 /* Case callback used by multipass_procedure_with_splits().
   Appends C to the casefile of the current break group.  When C opens a
   new group (no casefile exists yet, or C's split values differ from the
   recorded case), the finished group is first handed to the split
   callback, C is then recorded as the first case of the new group, and a
   fresh casefile is started.
   Returns false if the split callback or casefile_append() failed.
   NOTE(review): braces, the declaration of `ok' (original line 645) and
   the target of the casefile assignment (original line 659) fall on lines
   that are not visible in this excerpt. */
642 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
644 struct multipass_split_aux_data *aux = aux_;
647 /* Start a new series if needed. */
648 if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
654 /* Pass any cases to split_func.  This must happen before prev_case is
   replaced: multipass_split_output() passes prev_case along with the
   casefile, and it has to be the first case of the group being output,
   not of the group that C is about to start. */
655 if (aux->casefile != NULL)
656 ok = multipass_split_output (aux, ds);
650 /* Record split values. */
651 case_destroy (&aux->prev_case);
652 case_clone (&aux->prev_case, c);
658 /* Start a new casefile. */
660 ds->cf_factory->create_casefile (ds->cf_factory,
661 dict_get_next_value_idx (ds->dict));
664 return casefile_append (aux->casefile, c) && ok;
667 /* End-of-file callback used by multipass_procedure_with_splits().
   Outputs the last break group, if one is pending.  Returns true when no
   casefile is pending, otherwise the result of
   multipass_split_output(). */
669 multipass_split_end_func (void *aux_, const struct dataset *ds)
671 struct multipass_split_aux_data *aux = aux_;
672 return (aux->casefile == NULL || multipass_split_output (aux, ds));
/* Hands the accumulated casefile of one break group to the split
   callback, together with the recorded case, the auxiliary data and the
   dataset, then destroys the casefile and clears the pointer so that the
   next case starts a fresh one.  A casefile must be pending (asserted).
   NOTE(review): the declaration of `ok' and the return statement are on
   lines that are not visible in this excerpt. */
676 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
680 assert (aux->casefile != NULL);
681 ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
682 casefile_destroy (aux->casefile);
683 aux->casefile = NULL;
688 /* Discards all the current state in preparation for a data-input
689 command like DATA LIST or GET.
   Clears the dictionary, resets the default file handle, frees and
   detaches the case source and cancels all transformations.  The results
   of free_case_source() and proc_cancel_all_transformations() are not
   examined.
   NOTE(review): original lines 695-697 are not visible in this
   excerpt. */
691 discard_variables (struct dataset *ds)
693 dict_clear (ds->dict);
694 fh_set_default_handle (NULL);
698 free_case_source (ds->proc_source);
699 proc_set_source (ds, NULL);
701 proc_cancel_all_transformations (ds);
704 /* Returns the current set of permanent transformations,
705 and clears the permanent transformations.
706 For use by INPUT PROGRAM.
   Must not be called while temporary transformations are active
   (asserted).  A fresh empty chain replaces the captured one and becomes
   the current chain.
   NOTE(review): the return statement (original line 715) is not visible;
   presumably it returns `chain' -- confirm. */
708 proc_capture_transformations (struct dataset *ds)
710 struct trns_chain *chain;
712 assert (ds->temporary_trns_chain == NULL);
713 chain = ds->permanent_trns_chain;
714 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
718 /* Adds a transformation that processes a case with PROC and
719 frees itself with FREE to the current set of transformations.
720 The functions are passed AUX as auxiliary data.
   The transformation is appended to DS's current chain with no
   finalizer. */
722 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
724 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
727 /* Adds a transformation that processes a case with PROC and
728 frees itself with FREE to the current set of transformations.
729 When parsing of the block of transformations is complete,
730 FINALIZE will be called.
731 The functions are passed AUX as auxiliary data.
   Same as add_transformation() except that FINALIZE is passed through to
   trns_chain_append(). */
733 add_transformation_with_finalizer (struct dataset *ds,
734 trns_finalize_func *finalize,
735 trns_proc_func *proc,
736 trns_free_func *free, void *aux)
738 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
741 /* Returns the index of the next transformation.
742 This value can be returned by a transformation procedure
743 function to indicate a "jump" to that transformation.
   The index is obtained from DS's current chain via trns_chain_next(). */
745 next_transformation (const struct dataset *ds)
747 return trns_chain_next (ds->cur_trns_chain);
750 /* Returns true if the next call to add_transformation() will add
751 a temporary transformation, false if it will add a permanent
752 transformation. */
/* Temporary transformations are active exactly when DS has a temporary
   transformation chain. */
754 proc_in_temporary_transformations (const struct dataset *ds)
756 return ds->temporary_trns_chain != NULL;
759 /* Marks the start of temporary transformations.
760 Further calls to add_transformation() will add temporary
761 transformations. */
/* Begins temporary transformations unless they are already active: adds
   any pending case-limit transformation to the permanent chain, keeps a
   clone of the current dictionary in permanent_dict, finalizes the
   permanent chain, and makes a new empty chain both the temporary chain
   and the current one. */
763 proc_start_temporary_transformations (struct dataset *ds)
765 if (!proc_in_temporary_transformations (ds))
767 add_case_limit_trns (ds);
769 ds->permanent_dict = dict_clone (ds->dict);
771 trns_chain_finalize (ds->permanent_trns_chain);
772 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
776 /* Converts all the temporary transformations, if any, to
777 permanent transformations. Further transformations will be
779 Returns true if anything changed, false otherwise. */
781 proc_make_temporary_transformations_permanent (struct dataset *ds)
783 if (proc_in_temporary_transformations (ds))
785 trns_chain_finalize (ds->temporary_trns_chain);
786 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
787 ds->temporary_trns_chain = NULL;
789 dict_destroy (ds->permanent_dict);
790 ds->permanent_dict = NULL;
798 /* Cancels all temporary transformations, if any. Further
799 transformations will be permanent.
800 Returns true if anything changed, false otherwise. */
802 proc_cancel_temporary_transformations (struct dataset *ds)
804 if (proc_in_temporary_transformations (ds))
806 dataset_set_dict (ds, ds->permanent_dict);
807 ds->permanent_dict = NULL;
809 trns_chain_destroy (ds->temporary_trns_chain);
810 ds->temporary_trns_chain = NULL;
818 /* Cancels all transformations, if any.
819 Returns true if successful, false on I/O error.
   Both chains are destroyed, combining their results, and a new empty
   permanent chain is installed as the current chain.  create_dataset()
   calls this on a zero-filled dataset, so trns_chain_destroy() is
   presumably safe on a null chain -- confirm.
   NOTE(review): the declaration of `ok' and the return statement are on
   lines that are not visible in this excerpt. */
821 proc_cancel_all_transformations (struct dataset *ds)
824 ok = trns_chain_destroy (ds->permanent_trns_chain);
825 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
826 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
827 ds->temporary_trns_chain = NULL;
831 /* Initializes procedure handling.
   Allocates a zero-filled dataset, gives it a new empty dictionary,
   records the casefile factory FACT and the two replacement callbacks
   RPS (source replaced) and RDS (dictionary replaced), and installs an
   empty permanent transformation chain.
   NOTE(review): the return statement (original line 844) is not visible;
   presumably the new dataset is returned -- confirm. */
833 create_dataset (struct casefile_factory *fact,
834 replace_source_callback *rps,
835 replace_dictionary_callback *rds
838 struct dataset *ds = xzalloc (sizeof(*ds));
839 ds->dict = dict_create ();
840 ds->cf_factory = fact;
841 ds->replace_source = rps;
842 ds->replace_dict = rds;
/* With every pointer still null this just creates the initial permanent
   chain and makes it the current one. */
843 proc_cancel_all_transformations (ds);
847 /* Finishes up procedure handling.
   Discards DS's variables, source and transformations, then destroys the
   dictionary and the (fresh, empty) permanent chain that
   discard_variables() leaves behind.
   NOTE(review): original line 854 is not visible; presumably it frees DS
   itself -- confirm. */
849 destroy_dataset (struct dataset *ds)
851 discard_variables (ds);
852 dict_destroy (ds->dict);
853 trns_chain_destroy (ds->permanent_trns_chain);
857 /* Sets SINK as the destination for procedure output from the
858 next procedure. */
/* Installs SINK as DS's case sink.  No sink may already be set
   (asserted); close_active_file() resets the sink to null after each
   procedure. */
860 proc_set_sink (struct dataset *ds, struct case_sink *sink)
862 assert (ds->proc_sink == NULL);
863 ds->proc_sink = sink;
866 /* Sets SOURCE as the source for procedure input for the next
867 procedure. */
/* Installs SOURCE (which may be null) as DS's case source and, when a
   replace_source callback is registered, notifies it with the new source.
   The previous source is not freed here; callers that replace a live
   source free it first. */
869 proc_set_source (struct dataset *ds, struct case_source *source)
871 ds->proc_source = source;
873 if ( ds->replace_source )
874 ds->replace_source (ds->proc_source);
877 /* Returns true if a source for the next procedure has been
878 configured, false otherwise.
   That is, whether DS's proc_source is non-null. */
880 proc_has_source (const struct dataset *ds)
882 return ds->proc_source != NULL;
885 /* Returns the output from the previous procedure.
886 For use only immediately after executing a procedure.
887 The returned casefile is owned by the caller; it will not be
888 automatically used for the next procedure's input.
   DS is left without a source afterwards.
   NOTE(review): the return statement (original line 903) is not visible;
   presumably it returns `casefile' -- confirm. */
890 proc_capture_output (struct dataset *ds)
892 struct casefile *casefile;
894 /* Try to make sure that this function is called immediately
895 after procedure() or a similar function. */
896 assert (ds->proc_source != NULL);
897 assert (case_source_is_class (ds->proc_source, &storage_source_class));
898 assert (trns_chain_is_empty (ds->permanent_trns_chain));
899 assert (!proc_in_temporary_transformations (ds));
/* Take the casefile out of the storage source, then detach the source. */
901 casefile = storage_source_decapsulate (ds->proc_source);
902 proc_set_source (ds, NULL);
/* Forward declarations of the case-limit transformation's callbacks. */
907 static trns_proc_func case_limit_trns_proc;
908 static trns_free_func case_limit_trns_free;
910 /* Adds a transformation that limits the number of cases that may
911 pass through, if DS->DICT has a case limit.
   The limit is copied into a heap-allocated counter owned by the
   transformation (released by case_limit_trns_free), and the
   dictionary's own limit is then reset to 0.
   NOTE(review): the guard on original lines 916-917 and the last argument
   of the add_transformation() call (original line 921) are not visible;
   presumably the guard tests for a nonzero limit and the argument is the
   counter -- confirm. */
913 add_case_limit_trns (struct dataset *ds)
915 size_t case_limit = dict_get_case_limit (ds->dict);
918 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
919 *cases_remaining = case_limit;
920 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
922 dict_set_case_limit (ds->dict, 0);
926 /* Limits the maximum number of cases processed to
927 *CASES_REMAINING. */
/* Transformation callback for the case limit.  CASES_REMAINING_ points to
   the counter allocated by add_case_limit_trns().  While the counter is
   positive it is decremented and the case continues; once it reaches zero
   every further case is dropped.  The case and its number are not used. */
929 case_limit_trns_proc (void *cases_remaining_,
930 struct ccase *c UNUSED, casenumber case_nr UNUSED)
932 size_t *cases_remaining = cases_remaining_;
933 if (*cases_remaining > 0)
935 (*cases_remaining)--;
936 return TRNS_CONTINUE;
939 return TRNS_DROP_CASE;
942 /* Frees the data associated with a case limit transformation:
   the counter allocated by add_case_limit_trns().
   NOTE(review): the return statement, if any, is on a line that is not
   visible in this excerpt. */
944 case_limit_trns_free (void *cases_remaining_)
946 size_t *cases_remaining = cases_remaining_;
947 free (cases_remaining);
/* Forward declaration of the FILTER transformation's callback. */
951 static trns_proc_func filter_trns_proc;
953 /* Adds a temporary transformation to filter data according to
954 the variable specified on FILTER, if any.
   Temporary transformations are started first (a no-op when they are
   already active), so the filter lands on the temporary chain.  The
   filter variable itself is the auxiliary data, and no free function is
   registered. */
956 add_filter_trns (struct dataset *ds)
958 struct variable *filter_var = dict_get_filter (ds->dict);
959 if (filter_var != NULL)
961 proc_start_temporary_transformations (ds);
962 add_transformation (ds, filter_trns_proc, NULL, filter_var);
966 /* FILTER transformation.
   FILTER_VAR_ is the filter variable registered by add_filter_trns().
   The case continues only if the variable's numeric value in C is
   nonzero and var_is_num_missing() with MV_ANY is false for it;
   otherwise the case is dropped.  The case number is not used.
   C is read below, so it no longer carries the UNUSED annotation. */
968 filter_trns_proc (void *filter_var_,
969 struct ccase *c, casenumber case_nr UNUSED)
972 struct variable *filter_var = filter_var_;
973 double f = case_num (c, filter_var);
974 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
975 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Accessor for DS's dictionary.
   NOTE(review): the return type and the whole body (original lines 979
   and 981-984) are not visible in this excerpt; presumably it returns
   ds->dict -- confirm. */
980 dataset_dict (const struct dataset *ds)
986 /* Set or replace dataset DS's dictionary with DICT.
987 The old dictionary is destroyed.
   The old dictionary's callbacks are copied onto DICT first, and the
   replace_dict callback, if registered, is notified with DICT before the
   old dictionary is destroyed.
   NOTE(review): the statement that actually stores DICT in DS (original
   lines 994-995) is not visible in this excerpt. */
989 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
991 struct dictionary *old_dict = ds->dict;
993 dict_copy_callbacks (dict, ds->dict);
996 if ( ds->replace_dict )
997 ds->replace_dict (dict);
999 dict_destroy (old_dict);
/* Requests that at least N_BEFORE lagged cases be kept for DS.  The
   requirement only ever grows: n_lag becomes the larger of its current
   value and N_BEFORE. */
1003 dataset_need_lag (struct dataset *ds, int n_before)
1005 ds->n_lag = MAX (ds->n_lag, n_before);
/* Returns the casefile factory that DS was created with. */
1008 struct casefile_factory *
1009 dataset_get_casefile_factory (const struct dataset *ds)
1011 return ds->cf_factory;