1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License as
6 published by the Free Software Foundation; either version 2 of the
7 License, or (at your option) any later version.
9 This program is distributed in the hope that it will be useful, but
10 WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <data/fastfile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
42 /* Cases are read from proc_source,
43 pass through permanent_trns_chain (which transforms them into
44 the format described by permanent_dict),
45 are written to proc_sink,
46 pass through temporary_trns_chain (which transforms them into
47 the format described by dict),
48 and are finally passed to the procedure. */
49 struct case_source *proc_source;
50 struct trns_chain *permanent_trns_chain;
51 struct dictionary *permanent_dict;
52 struct case_sink *proc_sink;
53 struct trns_chain *temporary_trns_chain;
54 struct dictionary *dict;
56 /* The transformation chain that the next transformation will be
58 struct trns_chain *cur_trns_chain;
60 /* The compactor used to compact a case, if necessary;
61 otherwise a null pointer. */
62 struct dict_compactor *compactor;
64 /* Time at which proc was last invoked. */
65 time_t last_proc_invocation;
68 int n_lag; /* Number of cases to lag. */
69 int lag_count; /* Number of cases in lag_queue so far. */
70 int lag_head; /* Index where next case will be added. */
71 struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
74 bool is_open; /* Procedure open? */
75 struct ccase trns_case; /* Case used for transformations. */
76 struct ccase sink_case; /* Case written to sink, if
77 compacting is necessary. */
78 size_t cases_written; /* Cases output so far. */
80 }; /* struct dataset */
83 static void add_case_limit_trns (struct dataset *ds);
84 static void add_filter_trns (struct dataset *ds);
86 static bool internal_procedure (struct dataset *ds, case_func *,
89 static void update_last_proc_invocation (struct dataset *ds);
90 static void create_trns_case (struct ccase *, struct dictionary *);
91 static void open_active_file (struct dataset *ds);
92 static void lag_case (struct dataset *ds, const struct ccase *c);
93 static void clear_case (const struct dataset *ds, struct ccase *c);
94 static bool close_active_file (struct dataset *ds);
96 /* Public functions. */
98 /* Returns the last time the data was read. */
100 time_of_last_procedure (struct dataset *ds)
102 if (ds->last_proc_invocation == 0)
103 update_last_proc_invocation (ds);
104 return ds->last_proc_invocation;
107 /* Regular procedure. */
111 /* Reads the data from the input program and writes it to a new
112 active file. For each case we read from the input program, we
115 1. Execute permanent transformations. If these drop the case,
116 start the next case from step 1.
118 2. Write case to replacement active file.
120 3. Execute temporary transformations. If these drop the case,
121 start the next case from step 1.
123 4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
125 Returns true if successful, false if an I/O error occurred. */
127 procedure (struct dataset *ds, case_func *cf, void *aux)
129 update_last_proc_invocation (ds);
131 /* Optimize the trivial case where we're not going to do
132 anything with the data, by not reading the data at all. */
134 && case_source_is_class (ds->proc_source, &storage_source_class)
135 && ds->proc_sink == NULL
136 && (ds->temporary_trns_chain == NULL
137 || trns_chain_is_empty (ds->temporary_trns_chain))
138 && trns_chain_is_empty (ds->permanent_trns_chain))
141 dict_set_case_limit (ds->dict, 0);
142 dict_clear_vectors (ds->dict);
146 return internal_procedure (ds, cf, NULL, aux);
149 /* Multipass procedure. */
151 struct multipass_aux_data
153 struct casefile *casefile;
155 bool (*proc_func) (const struct casefile *, void *aux);
159 /* Case processing function for multipass_procedure(). */
161 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
163 struct multipass_aux_data *aux_data = aux_data_;
164 return casefile_append (aux_data->casefile, c);
167 /* End-of-file function for multipass_procedure(). */
169 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
171 struct multipass_aux_data *aux_data = aux_data_;
172 return (aux_data->proc_func == NULL
173 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
176 /* Procedure that allows multiple passes over the input data.
177 The entire active file is passed to PROC_FUNC, with the given
178 AUX as auxiliary data, as a unit. */
180 multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
182 struct multipass_aux_data aux_data;
185 aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
186 aux_data.proc_func = proc_func;
189 ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
190 ok = !casefile_error (aux_data.casefile) && ok;
192 casefile_destroy (aux_data.casefile);
197 /* Procedure implementation. */
199 /* Executes a procedure.
200 Passes each case to CASE_FUNC.
201 Calls END_FUNC after the last case.
202 Returns true if successful, false if an I/O error occurred (or
203 if CASE_FUNC or END_FUNC ever returned false). */
205 internal_procedure (struct dataset *ds, case_func *proc,
213 while (ok && proc_read (ds, &c))
215 ok = proc (c, aux, ds) && ok;
217 ok = end (aux, ds) && ok;
218 return proc_close (ds) && ok;
221 /* Opens dataset DS for reading cases with proc_read.
222 proc_close must be called when done. */
224 proc_open (struct dataset *ds)
226 assert (ds->proc_source != NULL);
227 assert (!ds->is_open);
229 update_last_proc_invocation (ds);
231 open_active_file (ds);
234 create_trns_case (&ds->trns_case, ds->dict);
235 case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
236 ds->cases_written = 0;
240 /* Reads the next case from dataset DS, which must have been
241 opened for reading with proc_open.
242 Returns true if successful, in which case a pointer to the
243 case is stored in *C.
244 Return false at end of file or if a read error occurs. In
245 this case a null pointer is stored in *C. */
247 proc_read (struct dataset *ds, struct ccase **c)
249 enum trns_result retval = TRNS_DROP_CASE;
251 assert (ds->is_open);
257 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
258 if (retval == TRNS_ERROR)
263 /* Read a case from proc_source. */
264 clear_case (ds, &ds->trns_case);
265 if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
268 /* Execute permanent transformations. */
269 case_nr = ds->cases_written + 1;
270 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
271 &ds->trns_case, &case_nr);
272 if (retval != TRNS_CONTINUE)
275 /* Write case to LAG queue. */
277 lag_case (ds, &ds->trns_case);
279 /* Write case to replacement active file. */
281 if (ds->proc_sink->class->write != NULL)
283 if (ds->compactor != NULL)
285 dict_compactor_compact (ds->compactor, &ds->sink_case,
287 ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
290 ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
293 /* Execute temporary transformations. */
294 if (ds->temporary_trns_chain != NULL)
296 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
297 &ds->trns_case, &ds->cases_written);
298 if (retval != TRNS_CONTINUE)
307 /* Closes dataset DS for reading.
308 Returns true if successful, false if an I/O error occurred
309 while reading or closing the data set.
310 If DS has not been opened, returns true without doing
313 proc_close (struct dataset *ds)
318 /* Drain any remaining cases. */
322 if (!proc_read (ds, &c))
326 ds->ok = free_case_source (ds->proc_source) && ds->ok;
327 ds->proc_source = NULL;
329 case_destroy (&ds->sink_case);
330 case_destroy (&ds->trns_case);
332 ds->ok = close_active_file (ds) && ds->ok;
338 /* Updates last_proc_invocation. */
340 update_last_proc_invocation (struct dataset *ds)
342 ds->last_proc_invocation = time (NULL);
345 /* Creates and returns a case, initializing it from the vectors
346 that say which `value's need to be initialized just once, and
347 which ones need to be re-initialized before every case. */
349 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
351 size_t var_cnt = dict_get_var_cnt (dict);
354 case_create (trns_case, dict_get_next_value_idx (dict));
355 for (i = 0; i < var_cnt; i++)
357 struct variable *v = dict_get_var (dict, i);
358 union value *value = case_data_rw (trns_case, v);
360 if (var_is_numeric (v))
361 value->f = var_get_leave (v) ? 0.0 : SYSMIS;
363 memset (value->s, ' ', var_get_width (v));
367 /* Makes all preparations for reading from the data source and writing
370 open_active_file (struct dataset *ds)
372 add_case_limit_trns (ds);
373 add_filter_trns (ds);
375 /* Finalize transformations. */
376 trns_chain_finalize (ds->cur_trns_chain);
378 /* Make permanent_dict refer to the dictionary right before
379 data reaches the sink. */
380 if (ds->permanent_dict == NULL)
381 ds->permanent_dict = ds->dict;
383 /* Figure out whether to compact. */
385 (dict_compacting_would_shrink (ds->permanent_dict)
386 ? dict_make_compactor (ds->permanent_dict)
390 if (ds->proc_sink == NULL)
391 ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
392 if (ds->proc_sink->class->open != NULL)
393 ds->proc_sink->class->open (ds->proc_sink);
395 /* Allocate memory for lag queue. */
402 ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
403 for (i = 0; i < ds->n_lag; i++)
404 case_nullify (&ds->lag_queue[i]);
408 /* Add C to the lag queue. */
410 lag_case (struct dataset *ds, const struct ccase *c)
412 if (ds->lag_count < ds->n_lag)
414 case_destroy (&ds->lag_queue[ds->lag_head]);
415 case_clone (&ds->lag_queue[ds->lag_head], c);
416 if (++ds->lag_head >= ds->n_lag)
420 /* Clears the variables in C that need to be cleared between
423 clear_case (const struct dataset *ds, struct ccase *c)
425 size_t var_cnt = dict_get_var_cnt (ds->dict);
428 for (i = 0; i < var_cnt; i++)
430 struct variable *v = dict_get_var (ds->dict, i);
431 if (!var_get_leave (v))
433 if (var_is_numeric (v))
434 case_data_rw (c, v)->f = SYSMIS;
436 memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
441 /* Closes the active file. */
443 close_active_file (struct dataset *ds)
445 /* Free memory for lag queue, and turn off lagging. */
450 for (i = 0; i < ds->n_lag; i++)
451 case_destroy (&ds->lag_queue[i]);
452 free (ds->lag_queue);
456 /* Dictionary from before TEMPORARY becomes permanent. */
457 proc_cancel_temporary_transformations (ds);
459 /* Finish compacting. */
460 if (ds->compactor != NULL)
462 dict_compactor_destroy (ds->compactor);
463 dict_compact_values (ds->dict);
464 ds->compactor = NULL;
467 /* Old data sink becomes new data source. */
468 if (ds->proc_sink->class->make_source != NULL)
469 ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
470 free_case_sink (ds->proc_sink);
471 ds->proc_sink = NULL;
473 dict_clear_vectors (ds->dict);
474 ds->permanent_dict = NULL;
475 return proc_cancel_all_transformations (ds);
478 /* Returns a pointer to the lagged case from N_BEFORE cases before the
479 current one, or NULL if there haven't been that many cases yet. */
481 lagged_case (const struct dataset *ds, int n_before)
483 assert (n_before >= 1 );
484 assert (n_before <= ds->n_lag);
486 if (n_before <= ds->lag_count)
488 int index = ds->lag_head - n_before;
491 return &ds->lag_queue[index];
497 /* Procedure that separates the data into SPLIT FILE groups. */
499 /* Represents auxiliary data for handling SPLIT FILE. */
500 struct split_aux_data
502 struct dataset *dataset; /* The dataset */
503 struct ccase prev_case; /* Data in previous case. */
505 /* Callback functions. */
512 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
513 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
514 static bool split_procedure_end_func (void *, const struct dataset *);
516 /* Like procedure(), but it automatically breaks the case stream
517 into SPLIT FILE break groups. Before each group of cases with
518 identical SPLIT FILE variable values, BEGIN_FUNC is called
519 with the first case in the group.
520 Then PROC_FUNC is called for each case in the group (including
522 END_FUNC is called when the group is finished. FUNC_AUX is
523 passed to each of the functions as auxiliary data.
525 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
526 and END_FUNC will be called at all.
528 If SPLIT FILE is not in effect, then there is one break group
529 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
532 Returns true if successful, false if an I/O error occurred. */
534 procedure_with_splits (struct dataset *ds,
540 struct split_aux_data split_aux;
543 case_nullify (&split_aux.prev_case);
544 split_aux.begin = begin;
545 split_aux.proc = proc;
547 split_aux.func_aux = func_aux;
548 split_aux.dataset = ds;
550 ok = internal_procedure (ds, split_procedure_case_func,
551 split_procedure_end_func, &split_aux);
553 case_destroy (&split_aux.prev_case);
558 /* Case callback used by procedure_with_splits(). */
560 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
562 struct split_aux_data *split_aux = split_aux_;
564 /* Start a new series if needed. */
565 if (case_is_null (&split_aux->prev_case)
566 || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
568 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
569 split_aux->end (split_aux->func_aux, ds);
571 case_destroy (&split_aux->prev_case);
572 case_clone (&split_aux->prev_case, c);
574 if (split_aux->begin != NULL)
575 split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
578 return (split_aux->proc == NULL
579 || split_aux->proc (c, split_aux->func_aux, ds));
582 /* End-of-file callback used by procedure_with_splits(). */
584 split_procedure_end_func (void *split_aux_, const struct dataset *ds)
586 struct split_aux_data *split_aux = split_aux_;
588 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
589 split_aux->end (split_aux->func_aux, ds);
593 /* Compares the SPLIT FILE variables in cases A and B and returns
594 nonzero only if they differ. */
596 equal_splits (const struct ccase *a, const struct ccase *b,
597 const struct dataset *ds)
599 return case_compare (a, b,
600 dict_get_split_vars (ds->dict),
601 dict_get_split_cnt (ds->dict)) == 0;
604 /* Multipass procedure that separates the data into SPLIT FILE
607 /* Represents auxiliary data for handling SPLIT FILE in a
608 multipass procedure. */
609 struct multipass_split_aux_data
611 struct dataset *dataset; /* The dataset of the split */
612 struct ccase prev_case; /* Data in previous case. */
613 struct casefile *casefile; /* Accumulates data for a split. */
614 split_func *split; /* Function to call with the accumulated
616 void *func_aux; /* Auxiliary data. */
619 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
620 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
621 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
623 /* Returns true if successful, false if an I/O error occurred. */
625 multipass_procedure_with_splits (struct dataset *ds,
629 struct multipass_split_aux_data aux;
632 case_nullify (&aux.prev_case);
635 aux.func_aux = func_aux;
638 ok = internal_procedure (ds, multipass_split_case_func,
639 multipass_split_end_func, &aux);
640 case_destroy (&aux.prev_case);
645 /* Case callback used by multipass_procedure_with_splits(). */
647 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
649 struct multipass_split_aux_data *aux = aux_;
652 /* Start a new series if needed. */
653 if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
655 /* Record split values. */
656 case_destroy (&aux->prev_case);
657 case_clone (&aux->prev_case, c);
659 /* Pass any cases to split_func. */
660 if (aux->casefile != NULL)
661 ok = multipass_split_output (aux, ds);
663 /* Start a new casefile. */
665 fastfile_create (dict_get_next_value_idx (ds->dict));
668 return casefile_append (aux->casefile, c) && ok;
671 /* End-of-file callback used by multipass_procedure_with_splits(). */
673 multipass_split_end_func (void *aux_, const struct dataset *ds)
675 struct multipass_split_aux_data *aux = aux_;
676 return (aux->casefile == NULL || multipass_split_output (aux, ds));
680 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
684 assert (aux->casefile != NULL);
685 ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
686 casefile_destroy (aux->casefile);
687 aux->casefile = NULL;
692 /* Discards all the current state in preparation for a data-input
693 command like DATA LIST or GET. */
695 discard_variables (struct dataset *ds)
697 dict_clear (ds->dict);
698 fh_set_default_handle (NULL);
702 free_case_source (ds->proc_source);
703 ds->proc_source = NULL;
705 proc_cancel_all_transformations (ds);
708 /* Returns the current set of permanent transformations,
709 and clears the permanent transformations.
710 For use by INPUT PROGRAM. */
712 proc_capture_transformations (struct dataset *ds)
714 struct trns_chain *chain;
716 assert (ds->temporary_trns_chain == NULL);
717 chain = ds->permanent_trns_chain;
718 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
722 /* Adds a transformation that processes a case with PROC and
723 frees itself with FREE to the current set of transformations.
724 The functions are passed AUX as auxiliary data. */
726 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
728 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
731 /* Adds a transformation that processes a case with PROC and
732 frees itself with FREE to the current set of transformations.
733 When parsing of the block of transformations is complete,
734 FINALIZE will be called.
735 The functions are passed AUX as auxiliary data. */
737 add_transformation_with_finalizer (struct dataset *ds,
738 trns_finalize_func *finalize,
739 trns_proc_func *proc,
740 trns_free_func *free, void *aux)
742 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
745 /* Returns the index of the next transformation.
746 This value can be returned by a transformation procedure
747 function to indicate a "jump" to that transformation. */
749 next_transformation (const struct dataset *ds)
751 return trns_chain_next (ds->cur_trns_chain);
754 /* Returns true if the next call to add_transformation() will add
755 a temporary transformation, false if it will add a permanent
758 proc_in_temporary_transformations (const struct dataset *ds)
760 return ds->temporary_trns_chain != NULL;
763 /* Marks the start of temporary transformations.
764 Further calls to add_transformation() will add temporary
767 proc_start_temporary_transformations (struct dataset *ds)
769 if (!proc_in_temporary_transformations (ds))
771 add_case_limit_trns (ds);
773 ds->permanent_dict = dict_clone (ds->dict);
774 trns_chain_finalize (ds->permanent_trns_chain);
775 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
779 /* Converts all the temporary transformations, if any, to
780 permanent transformations. Further transformations will be
782 Returns true if anything changed, false otherwise. */
784 proc_make_temporary_transformations_permanent (struct dataset *ds)
786 if (proc_in_temporary_transformations (ds))
788 trns_chain_finalize (ds->temporary_trns_chain);
789 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
790 ds->temporary_trns_chain = NULL;
792 dict_destroy (ds->permanent_dict);
793 ds->permanent_dict = NULL;
801 /* Cancels all temporary transformations, if any. Further
802 transformations will be permanent.
803 Returns true if anything changed, false otherwise. */
805 proc_cancel_temporary_transformations (struct dataset *ds)
807 if (proc_in_temporary_transformations (ds))
809 dict_destroy (ds->dict);
810 ds->dict = ds->permanent_dict;
811 ds->permanent_dict = NULL;
813 trns_chain_destroy (ds->temporary_trns_chain);
814 ds->temporary_trns_chain = NULL;
822 /* Cancels all transformations, if any.
823 Returns true if successful, false on I/O error. */
825 proc_cancel_all_transformations (struct dataset *ds)
828 ok = trns_chain_destroy (ds->permanent_trns_chain);
829 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
830 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
831 ds->temporary_trns_chain = NULL;
835 /* Initializes procedure handling. */
837 create_dataset (void)
839 struct dataset *ds = xzalloc (sizeof(*ds));
840 ds->dict = dict_create ();
841 proc_cancel_all_transformations (ds);
845 /* Finishes up procedure handling. */
847 destroy_dataset (struct dataset *ds)
849 discard_variables (ds);
850 dict_destroy (ds->dict);
851 trns_chain_destroy (ds->permanent_trns_chain);
855 /* Sets SINK as the destination for procedure output from the
858 proc_set_sink (struct dataset *ds, struct case_sink *sink)
860 assert (ds->proc_sink == NULL);
861 ds->proc_sink = sink;
864 /* Sets SOURCE as the source for procedure input for the next
867 proc_set_source (struct dataset *ds, struct case_source *source)
869 assert (ds->proc_source == NULL);
870 ds->proc_source = source;
873 /* Returns true if a source for the next procedure has been
874 configured, false otherwise. */
876 proc_has_source (const struct dataset *ds)
878 return ds->proc_source != NULL;
881 /* Returns the output from the previous procedure.
882 For use only immediately after executing a procedure.
883 The returned casefile is owned by the caller; it will not be
884 automatically used for the next procedure's input. */
886 proc_capture_output (struct dataset *ds)
888 struct casefile *casefile;
890 /* Try to make sure that this function is called immediately
891 after procedure() or a similar function. */
892 assert (ds->proc_source != NULL);
893 assert (case_source_is_class (ds->proc_source, &storage_source_class));
894 assert (trns_chain_is_empty (ds->permanent_trns_chain));
895 assert (!proc_in_temporary_transformations (ds));
897 casefile = storage_source_decapsulate (ds->proc_source);
898 ds->proc_source = NULL;
903 static trns_proc_func case_limit_trns_proc;
904 static trns_free_func case_limit_trns_free;
906 /* Adds a transformation that limits the number of cases that may
907 pass through, if DS->DICT has a case limit. */
909 add_case_limit_trns (struct dataset *ds)
911 size_t case_limit = dict_get_case_limit (ds->dict);
914 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
915 *cases_remaining = case_limit;
916 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
918 dict_set_case_limit (ds->dict, 0);
922 /* Limits the maximum number of cases processed to
925 case_limit_trns_proc (void *cases_remaining_,
926 struct ccase *c UNUSED, casenumber case_nr UNUSED)
928 size_t *cases_remaining = cases_remaining_;
929 if (*cases_remaining > 0)
931 (*cases_remaining)--;
932 return TRNS_CONTINUE;
935 return TRNS_DROP_CASE;
938 /* Frees the data associated with a case limit transformation. */
940 case_limit_trns_free (void *cases_remaining_)
942 size_t *cases_remaining = cases_remaining_;
943 free (cases_remaining);
947 static trns_proc_func filter_trns_proc;
949 /* Adds a temporary transformation to filter data according to
950 the variable specified on FILTER, if any. */
952 add_filter_trns (struct dataset *ds)
954 struct variable *filter_var = dict_get_filter (ds->dict);
955 if (filter_var != NULL)
957 proc_start_temporary_transformations (ds);
958 add_transformation (ds, filter_trns_proc, NULL, filter_var);
962 /* FILTER transformation. */
964 filter_trns_proc (void *filter_var_,
965 struct ccase *c UNUSED, casenumber case_nr UNUSED)
968 struct variable *filter_var = filter_var_;
969 double f = case_num (c, filter_var);
970 return (f != 0.0 && !var_is_num_missing (filter_var, f)
971 ? TRNS_CONTINUE : TRNS_DROP_CASE);
976 dataset_dict (const struct dataset *ds)
983 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
989 dataset_n_lag (const struct dataset *ds)
995 dataset_set_n_lag (struct dataset *ds, int n_lag)