1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/fastfile.h>
32 #include <data/dictionary.h>
33 #include <data/file-handle-def.h>
34 #include <data/procedure.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/variable.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/misc.h>
40 #include <libpspp/str.h>
42 /* Procedure execution data. */
43 struct write_case_data
45 /* Function to call for each case. */
46 case_func_t case_func;
49 struct dataset *dataset; /* The dataset concerned */
50 struct ccase trns_case; /* Case used for transformations. */
51 struct ccase sink_case; /* Case written to sink, if
52 compacting is necessary. */
53 size_t cases_written; /* Cases output so far. */
/* All of the state needed to run procedures over one active file. */
struct dataset
  {
    /* Data path: cases are read from proc_source, run through
       permanent_trns_chain (yielding the layout described by
       permanent_dict), written to proc_sink, run through
       temporary_trns_chain (yielding the layout described by
       dict), and finally handed to the procedure. */
    struct case_source *proc_source;
    struct trns_chain *permanent_trns_chain;
    struct dictionary *permanent_dict;
    struct case_sink *proc_sink;
    struct trns_chain *temporary_trns_chain;
    struct dictionary *dict;

    /* Chain that add_transformation() appends to; aliases either
       permanent_trns_chain or temporary_trns_chain. */
    struct trns_chain *cur_trns_chain;

    /* Compactor applied to a case before it is written to the
       sink, or a null pointer if none is needed. */
    struct dict_compactor *compactor;

    /* Time at which a procedure was last invoked. */
    time_t last_proc_invocation;

    /* Lag queue. */
    int n_lag;                  /* Number of cases to lag. */
    int lag_count;              /* Number of cases in lag_queue so far. */
    int lag_head;               /* Index where the next case is stored. */
    struct ccase *lag_queue;    /* Array of n_lag cases. */
  }; /* struct dataset */
91 struct dataset *current_dataset;
93 static void add_case_limit_trns (struct dataset *ds);
94 static void add_filter_trns (struct dataset *ds);
96 static bool internal_procedure (struct dataset *ds, case_func_t,
97 bool (*end_func) (void *),
99 static void update_last_proc_invocation (struct dataset *ds);
100 static void create_trns_case (struct ccase *, struct dictionary *);
101 static void open_active_file (struct dataset *ds);
102 static bool write_case (struct write_case_data *wc_data);
103 static void lag_case (struct dataset *ds, const struct ccase *c);
104 static void clear_case (const struct dataset *ds, struct ccase *c);
105 static bool close_active_file (struct dataset *ds);
107 /* Public functions. */
109 /* Returns the last time the data was read. */
111 time_of_last_procedure (struct dataset *ds)
113 if (ds->last_proc_invocation == 0)
114 update_last_proc_invocation (ds);
115 return ds->last_proc_invocation;
118 /* Regular procedure. */
122 /* Reads the data from the input program and writes it to a new
123 active file. For each case we read from the input program, we
126 1. Execute permanent transformations. If these drop the case,
127 start the next case from step 1.
129 2. Write case to replacement active file.
131 3. Execute temporary transformations. If these drop the case,
132 start the next case from step 1.
134 4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
136 Returns true if successful, false if an I/O error occurred. */
138 procedure (struct dataset *ds, case_func_t cf, void *aux)
140 return internal_procedure (ds, cf, NULL, aux);
143 /* Multipass procedure. */
/* Auxiliary data for multipass_procedure(). */
struct multipass_aux_data
  {
    struct casefile *casefile;  /* Collects every case passed in. */

    /* Function called with the collected casefile after the last
       case, and the auxiliary data passed to it. */
    bool (*proc_func) (const struct casefile *, void *aux);
    void *aux;
  };
153 /* Case processing function for multipass_procedure(). */
155 multipass_case_func (const struct ccase *c, void *aux_data_)
157 struct multipass_aux_data *aux_data = aux_data_;
158 return casefile_append (aux_data->casefile, c);
161 /* End-of-file function for multipass_procedure(). */
163 multipass_end_func (void *aux_data_)
165 struct multipass_aux_data *aux_data = aux_data_;
166 return (aux_data->proc_func == NULL
167 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
170 /* Procedure that allows multiple passes over the input data.
171 The entire active file is passed to PROC_FUNC, with the given
172 AUX as auxiliary data, as a unit. */
174 multipass_procedure (struct dataset *ds, casefile_func_t proc_func, void *aux)
176 struct multipass_aux_data aux_data;
179 aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
180 aux_data.proc_func = proc_func;
183 ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
184 ok = !casefile_error (aux_data.casefile) && ok;
186 casefile_destroy (aux_data.casefile);
191 /* Procedure implementation. */
194 /* Executes a procedure.
195 Passes each case to CASE_FUNC.
196 Calls END_FUNC after the last case.
197 Returns true if successful, false if an I/O error occurred (or
198 if CASE_FUNC or END_FUNC ever returned false). */
200 internal_procedure (struct dataset *ds, case_func_t case_func,
201 bool (*end_func) (void *),
204 struct write_case_data wc_data;
207 assert (ds->proc_source != NULL);
209 update_last_proc_invocation (ds);
211 /* Optimize the trivial case where we're not going to do
212 anything with the data, by not reading the data at all. */
213 if (case_func == NULL && end_func == NULL
214 && case_source_is_class (ds->proc_source, &storage_source_class)
215 && ds->proc_sink == NULL
216 && (ds->temporary_trns_chain == NULL
217 || trns_chain_is_empty (ds->temporary_trns_chain))
218 && trns_chain_is_empty (ds->permanent_trns_chain))
221 dict_set_case_limit (ds->dict, 0);
222 dict_clear_vectors (ds->dict);
226 open_active_file (ds);
228 wc_data.case_func = case_func;
230 wc_data.dataset = ds;
231 create_trns_case (&wc_data.trns_case, ds->dict);
232 case_create (&wc_data.sink_case,
233 dict_get_compacted_value_cnt (ds->dict));
234 wc_data.cases_written = 0;
236 ok = ds->proc_source->class->read (ds->proc_source,
238 write_case, &wc_data) && ok;
239 if (end_func != NULL)
240 ok = end_func (aux) && ok;
242 case_destroy (&wc_data.sink_case);
243 case_destroy (&wc_data.trns_case);
245 ok = close_active_file (ds) && ok;
250 /* Updates last_proc_invocation. */
252 update_last_proc_invocation (struct dataset *ds)
254 ds->last_proc_invocation = time (NULL);
257 /* Creates and returns a case, initializing it from the vectors
258 that say which `value's need to be initialized just once, and
259 which ones need to be re-initialized before every case. */
261 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
263 size_t var_cnt = dict_get_var_cnt (dict);
266 case_create (trns_case, dict_get_next_value_idx (dict));
267 for (i = 0; i < var_cnt; i++)
269 struct variable *v = dict_get_var (dict, i);
270 union value *value = case_data_rw (trns_case, v->fv);
272 if (v->type == NUMERIC)
273 value->f = v->leave ? 0.0 : SYSMIS;
275 memset (value->s, ' ', v->width);
279 /* Makes all preparations for reading from the data source and writing
282 open_active_file (struct dataset *ds)
284 add_case_limit_trns (ds);
285 add_filter_trns (ds);
287 /* Finalize transformations. */
288 trns_chain_finalize (ds->cur_trns_chain);
290 /* Make permanent_dict refer to the dictionary right before
291 data reaches the sink. */
292 if (ds->permanent_dict == NULL)
293 ds->permanent_dict = ds->dict;
295 /* Figure out whether to compact. */
297 (dict_compacting_would_shrink (ds->permanent_dict)
298 ? dict_make_compactor (ds->permanent_dict)
302 if (ds->proc_sink == NULL)
303 ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
304 if (ds->proc_sink->class->open != NULL)
305 ds->proc_sink->class->open (ds->proc_sink);
307 /* Allocate memory for lag queue. */
314 ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
315 for (i = 0; i < ds->n_lag; i++)
316 case_nullify (&ds->lag_queue[i]);
320 /* Transforms trns_case and writes it to the replacement active
321 file if advisable. Returns true if more cases can be
322 accepted, false otherwise. Do not call this function again
323 after it has returned false once. */
325 write_case (struct write_case_data *wc_data)
327 enum trns_result retval;
330 struct dataset *ds = wc_data->dataset;
332 /* Execute permanent transformations. */
333 case_nr = wc_data->cases_written + 1;
334 retval = trns_chain_execute (ds->permanent_trns_chain,
335 &wc_data->trns_case, &case_nr);
336 if (retval != TRNS_CONTINUE)
339 /* Write case to LAG queue. */
341 lag_case (ds, &wc_data->trns_case);
343 /* Write case to replacement active file. */
344 wc_data->cases_written++;
345 if (ds->proc_sink->class->write != NULL)
347 if (ds->compactor != NULL)
349 dict_compactor_compact (ds->compactor, &wc_data->sink_case,
350 &wc_data->trns_case);
351 ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
354 ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
357 /* Execute temporary transformations. */
358 if (ds->temporary_trns_chain != NULL)
360 retval = trns_chain_execute (ds->temporary_trns_chain,
362 &wc_data->cases_written);
363 if (retval != TRNS_CONTINUE)
367 /* Pass case to procedure. */
368 if (wc_data->case_func != NULL)
369 if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
373 clear_case (ds, &wc_data->trns_case);
374 return retval != TRNS_ERROR;
377 /* Add C to the lag queue. */
379 lag_case (struct dataset *ds, const struct ccase *c)
381 if (ds->lag_count < ds->n_lag)
383 case_destroy (&ds->lag_queue[ds->lag_head]);
384 case_clone (&ds->lag_queue[ds->lag_head], c);
385 if (++ds->lag_head >= ds->n_lag)
389 /* Clears the variables in C that need to be cleared between
392 clear_case (const struct dataset *ds, struct ccase *c)
394 size_t var_cnt = dict_get_var_cnt (ds->dict);
397 for (i = 0; i < var_cnt; i++)
399 struct variable *v = dict_get_var (ds->dict, i);
402 if (v->type == NUMERIC)
403 case_data_rw (c, v->fv)->f = SYSMIS;
405 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
410 /* Closes the active file. */
412 close_active_file (struct dataset *ds)
414 /* Free memory for lag queue, and turn off lagging. */
419 for (i = 0; i < ds->n_lag; i++)
420 case_destroy (&ds->lag_queue[i]);
421 free (ds->lag_queue);
425 /* Dictionary from before TEMPORARY becomes permanent. */
426 proc_cancel_temporary_transformations (ds);
428 /* Finish compacting. */
429 if (ds->compactor != NULL)
431 dict_compactor_destroy (ds->compactor);
432 dict_compact_values (ds->dict);
433 ds->compactor = NULL;
436 /* Free data source. */
437 free_case_source (ds->proc_source);
438 ds->proc_source = NULL;
440 /* Old data sink becomes new data source. */
441 if (ds->proc_sink->class->make_source != NULL)
442 ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
443 free_case_sink (ds->proc_sink);
444 ds->proc_sink = NULL;
446 dict_clear_vectors (ds->dict);
447 ds->permanent_dict = NULL;
448 return proc_cancel_all_transformations (ds);
451 /* Returns a pointer to the lagged case from N_BEFORE cases before the
452 current one, or NULL if there haven't been that many cases yet. */
454 lagged_case (const struct dataset *ds, int n_before)
456 assert (n_before >= 1 );
457 assert (n_before <= ds->n_lag);
459 if (n_before <= ds->lag_count)
461 int index = ds->lag_head - n_before;
464 return &ds->lag_queue[index];
470 /* Procedure that separates the data into SPLIT FILE groups. */
472 /* Represents auxiliary data for handling SPLIT FILE. */
473 struct split_aux_data
475 struct dataset *dataset; /* The dataset */
476 struct ccase prev_case; /* Data in previous case. */
478 /* Callback functions. */
479 begin_func_t begin_func ;
480 case_func_t proc_func ;
481 void (*end_func) (void *);
485 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
486 static bool split_procedure_case_func (const struct ccase *c, void *);
487 static bool split_procedure_end_func (void *);
489 /* Like procedure(), but it automatically breaks the case stream
490 into SPLIT FILE break groups. Before each group of cases with
491 identical SPLIT FILE variable values, BEGIN_FUNC is called
492 with the first case in the group.
493 Then PROC_FUNC is called for each case in the group (including
495 END_FUNC is called when the group is finished. FUNC_AUX is
496 passed to each of the functions as auxiliary data.
498 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
499 and END_FUNC will be called at all.
501 If SPLIT FILE is not in effect, then there is one break group
502 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
505 Returns true if successful, false if an I/O error occurred. */
507 procedure_with_splits (struct dataset *ds,
508 begin_func_t begin_func,
509 case_func_t proc_func,
510 void (*end_func) (void *aux),
513 struct split_aux_data split_aux;
516 case_nullify (&split_aux.prev_case);
517 split_aux.begin_func = begin_func;
518 split_aux.proc_func = proc_func;
519 split_aux.end_func = end_func;
520 split_aux.func_aux = func_aux;
521 split_aux.dataset = ds;
523 ok = internal_procedure (ds, split_procedure_case_func,
524 split_procedure_end_func, &split_aux);
526 case_destroy (&split_aux.prev_case);
531 /* Case callback used by procedure_with_splits(). */
533 split_procedure_case_func (const struct ccase *c, void *split_aux_)
535 struct split_aux_data *split_aux = split_aux_;
537 /* Start a new series if needed. */
538 if (case_is_null (&split_aux->prev_case)
539 || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
541 if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
542 split_aux->end_func (split_aux->func_aux);
544 case_destroy (&split_aux->prev_case);
545 case_clone (&split_aux->prev_case, c);
547 if (split_aux->begin_func != NULL)
548 split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
552 return (split_aux->proc_func == NULL
553 || split_aux->proc_func (c, split_aux->func_aux));
556 /* End-of-file callback used by procedure_with_splits(). */
558 split_procedure_end_func (void *split_aux_)
560 struct split_aux_data *split_aux = split_aux_;
562 if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
563 split_aux->end_func (split_aux->func_aux);
567 /* Compares the SPLIT FILE variables in cases A and B and returns
568 nonzero only if they differ. */
570 equal_splits (const struct ccase *a, const struct ccase *b,
571 const struct dataset *ds)
573 return case_compare (a, b,
574 dict_get_split_vars (ds->dict),
575 dict_get_split_cnt (ds->dict)) == 0;
/* Multipass procedure that separates the data into SPLIT FILE
   groups. */
581 /* Represents auxiliary data for handling SPLIT FILE in a
582 multipass procedure. */
583 struct multipass_split_aux_data
585 struct dataset *dataset; /* The dataset of the split */
586 struct ccase prev_case; /* Data in previous case. */
587 struct casefile *casefile; /* Accumulates data for a split. */
589 /* Function to call with the accumulated data. */
590 bool (*split_func) (const struct ccase *first, const struct casefile *,
592 void *func_aux; /* Auxiliary data. */
595 static bool multipass_split_case_func (const struct ccase *c, void *aux_);
596 static bool multipass_split_end_func (void *aux_);
597 static bool multipass_split_output (struct multipass_split_aux_data *);
599 /* Returns true if successful, false if an I/O error occurred. */
601 multipass_procedure_with_splits (struct dataset *ds,
602 bool (*split_func) (const struct ccase *first,
603 const struct casefile *,
607 struct multipass_split_aux_data aux;
610 case_nullify (&aux.prev_case);
612 aux.split_func = split_func;
613 aux.func_aux = func_aux;
616 ok = internal_procedure (ds, multipass_split_case_func,
617 multipass_split_end_func, &aux);
618 case_destroy (&aux.prev_case);
623 /* Case callback used by multipass_procedure_with_splits(). */
625 multipass_split_case_func (const struct ccase *c, void *aux_)
627 struct multipass_split_aux_data *aux = aux_;
628 struct dataset *ds = aux->dataset;
631 /* Start a new series if needed. */
632 if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
634 /* Record split values. */
635 case_destroy (&aux->prev_case);
636 case_clone (&aux->prev_case, c);
638 /* Pass any cases to split_func. */
639 if (aux->casefile != NULL)
640 ok = multipass_split_output (aux);
642 /* Start a new casefile. */
644 fastfile_create (dict_get_next_value_idx (ds->dict));
647 return casefile_append (aux->casefile, c) && ok;
650 /* End-of-file callback used by multipass_procedure_with_splits(). */
652 multipass_split_end_func (void *aux_)
654 struct multipass_split_aux_data *aux = aux_;
655 return (aux->casefile == NULL || multipass_split_output (aux));
659 multipass_split_output (struct multipass_split_aux_data *aux)
663 assert (aux->casefile != NULL);
664 ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
665 casefile_destroy (aux->casefile);
666 aux->casefile = NULL;
671 /* Discards all the current state in preparation for a data-input
672 command like DATA LIST or GET. */
674 discard_variables (struct dataset *ds)
676 dict_clear (ds->dict);
677 fh_set_default_handle (NULL);
681 free_case_source (ds->proc_source);
682 ds->proc_source = NULL;
684 proc_cancel_all_transformations (ds);
687 /* Returns the current set of permanent transformations,
688 and clears the permanent transformations.
689 For use by INPUT PROGRAM. */
691 proc_capture_transformations (struct dataset *ds)
693 struct trns_chain *chain;
695 assert (ds->temporary_trns_chain == NULL);
696 chain = ds->permanent_trns_chain;
697 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
701 /* Adds a transformation that processes a case with PROC and
702 frees itself with FREE to the current set of transformations.
703 The functions are passed AUX as auxiliary data. */
705 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
707 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
710 /* Adds a transformation that processes a case with PROC and
711 frees itself with FREE to the current set of transformations.
712 When parsing of the block of transformations is complete,
713 FINALIZE will be called.
714 The functions are passed AUX as auxiliary data. */
716 add_transformation_with_finalizer (struct dataset *ds,
717 trns_finalize_func *finalize,
718 trns_proc_func *proc,
719 trns_free_func *free, void *aux)
721 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
724 /* Returns the index of the next transformation.
725 This value can be returned by a transformation procedure
726 function to indicate a "jump" to that transformation. */
728 next_transformation (const struct dataset *ds)
730 return trns_chain_next (ds->cur_trns_chain);
733 /* Returns true if the next call to add_transformation() will add
734 a temporary transformation, false if it will add a permanent
737 proc_in_temporary_transformations (const struct dataset *ds)
739 return ds->temporary_trns_chain != NULL;
742 /* Marks the start of temporary transformations.
743 Further calls to add_transformation() will add temporary
746 proc_start_temporary_transformations (struct dataset *ds)
748 if (!proc_in_temporary_transformations (ds))
750 add_case_limit_trns (ds);
752 ds->permanent_dict = dict_clone (ds->dict);
753 trns_chain_finalize (ds->permanent_trns_chain);
754 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
758 /* Converts all the temporary transformations, if any, to
759 permanent transformations. Further transformations will be
761 Returns true if anything changed, false otherwise. */
763 proc_make_temporary_transformations_permanent (struct dataset *ds)
765 if (proc_in_temporary_transformations (ds))
767 trns_chain_finalize (ds->temporary_trns_chain);
768 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
769 ds->temporary_trns_chain = NULL;
771 dict_destroy (ds->permanent_dict);
772 ds->permanent_dict = NULL;
780 /* Cancels all temporary transformations, if any. Further
781 transformations will be permanent.
782 Returns true if anything changed, false otherwise. */
784 proc_cancel_temporary_transformations (struct dataset *ds)
786 if (proc_in_temporary_transformations (ds))
788 dict_destroy (ds->dict);
789 ds->dict = ds->permanent_dict;
790 ds->permanent_dict = NULL;
792 trns_chain_destroy (ds->temporary_trns_chain);
793 ds->temporary_trns_chain = NULL;
801 /* Cancels all transformations, if any.
802 Returns true if successful, false on I/O error. */
804 proc_cancel_all_transformations (struct dataset *ds)
807 ok = trns_chain_destroy (ds->permanent_trns_chain);
808 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
809 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
810 ds->temporary_trns_chain = NULL;
814 /* Initializes procedure handling. */
816 create_dataset (void)
818 struct dataset *ds = xzalloc (sizeof(*ds));
819 ds->dict = dict_create ();
820 proc_cancel_all_transformations (ds);
824 /* Finishes up procedure handling. */
826 destroy_dataset (struct dataset *ds)
828 discard_variables (ds);
829 dict_destroy (ds->dict);
833 /* Sets SINK as the destination for procedure output from the
836 proc_set_sink (struct dataset *ds, struct case_sink *sink)
838 assert (ds->proc_sink == NULL);
839 ds->proc_sink = sink;
842 /* Sets SOURCE as the source for procedure input for the next
845 proc_set_source (struct dataset *ds, struct case_source *source)
847 assert (ds->proc_source == NULL);
848 ds->proc_source = source;
851 /* Returns true if a source for the next procedure has been
852 configured, false otherwise. */
854 proc_has_source (const struct dataset *ds)
856 return ds->proc_source != NULL;
859 /* Returns the output from the previous procedure.
860 For use only immediately after executing a procedure.
861 The returned casefile is owned by the caller; it will not be
862 automatically used for the next procedure's input. */
864 proc_capture_output (struct dataset *ds)
866 struct casefile *casefile;
868 /* Try to make sure that this function is called immediately
869 after procedure() or a similar function. */
870 assert (ds->proc_source != NULL);
871 assert (case_source_is_class (ds->proc_source, &storage_source_class));
872 assert (trns_chain_is_empty (ds->permanent_trns_chain));
873 assert (!proc_in_temporary_transformations (ds));
875 casefile = storage_source_decapsulate (ds->proc_source);
876 ds->proc_source = NULL;
881 static trns_proc_func case_limit_trns_proc;
882 static trns_free_func case_limit_trns_free;
884 /* Adds a transformation that limits the number of cases that may
885 pass through, if DS->DICT has a case limit. */
887 add_case_limit_trns (struct dataset *ds)
889 size_t case_limit = dict_get_case_limit (ds->dict);
892 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
893 *cases_remaining = case_limit;
894 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
896 dict_set_case_limit (ds->dict, 0);
900 /* Limits the maximum number of cases processed to
903 case_limit_trns_proc (void *cases_remaining_,
904 struct ccase *c UNUSED, casenum_t case_nr UNUSED)
906 size_t *cases_remaining = cases_remaining_;
907 if (*cases_remaining > 0)
909 (*cases_remaining)--;
910 return TRNS_CONTINUE;
913 return TRNS_DROP_CASE;
/* Frees the counter associated with a case limit transformation.
   Always returns true. */
static bool
case_limit_trns_free (void *cases_remaining_)
{
  free (cases_remaining_);
  return true;
}
925 static trns_proc_func filter_trns_proc;
927 /* Adds a temporary transformation to filter data according to
928 the variable specified on FILTER, if any. */
930 add_filter_trns (struct dataset *ds)
932 struct variable *filter_var = dict_get_filter (ds->dict);
933 if (filter_var != NULL)
935 proc_start_temporary_transformations (ds);
936 add_transformation (ds, filter_trns_proc, NULL, filter_var);
940 /* FILTER transformation. */
942 filter_trns_proc (void *filter_var_,
943 struct ccase *c UNUSED, casenum_t case_nr UNUSED)
946 struct variable *filter_var = filter_var_;
947 double f = case_num (c, filter_var->fv);
948 return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
949 ? TRNS_CONTINUE : TRNS_DROP_CASE);
954 dataset_dict (const struct dataset *ds)
961 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
967 dataset_n_lag (const struct dataset *ds)
973 dataset_set_n_lag (struct dataset *ds, int n_lag)