1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License as
6 published by the Free Software Foundation; either version 2 of the
7 License, or (at your option) any later version.
9 This program is distributed in the hope that it will be useful, but
10 WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <data/fastfile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
43 /* An abstract factory which creates casefiles */
44 struct casefile_factory *cf_factory;
46 /* Cases are read from proc_source,
47 pass through permanent_trns_chain (which transforms them into
48 the format described by permanent_dict),
49 are written to proc_sink,
50 pass through temporary_trns_chain (which transforms them into
51 the format described by dict),
52 and are finally passed to the procedure. */
53 struct case_source *proc_source;
54 struct trns_chain *permanent_trns_chain;
55 struct dictionary *permanent_dict;
56 struct case_sink *proc_sink;
57 struct trns_chain *temporary_trns_chain;
58 struct dictionary *dict;
60 /* The transformation chain that the next transformation will be
   added to.  It is set to either permanent_trns_chain or
   temporary_trns_chain. */
62 struct trns_chain *cur_trns_chain;
64 /* The compactor used to compact a case, if necessary;
65 otherwise a null pointer. */
66 struct dict_compactor *compactor;
68 /* Time at which proc was last invoked. */
69 time_t last_proc_invocation;
/* Lag queue (filled by lag_case, read by lagged_case). */
72 int n_lag; /* Number of cases to lag. */
73 int lag_count; /* Number of cases in lag_queue so far. */
74 int lag_head; /* Index where next case will be added. */
75 struct ccase *lag_queue; /* Array of n_lag struct ccase elements. */
/* Working state that proc_open sets up and proc_close tears down. */
78 bool is_open; /* Procedure open? */
79 struct ccase trns_case; /* Case used for transformations. */
80 struct ccase sink_case; /* Case written to sink, if
81 compacting is necessary. */
82 size_t cases_written; /* Cases output so far. */
84 }; /* struct dataset */
/* Forward declarations of file-local helpers that are defined later in
   this file. */
87 static void add_case_limit_trns (struct dataset *ds);
88 static void add_filter_trns (struct dataset *ds);
90 static bool internal_procedure (struct dataset *ds, case_func *,
93 static void update_last_proc_invocation (struct dataset *ds);
94 static void create_trns_case (struct ccase *, struct dictionary *);
95 static void open_active_file (struct dataset *ds);
96 static void lag_case (struct dataset *ds, const struct ccase *c);
97 static void clear_case (const struct dataset *ds, struct ccase *c);
98 static bool close_active_file (struct dataset *ds);
100 /* Public functions. */
102 /* Returns the last time the data was read.
   If the stored invocation time is still 0, it is first set to the
   current time by update_last_proc_invocation(). */
104 time_of_last_procedure (struct dataset *ds)
106 if (ds->last_proc_invocation == 0)
107 update_last_proc_invocation (ds);
108 return ds->last_proc_invocation;
111 /* Regular procedure. */
115 /* Reads the data from the input program and writes it to a new
116 active file. For each case we read from the input program, we
   do the following:
119 1. Execute permanent transformations. If these drop the case,
120 start the next case from step 1.
122 2. Write case to replacement active file.
124 3. Execute temporary transformations. If these drop the case,
125 start the next case from step 1.
127 4. Pass case to CF, passing AUX as auxiliary data.
129 Returns true if successful, false if an I/O error occurred. */
131 procedure (struct dataset *ds, case_func *cf, void *aux)
133 update_last_proc_invocation (ds);
135 /* Optimize the trivial case where we're not going to do
136 anything with the data, by not reading the data at all. */
/* Among the conditions for the shortcut: a storage-class source, no
   explicit sink, and no pending permanent or temporary
   transformations. */
138 && case_source_is_class (ds->proc_source, &storage_source_class)
139 && ds->proc_sink == NULL
140 && (ds->temporary_trns_chain == NULL
141 || trns_chain_is_empty (ds->temporary_trns_chain))
142 && trns_chain_is_empty (ds->permanent_trns_chain))
/* Even when the pass over the data is skipped, the case limit and the
   vectors are still reset. */
145 dict_set_case_limit (ds->dict, 0);
146 dict_clear_vectors (ds->dict);
/* General path: run the full pipeline.  The NULL argument means that no
   end-of-data callback is supplied. */
150 return internal_procedure (ds, cf, NULL, aux);
153 /* Multipass procedure. */
/* Auxiliary data that multipass_procedure() passes through
   internal_procedure() to multipass_case_func() and
   multipass_end_func(). */
155 struct multipass_aux_data
/* Receives every case that reaches multipass_case_func(). */
157 struct casefile *casefile;
/* Called once with the accumulated casefile by multipass_end_func();
   may be null, in which case nothing is called. */
159 bool (*proc_func) (const struct casefile *, void *aux);
163 /* Case processing function for multipass_procedure().
   Appends case C to the casefile held in AUX_DATA_ (a struct
   multipass_aux_data) and returns the result of casefile_append().
   DS is not used. */
165 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
167 struct multipass_aux_data *aux_data = aux_data_;
168 return casefile_append (aux_data->casefile, c);
171 /* End-of-file function for multipass_procedure().
   Hands the accumulated casefile and the caller's auxiliary data to
   proc_func and returns its result.  If proc_func is null, nothing is
   called and true is returned.  DS is not used. */
173 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
175 struct multipass_aux_data *aux_data = aux_data_;
176 return (aux_data->proc_func == NULL
177 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
180 /* Procedure that allows multiple passes over the input data.
181 The entire active file is passed to PROC_FUNC, with the given
182 AUX as auxiliary data, as a unit.
   The cases are collected in a casefile obtained from DS's casefile
   factory; that casefile is destroyed before this function is done. */
184 multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
186 struct multipass_aux_data aux_data;
190 ds->cf_factory->create_casefile (ds->cf_factory,
191 dict_get_next_value_idx (ds->dict));
193 aux_data.proc_func = proc_func;
/* multipass_case_func() accumulates the cases and multipass_end_func()
   invokes PROC_FUNC once all of them have been read. */
196 ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
/* An error recorded on the casefile also counts as failure. */
197 ok = !casefile_error (aux_data.casefile) && ok;
199 casefile_destroy (aux_data.casefile);
204 /* Procedure implementation. */
206 /* Executes a procedure.
207 Passes each case to PROC.
208 Calls END after the last case.
209 Returns true if successful, false if an I/O error occurred (or
210 if PROC or END ever returned false). */
212 internal_procedure (struct dataset *ds, case_func *proc,
/* Reading stops when proc_read() returns false or as soon as the status
   accumulated in ok has become false. */
220 while (ok && proc_read (ds, &c))
222 ok = proc (c, aux, ds) && ok;
224 ok = end (aux, ds) && ok;
/* proc_close() is called regardless of earlier failures; its result is
   combined with the status accumulated so far. */
225 return proc_close (ds) && ok;
228 /* Opens dataset DS for reading cases with proc_read.
229 proc_close must be called when done.
   A source must already be set and DS must not already be open; both
   conditions are asserted. */
231 proc_open (struct dataset *ds)
233 assert (ds->proc_source != NULL);
234 assert (!ds->is_open);
236 update_last_proc_invocation (ds);
238 open_active_file (ds);
/* Set up the two working cases: trns_case is the case the
   transformations operate on, sink_case is the compacted copy that is
   written to the sink when a compactor is in use. */
241 create_trns_case (&ds->trns_case, ds->dict);
242 case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
243 ds->cases_written = 0;
247 /* Reads the next case from dataset DS, which must have been
248 opened for reading with proc_open.
249 Returns true if successful, in which case a pointer to the
250 case is stored in *C.
251 Return false at end of file or if a read error occurs. In
252 this case a null pointer is stored in *C. */
254 proc_read (struct dataset *ds, struct ccase **c)
/* retval starts out as TRNS_DROP_CASE so that the assertion below also
   holds the first time through. */
256 enum trns_result retval = TRNS_DROP_CASE;
258 assert (ds->is_open);
/* The only results that lead back to this point are a dropped case or
   an error. */
264 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
265 if (retval == TRNS_ERROR)
270 /* Read a case from proc_source. */
/* Variables that are not "leave" variables are reset before each read
   (see clear_case). */
271 clear_case (ds, &ds->trns_case);
272 if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
275 /* Execute permanent transformations. */
/* The permanent transformations are given a case number that is one
   more than the number of cases written so far. */
276 case_nr = ds->cases_written + 1;
277 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
278 &ds->trns_case, &case_nr);
279 if (retval != TRNS_CONTINUE)
282 /* Write case to LAG queue. */
284 lag_case (ds, &ds->trns_case);
286 /* Write case to replacement active file. */
288 if (ds->proc_sink->class->write != NULL)
/* With a compactor, the sink receives the compacted copy in sink_case;
   without one it receives trns_case itself. */
290 if (ds->compactor != NULL)
292 dict_compactor_compact (ds->compactor, &ds->sink_case,
294 ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
297 ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
300 /* Execute temporary transformations. */
301 if (ds->temporary_trns_chain != NULL)
/* NOTE(review): cases_written itself is passed as the case number here,
   whereas the permanent chain gets cases_written + 1.  Presumably the
   counter has been incremented after the sink write in a line that is
   not visible in this listing -- confirm. */
303 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
304 &ds->trns_case, &ds->cases_written);
305 if (retval != TRNS_CONTINUE)
314 /* Closes dataset DS for reading.
315 Returns true if successful, false if an I/O error occurred
316 while reading or closing the data set.
317 If DS has not been opened, returns true without doing
   anything else. */
320 proc_close (struct dataset *ds)
325 /* Drain any remaining cases. */
/* Draining ends once proc_read() returns false, i.e. at end of data or
   on an error. */
329 if (!proc_read (ds, &c))
/* Release the source.  A failure here is folded into DS's ok flag
   rather than replacing an earlier failure. */
332 ds->ok = free_case_source (ds->proc_source) && ds->ok;
333 ds->proc_source = NULL;
/* Destroy the working cases that proc_open() created. */
335 case_destroy (&ds->sink_case);
336 case_destroy (&ds->trns_case);
/* close_active_file() may install a new proc_source made from the
   sink. */
338 ds->ok = close_active_file (ds) && ds->ok;
344 /* Updates last_proc_invocation: stores the current calendar time, as
   returned by time(), in DS. */
346 update_last_proc_invocation (struct dataset *ds)
348 ds->last_proc_invocation = time (NULL);
351 /* Creates TRNS_CASE with room for dict_get_next_value_idx (DICT)
   values and gives the value of every variable in DICT its initial
   contents.  A numeric variable starts at 0.0 if var_get_leave() is
   true for it and at SYSMIS otherwise.  The remaining values
   (presumably those of string variables; the branch keyword is not
   visible in this listing) are filled with spaces up to the
   variable's width. */
355 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
357 size_t var_cnt = dict_get_var_cnt (dict);
360 case_create (trns_case, dict_get_next_value_idx (dict));
361 for (i = 0; i < var_cnt; i++)
363 struct variable *v = dict_get_var (dict, i);
364 union value *value = case_data_rw (trns_case, v);
366 if (var_is_numeric (v))
367 value->f = var_get_leave (v) ? 0.0 : SYSMIS;
369 memset (value->s, ' ', var_get_width (v));
373 /* Makes all preparations for reading from the data source and writing
   to the data sink. */
376 open_active_file (struct dataset *ds)
/* Append the implicit transformations.  The case limit goes first;
   add_filter_trns() may start temporary transformations, after which
   cur_trns_chain is the temporary chain. */
378 add_case_limit_trns (ds);
379 add_filter_trns (ds);
381 /* Finalize transformations. */
382 trns_chain_finalize (ds->cur_trns_chain);
384 /* Make permanent_dict refer to the dictionary right before
385 data reaches the sink. */
/* permanent_dict is only non-null here if temporary transformations
   were started (which clones the dictionary); otherwise it simply
   aliases dict. */
386 if (ds->permanent_dict == NULL)
387 ds->permanent_dict = ds->dict;
389 /* Figure out whether to compact. */
391 (dict_compacting_would_shrink (ds->permanent_dict)
392 ? dict_make_compactor (ds->permanent_dict)
/* If no sink has been set, default to one of the storage sink class. */
396 if (ds->proc_sink == NULL)
397 ds->proc_sink = create_case_sink (&storage_sink_class,
/* The open hook is optional for a sink class. */
401 if (ds->proc_sink->class->open != NULL)
402 ds->proc_sink->class->open (ds->proc_sink);
404 /* Allocate memory for lag queue. */
/* Every slot starts out as a null case. */
411 ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
412 for (i = 0; i < ds->n_lag; i++)
413 case_nullify (&ds->lag_queue[i]);
417 /* Add C to the lag queue.
   A clone of C replaces whatever case currently occupies slot lag_head
   (that case is destroyed first), and lag_head is then advanced. */
419 lag_case (struct dataset *ds, const struct ccase *c)
/* lag_count is only relevant while it is still below n_lag. */
421 if (ds->lag_count < ds->n_lag)
423 case_destroy (&ds->lag_queue[ds->lag_head]);
424 case_clone (&ds->lag_queue[ds->lag_head], c);
/* NOTE(review): presumably lag_head wraps back to 0 here; the statement
   controlled by this test is not visible in this listing -- confirm. */
425 if (++ds->lag_head >= ds->n_lag)
429 /* Clears the variables in C that need to be cleared between
   cases. */
432 clear_case (const struct dataset *ds, struct ccase *c)
434 size_t var_cnt = dict_get_var_cnt (ds->dict);
437 for (i = 0; i < var_cnt; i++)
439 struct variable *v = dict_get_var (ds->dict, i);
/* "Leave" variables keep their value from one case to the next; every
   other variable is reset. */
440 if (!var_get_leave (v))
/* Numeric values are reset to the system-missing value. */
442 if (var_is_numeric (v))
443 case_data_rw (c, v)->f = SYSMIS;
/* Fill with spaces up to the variable's width (presumably the branch
   for string variables; the branch keyword is not visible here). */
445 memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
450 /* Closes the active file.
   Frees the lag queue, cancels any temporary transformations, finishes
   compaction, turns the sink into the next source when the sink class
   supports it, and finally cancels all transformations.  Returns the
   result of proc_cancel_all_transformations(). */
452 close_active_file (struct dataset *ds)
454 /* Free memory for lag queue, and turn off lagging. */
459 for (i = 0; i < ds->n_lag; i++)
460 case_destroy (&ds->lag_queue[i]);
461 free (ds->lag_queue);
465 /* Dictionary from before TEMPORARY becomes permanent. */
466 proc_cancel_temporary_transformations (ds);
468 /* Finish compacting. */
469 if (ds->compactor != NULL)
471 dict_compactor_destroy (ds->compactor);
472 dict_compact_values (ds->dict);
473 ds->compactor = NULL;
476 /* Old data sink becomes new data source. */
/* make_source is optional; without it proc_source is left as it is. */
477 if (ds->proc_sink->class->make_source != NULL)
478 ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
479 free_case_sink (ds->proc_sink);
480 ds->proc_sink = NULL;
482 dict_clear_vectors (ds->dict);
/* permanent_dict is not destroyed here: at this point it is either
   already null (temporary transformations were cancelled above) or the
   alias of dict that open_active_file() set up. */
483 ds->permanent_dict = NULL;
484 return proc_cancel_all_transformations (ds);
487 /* Returns a pointer to the lagged case from N_BEFORE cases before the
488 current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be at least 1 and at most DS's n_lag; both bounds are
   asserted. */
490 lagged_case (const struct dataset *ds, int n_before)
492 assert (n_before >= 1 );
493 assert (n_before <= ds->n_lag);
495 if (n_before <= ds->lag_count)
/* lag_head is the slot the next case will be written to, so the wanted
   case lies N_BEFORE slots behind it.
   NOTE(review): a negative index is presumably wrapped around by n_lag
   in lines that are not visible in this listing -- confirm. */
497 int index = ds->lag_head - n_before;
500 return &ds->lag_queue[index];
506 /* Procedure that separates the data into SPLIT FILE groups. */
508 /* Represents auxiliary data for handling SPLIT FILE.
   procedure_with_splits() fills it in and passes it to
   split_procedure_case_func() and split_procedure_end_func(). */
509 struct split_aux_data
511 struct dataset *dataset; /* The dataset */
512 struct ccase prev_case; /* First case of the current group; null until a case is seen. */
514 /* Callback functions. */
/* Forward declarations of the helpers used by procedure_with_splits().
   equal_splits() returns nonzero when the SPLIT FILE variables of the
   two cases compare equal. */
521 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
522 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
523 static bool split_procedure_end_func (void *, const struct dataset *);
525 /* Like procedure(), but it automatically breaks the case stream
526 into SPLIT FILE break groups. Before each group of cases with
527 identical SPLIT FILE variable values, BEGIN_FUNC is called
528 with the first case in the group.
529 Then PROC_FUNC is called for each case in the group (including
   the first).
531 END_FUNC is called when the group is finished. FUNC_AUX is
532 passed to each of the functions as auxiliary data.
534 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
535 and END_FUNC will be called at all.
537 If SPLIT FILE is not in effect, then there is one break group
538 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
   will be called once.
541 Returns true if successful, false if an I/O error occurred. */
543 procedure_with_splits (struct dataset *ds,
549 struct split_aux_data split_aux;
/* A null prev_case tells the case callback that no group has been
   started yet. */
552 case_nullify (&split_aux.prev_case);
553 split_aux.begin = begin;
554 split_aux.proc = proc;
556 split_aux.func_aux = func_aux;
557 split_aux.dataset = ds;
559 ok = internal_procedure (ds, split_procedure_case_func,
560 split_procedure_end_func, &split_aux);
/* Release the copy of the last group's first case. */
562 case_destroy (&split_aux.prev_case);
567 /* Case callback used by procedure_with_splits().
   Detects the start of a new SPLIT FILE group, calling the end
   callback for the group just finished and the begin callback for the
   new one, and then passes C to the per-case callback.  Each callback
   may be null, in which case it is skipped.  Returns true if there is
   no per-case callback, otherwise that callback's result. */
569 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
571 struct split_aux_data *split_aux = split_aux_;
573 /* Start a new series if needed. */
/* A null prev_case means this is the very first case. */
574 if (case_is_null (&split_aux->prev_case)
575 || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
/* Finish the previous group, if there was one, before prev_case is
   replaced. */
577 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
578 split_aux->end (split_aux->func_aux, ds);
/* Remember the first case of the new group. */
580 case_destroy (&split_aux->prev_case);
581 case_clone (&split_aux->prev_case, c);
583 if (split_aux->begin != NULL)
584 split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
587 return (split_aux->proc == NULL
588 || split_aux->proc (c, split_aux->func_aux, ds));
591 /* End-of-file callback used by procedure_with_splits().
   Calls the end callback for the last group, provided that at least
   one case was seen (prev_case is not null) and an end callback was
   supplied. */
593 split_procedure_end_func (void *split_aux_, const struct dataset *ds)
595 struct split_aux_data *split_aux = split_aux_;
597 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
598 split_aux->end (split_aux->func_aux, ds);
602 /* Compares the SPLIT FILE variables in cases A and B and returns
603 nonzero only if they are equal, that is, if case_compare() over
   DS's split variables yields 0. */
605 equal_splits (const struct ccase *a, const struct ccase *b,
606 const struct dataset *ds)
608 return case_compare (a, b,
609 dict_get_split_vars (ds->dict),
610 dict_get_split_cnt (ds->dict)) == 0;
613 /* Multipass procedure that separates the data into SPLIT FILE
   groups. */
616 /* Represents auxiliary data for handling SPLIT FILE in a
617 multipass procedure. */
618 struct multipass_split_aux_data
620 struct dataset *dataset; /* The dataset of the split */
621 struct ccase prev_case; /* Data in previous case. */
622 struct casefile *casefile; /* Accumulates data for a split. */
623 split_func *split; /* Function to call with the accumulated
   data. */
625 void *func_aux; /* Auxiliary data. */
/* Forward declarations of the helpers used by
   multipass_procedure_with_splits(). */
628 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
629 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
630 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
632 /* Runs a procedure over DS in which the cases of each SPLIT FILE group
   are accumulated in a casefile that is then handed, as a unit, to the
   split function (see multipass_split_case_func and
   multipass_split_output).
   Returns true if successful, false if an I/O error occurred. */
634 multipass_procedure_with_splits (struct dataset *ds,
638 struct multipass_split_aux_data aux;
641 case_nullify (&aux.prev_case);
644 aux.func_aux = func_aux;
647 ok = internal_procedure (ds, multipass_split_case_func,
648 multipass_split_end_func, &aux);
/* Release the recorded split values of the last group. */
649 case_destroy (&aux.prev_case);
654 /* Case callback used by multipass_procedure_with_splits().
   Appends C to the casefile of the current SPLIT FILE group.  When C
   starts a new group, the finished group's casefile is first passed to
   the split function, C's split values are recorded in prev_case, and a
   new casefile is created.  Returns false if the split function or the
   append fails. */
656 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
658 struct multipass_split_aux_data *aux = aux_;
661 /* Start a new series if needed. */
662 if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
664 /* Pass any cases to split_func.  This has to happen before prev_case
   is replaced below: multipass_split_output() hands prev_case to the
   split function together with the finished group's casefile, so
   prev_case must still hold that group's split values. */
665 if (aux->casefile != NULL)
666 ok = multipass_split_output (aux, ds);
668 /* Record split values. */
669 case_destroy (&aux->prev_case);
670 case_clone (&aux->prev_case, c);
672 /* Start a new casefile. */
674 ds->cf_factory->create_casefile (ds->cf_factory,
675 dict_get_next_value_idx (ds->dict));
678 return casefile_append (aux->casefile, c) && ok;
681 /* End-of-file callback used by multipass_procedure_with_splits().
   Outputs the last group's casefile, if there is one.  Returns true if
   there is nothing to output, otherwise the result of
   multipass_split_output(). */
683 multipass_split_end_func (void *aux_, const struct dataset *ds)
685 struct multipass_split_aux_data *aux = aux_;
686 return (aux->casefile == NULL || multipass_split_output (aux, ds));
/* Passes the accumulated casefile in AUX, together with prev_case and
   the auxiliary data, to the split function, then destroys the casefile
   and resets AUX's casefile pointer to null.  AUX must hold a casefile
   (asserted). */
690 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
694 assert (aux->casefile != NULL);
695 ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
/* The casefile is destroyed whether or not the split function
   succeeded. */
696 casefile_destroy (aux->casefile);
697 aux->casefile = NULL;
702 /* Discards all the current state in preparation for a data-input
703 command like DATA LIST or GET.
   Clears the dictionary, resets the default file handle, frees the
   case source and cancels all transformations. */
705 discard_variables (struct dataset *ds)
707 dict_clear (ds->dict);
708 fh_set_default_handle (NULL);
/* The status results of free_case_source() and
   proc_cancel_all_transformations() are not checked here. */
712 free_case_source (ds->proc_source);
713 ds->proc_source = NULL;
715 proc_cancel_all_transformations (ds);
718 /* Returns the current set of permanent transformations,
719 and clears the permanent transformations.
720 For use by INPUT PROGRAM.
   Must not be called while temporary transformations are in effect
   (asserted). */
722 proc_capture_transformations (struct dataset *ds)
724 struct trns_chain *chain;
726 assert (ds->temporary_trns_chain == NULL);
/* Take the existing chain and install a fresh, empty one as both the
   permanent and the current chain. */
727 chain = ds->permanent_trns_chain;
728 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
732 /* Adds a transformation that processes a case with PROC and
733 frees itself with FREE to the current set of transformations.
734 The functions are passed AUX as auxiliary data.
   The transformation is appended to DS's cur_trns_chain without a
   finalizer. */
736 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
738 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
741 /* Adds a transformation that processes a case with PROC and
742 frees itself with FREE to the current set of transformations.
743 When parsing of the block of transformations is complete,
744 FINALIZE will be called.
745 The functions are passed AUX as auxiliary data.
   The transformation is appended to DS's cur_trns_chain. */
747 add_transformation_with_finalizer (struct dataset *ds,
748 trns_finalize_func *finalize,
749 trns_proc_func *proc,
750 trns_free_func *free, void *aux)
752 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
755 /* Returns the index of the next transformation.
756 This value can be returned by a transformation procedure
757 function to indicate a "jump" to that transformation.
   The index is taken from DS's cur_trns_chain. */
759 next_transformation (const struct dataset *ds)
761 return trns_chain_next (ds->cur_trns_chain);
764 /* Returns true if the next call to add_transformation() will add
765 a temporary transformation, false if it will add a permanent
   transformation. */
768 proc_in_temporary_transformations (const struct dataset *ds)
/* temporary_trns_chain is non-null only after
   proc_start_temporary_transformations() has created it and until the
   temporary transformations are made permanent or cancelled. */
770 return ds->temporary_trns_chain != NULL;
773 /* Marks the start of temporary transformations.
774 Further calls to add_transformation() will add temporary
   transformations. */
777 proc_start_temporary_transformations (struct dataset *ds)
/* Nothing is done if temporary transformations are already in
   effect. */
779 if (!proc_in_temporary_transformations (ds))
/* The case limit is added while the permanent chain is still the
   current chain. */
781 add_case_limit_trns (ds);
/* Keep a copy of the dictionary as it is before any temporary
   transformation; proc_cancel_temporary_transformations() restores
   it. */
783 ds->permanent_dict = dict_clone (ds->dict);
784 trns_chain_finalize (ds->permanent_trns_chain);
/* From here on new transformations go to the temporary chain. */
785 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
789 /* Converts all the temporary transformations, if any, to
790 permanent transformations. Further transformations will be
   permanent.
792 Returns true if anything changed, false otherwise. */
794 proc_make_temporary_transformations_permanent (struct dataset *ds)
796 if (proc_in_temporary_transformations (ds))
798 trns_chain_finalize (ds->temporary_trns_chain);
799 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
800 ds->temporary_trns_chain = NULL;
/* cur_trns_chain was pointing at the temporary chain, which has just
   been spliced away.  Point it at the permanent chain so that further
   transformations really are added there. */
801 ds->cur_trns_chain = ds->permanent_trns_chain;
/* The dictionary clone taken when the temporary transformations
   started is no longer needed. */
802 dict_destroy (ds->permanent_dict);
803 ds->permanent_dict = NULL;
811 /* Cancels all temporary transformations, if any. Further
812 transformations will be permanent.
813 Returns true if anything changed, false otherwise. */
815 proc_cancel_temporary_transformations (struct dataset *ds)
817 if (proc_in_temporary_transformations (ds))
/* Throw away the dictionary as modified by the temporary
   transformations and go back to the copy taken when they started. */
819 dict_destroy (ds->dict);
820 ds->dict = ds->permanent_dict;
821 ds->permanent_dict = NULL;
823 trns_chain_destroy (ds->temporary_trns_chain);
824 ds->temporary_trns_chain = NULL;
/* cur_trns_chain was pointing at the chain that has just been
   destroyed.  Point it at the permanent chain so that it does not
   dangle and further transformations are added there. */
825 ds->cur_trns_chain = ds->permanent_trns_chain;
832 /* Cancels all transformations, if any.
833 Returns true if successful, false on I/O error.
   Both chains are destroyed and a fresh, empty permanent chain is
   installed as the current chain.
   NOTE(review): permanent_dict is not touched here; callers that cancel
   while temporary transformations are in effect presumably deal with
   it themselves -- confirm. */
835 proc_cancel_all_transformations (struct dataset *ds)
838 ok = trns_chain_destroy (ds->permanent_trns_chain);
839 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
840 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
841 ds->temporary_trns_chain = NULL;
845 /* Initializes procedure handling.
   Allocates a zero-filled dataset, gives it a new dictionary and the
   casefile factory FACT, and sets up its initial transformation
   chain. */
847 create_dataset (struct casefile_factory *fact)
849 struct dataset *ds = xzalloc (sizeof(*ds));
850 ds->dict = dict_create ();
851 ds->cf_factory = fact;
/* On a freshly zeroed dataset this creates the empty permanent chain
   and makes it the current chain. */
852 proc_cancel_all_transformations (ds);
856 /* Finishes up procedure handling.
   Discards DS's variables and source and destroys its dictionary and
   permanent transformation chain. */
858 destroy_dataset (struct dataset *ds)
860 discard_variables (ds);
861 dict_destroy (ds->dict);
/* discard_variables() ends by installing a fresh permanent chain (via
   proc_cancel_all_transformations), so that chain still has to be
   destroyed here. */
862 trns_chain_destroy (ds->permanent_trns_chain);
866 /* Sets SINK as the destination for procedure output from the
   next procedure. */
869 proc_set_sink (struct dataset *ds, struct case_sink *sink)
/* A sink may only be set when none is currently set. */
871 assert (ds->proc_sink == NULL);
872 ds->proc_sink = sink;
875 /* Sets SOURCE as the source for procedure input for the next
   procedure. */
878 proc_set_source (struct dataset *ds, struct case_source *source)
/* A source may only be set when none is currently set. */
880 assert (ds->proc_source == NULL);
881 ds->proc_source = source;
884 /* Returns true if a source for the next procedure has been
885 configured, false otherwise. */
887 proc_has_source (const struct dataset *ds)
/* A source is present either because proc_set_source() installed one
   or because close_active_file() made one from the previous sink. */
889 return ds->proc_source != NULL;
892 /* Returns the output from the previous procedure.
893 For use only immediately after executing a procedure.
894 The returned casefile is owned by the caller; it will not be
895 automatically used for the next procedure's input. */
897 proc_capture_output (struct dataset *ds)
899 struct casefile *casefile;
901 /* Try to make sure that this function is called immediately
902 after procedure() or a similar function. */
/* That is: a storage-class source is present and no permanent or
   temporary transformations are pending. */
903 assert (ds->proc_source != NULL);
904 assert (case_source_is_class (ds->proc_source, &storage_source_class));
905 assert (trns_chain_is_empty (ds->permanent_trns_chain));
906 assert (!proc_in_temporary_transformations (ds));
/* Take the casefile out of the source and leave DS without a source. */
908 casefile = storage_source_decapsulate (ds->proc_source);
909 ds->proc_source = NULL;
914 static trns_proc_func case_limit_trns_proc;
915 static trns_free_func case_limit_trns_free;
917 /* Adds a transformation that limits the number of cases that may
918 pass through, if DS->DICT has a case limit. */
920 add_case_limit_trns (struct dataset *ds)
922 size_t case_limit = dict_get_case_limit (ds->dict);
/* The remaining-case counter is heap-allocated and handed to the
   transformation as its auxiliary data; case_limit_trns_free() is
   registered to release it. */
925 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
926 *cases_remaining = case_limit;
927 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
/* Once it has been turned into a transformation, the limit is cleared
   from the dictionary. */
929 dict_set_case_limit (ds->dict, 0);
933 /* Limits the maximum number of cases processed to
   *CASES_REMAINING. */
936 case_limit_trns_proc (void *cases_remaining_,
937 struct ccase *c UNUSED, casenumber case_nr UNUSED)
/* CASES_REMAINING_ points to the size_t counter allocated by
   add_case_limit_trns(). */
939 size_t *cases_remaining = cases_remaining_;
/* While the counter is positive, let the case through and count it. */
940 if (*cases_remaining > 0)
942 (*cases_remaining)--;
943 return TRNS_CONTINUE;
/* The limit has been reached: drop this and every later case. */
946 return TRNS_DROP_CASE;
949 /* Frees the data associated with a case limit transformation, namely
   the counter that CASES_REMAINING_ points to. */
951 case_limit_trns_free (void *cases_remaining_)
953 size_t *cases_remaining = cases_remaining_;
954 free (cases_remaining);
958 static trns_proc_func filter_trns_proc;
960 /* Adds a temporary transformation to filter data according to
961 the variable specified on FILTER, if any. */
963 add_filter_trns (struct dataset *ds)
965 struct variable *filter_var = dict_get_filter (ds->dict);
966 if (filter_var != NULL)
/* Make sure the filter goes on the temporary chain.  This is a no-op if
   temporary transformations are already in effect. */
968 proc_start_temporary_transformations (ds);
/* The filter variable is the transformation's auxiliary data; no free
   function is registered for it. */
969 add_transformation (ds, filter_trns_proc, NULL, filter_var);
973 /* FILTER transformation.
   FILTER_VAR_ is the filter variable (a struct variable *) registered
   by add_filter_trns().  Case C is let through (TRNS_CONTINUE) only if
   its value for that variable is nonzero and is not missing according
   to var_is_num_missing() with MV_ANY; otherwise the case is dropped
   (TRNS_DROP_CASE).  CASE_NR is not used. */
975 filter_trns_proc (void *filter_var_,
976 struct ccase *c, casenumber case_nr UNUSED)
979 struct variable *filter_var = filter_var_;
980 double f = case_num (c, filter_var);
981 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
982 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* NOTE(review): only the signature is visible in this listing;
   presumably returns DS's dictionary -- confirm against the full
   file. */
987 dataset_dict (const struct dataset *ds)
/* NOTE(review): only the signature is visible in this listing;
   presumably installs DICT as DS's dictionary -- confirm against the
   full file, including what happens to the previous dictionary. */
994 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
/* NOTE(review): only the signature is visible in this listing;
   presumably returns DS's n_lag -- confirm against the full file. */
1000 dataset_n_lag (const struct dataset *ds)
/* NOTE(review): only the signature is visible in this listing;
   presumably sets DS's n_lag to N_LAG -- confirm against the full
   file. */
1006 dataset_set_n_lag (struct dataset *ds, int n_lag)
/* Returns the casefile factory that was given to DS when it was
   created. */
1012 struct casefile_factory *
1013 dataset_get_casefile_factory (const struct dataset *ds)
1015 return ds->cf_factory;