1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 02110-1301, USA. */
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/fastfile.h>
32 #include <data/dictionary.h>
33 #include <data/file-handle-def.h>
34 #include <data/procedure.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/variable.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/misc.h>
40 #include <libpspp/str.h>
42 /* Procedure execution data: the state that internal_procedure() sets
   up once per run and that write_case() uses for every case.
   NOTE(review): write_case() also reads wc_data->proc and
   wc_data->aux; the declarations of those members (original lines
   46-48) are not present in this listing. */
43 struct write_case_data
45 /* Function to call for each case. */
49 struct dataset *dataset; /* The dataset concerned */
50 struct ccase trns_case; /* Case used for transformations. */
51 struct ccase sink_case; /* Case written to sink, if
52 compacting is necessary. */
53 size_t cases_written; /* Cases output so far; write_case() increments it
   after the permanent transformations accept a case. */
57 /* Cases are read from proc_source,
58 pass through permanent_trns_chain (which transforms them into
59 the format described by permanent_dict),
60 are written to proc_sink,
61 pass through temporary_trns_chain (which transforms them into
62 the format described by dict),
63 and are finally passed to the procedure. */
64 struct case_source *proc_source;
65 struct trns_chain *permanent_trns_chain;
66 struct dictionary *permanent_dict;
67 struct case_sink *proc_sink;
68 struct trns_chain *temporary_trns_chain;
69 struct dictionary *dict;
71 /* The transformation chain that the next transformation will be
72 added to.  It is set to permanent_trns_chain, or to
   temporary_trns_chain once temporary transformations have been
   started. */
73 struct trns_chain *cur_trns_chain;
75 /* The compactor used to compact a case, if necessary;
76 otherwise a null pointer. */
77 struct dict_compactor *compactor;
79 /* Time at which proc was last invoked. */
80 time_t last_proc_invocation;
83 int n_lag; /* Number of cases to lag. */
84 int lag_count; /* Number of cases in lag_queue so far. */
85 int lag_head; /* Index where next case will be added. */
86 struct ccase *lag_queue; /* Array of n_lag struct ccase elements. */
88 }; /* struct dataset */
/* Forward declarations of the file-local helpers defined further down.
   NOTE(review): the remainder of the internal_procedure() prototype
   (original lines 95-96) is not present in this listing. */
91 static void add_case_limit_trns (struct dataset *ds);
92 static void add_filter_trns (struct dataset *ds);
94 static bool internal_procedure (struct dataset *ds, case_func *,
97 static void update_last_proc_invocation (struct dataset *ds);
98 static void create_trns_case (struct ccase *, struct dictionary *);
99 static void open_active_file (struct dataset *ds);
100 static bool write_case (struct write_case_data *wc_data);
101 static void lag_case (struct dataset *ds, const struct ccase *c);
102 static void clear_case (const struct dataset *ds, struct ccase *c);
103 static bool close_active_file (struct dataset *ds);
105 /* Public functions. */
107 /* Returns the time recorded for the last procedure invocation on DS.
   If no time has been recorded yet (the stored value is 0), the
   current time is recorded first and then returned. */
109 time_of_last_procedure (struct dataset *ds)
111 if (ds->last_proc_invocation == 0)
112 update_last_proc_invocation (ds);
113 return ds->last_proc_invocation;
116 /* Regular procedure. */
120 /* Reads the data from the input program and writes it to a new
121 active file. For each case we read from the input program, we
122 do the following:
124 1. Execute permanent transformations. If these drop the case,
125 start the next case from step 1.
127 2. Write case to replacement active file.
129 3. Execute temporary transformations. If these drop the case,
130 start the next case from step 1.
132 4. Pass case to CF, passing AUX as auxiliary data.
134 Returns true if successful, false if an I/O error occurred.
   Implemented as internal_procedure() with no end-of-data callback. */
136 procedure (struct dataset *ds, case_func *cf, void *aux)
138 return internal_procedure (ds, cf, NULL, aux);
141 /* Multipass procedure. */
/* Auxiliary data for multipass_procedure().
   NOTE(review): multipass_end_func() also reads an `aux' member; its
   declaration (original line 148) is not present in this listing. */
143 struct multipass_aux_data
145 struct casefile *casefile; /* Collects every case passed to the case callback. */
147 bool (*proc_func) (const struct casefile *, void *aux); /* Called at end of data with the casefile; may be null. */
151 /* Case processing function for multipass_procedure().
   Appends C to the casefile held in AUX_DATA_ (a struct
   multipass_aux_data) and returns casefile_append()'s result.
   DS is not used. */
153 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
155 struct multipass_aux_data *aux_data = aux_data_;
156 return casefile_append (aux_data->casefile, c);
159 /* End-of-file function for multipass_procedure().
   If a processing function was supplied, calls it with the accumulated
   casefile and the caller's auxiliary data and returns its result;
   with no processing function, returns true.  DS is not used. */
161 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
163 struct multipass_aux_data *aux_data = aux_data_;
164 return (aux_data->proc_func == NULL
165 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
168 /* Procedure that allows multiple passes over the input data.
169 The entire active file is passed to PROC_FUNC, with the given
170 AUX as auxiliary data, as a unit.
   The cases are first collected into a casefile created here, which
   is destroyed again before returning.  A casefile error also makes
   the overall result false.
   NOTE(review): the lines that store AUX in aux_data and return the
   result are not present in this listing. */
172 multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
174 struct multipass_aux_data aux_data;
177 aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
178 aux_data.proc_func = proc_func;
181 ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
182 ok = !casefile_error (aux_data.casefile) && ok;
184 casefile_destroy (aux_data.casefile);
189 /* Procedure implementation. */
192 /* Executes a procedure.
193 Passes each case to PROC.
194 Calls END after the last case.
195 Returns true if successful, false if an I/O error occurred (or
196 if PROC or END ever returned false).
   A data source must already be configured for DS.
   NOTE(review): several short lines of this function (rest of the
   parameter list, local declarations, braces, the wc_data.proc and
   wc_data.aux assignments, the final return) are not present in this
   listing. */
198 internal_procedure (struct dataset *ds, case_func *proc,
202 struct write_case_data wc_data;
205 assert (ds->proc_source != NULL);
207 update_last_proc_invocation (ds);
209 /* Optimize the trivial case where we're not going to do
210 anything with the data, by not reading the data at all. */
211 if (proc == NULL && end == NULL
212 && case_source_is_class (ds->proc_source, &storage_source_class)
213 && ds->proc_sink == NULL
214 && (ds->temporary_trns_chain == NULL
215 || trns_chain_is_empty (ds->temporary_trns_chain))
216 && trns_chain_is_empty (ds->permanent_trns_chain))
219 dict_set_case_limit (ds->dict, 0);
220 dict_clear_vectors (ds->dict);
224 open_active_file (ds);
228 wc_data.dataset = ds;
229 create_trns_case (&wc_data.trns_case, ds->dict);
230 case_create (&wc_data.sink_case,
231 dict_get_compacted_value_cnt (ds->dict));
232 wc_data.cases_written = 0;
/* The source drives the run: it is given write_case() as the callback
   to invoke with wc_data. */
234 ok = ds->proc_source->class->read (ds->proc_source,
236 write_case, &wc_data) && ok;
238 ok = end (aux, ds) && ok;
240 case_destroy (&wc_data.sink_case);
241 case_destroy (&wc_data.trns_case);
243 ok = close_active_file (ds) && ok;
248 /* Updates last_proc_invocation in DS to the current time, as
   reported by time(). */
250 update_last_proc_invocation (struct dataset *ds)
252 ds->last_proc_invocation = time (NULL);
255 /* Creates TRNS_CASE, sized by dict_get_next_value_idx (DICT), and
   gives every variable in DICT its initial value: a numeric variable
   starts at 0 if its `leave' flag is set and at SYSMIS otherwise;
   any other variable is filled with spaces for its full width. */
259 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
261 size_t var_cnt = dict_get_var_cnt (dict);
264 case_create (trns_case, dict_get_next_value_idx (dict));
265 for (i = 0; i < var_cnt; i++)
267 struct variable *v = dict_get_var (dict, i);
268 union value *value = case_data_rw (trns_case, v->fv);
270 if (v->type == NUMERIC)
271 value->f = v->leave ? 0.0 : SYSMIS;
273 memset (value->s, ' ', v->width);
277 /* Makes all preparations for reading from the data source and writing
278 to the data sink: adds the case-limit and FILTER transformations,
   finalizes the current transformation chain, settles which
   dictionary describes the data reaching the sink, creates a default
   storage sink if none was set, opens the sink, and allocates the
   lag queue.
   NOTE(review): a few short lines (the compactor assignment target,
   the guard around the lag-queue allocation, braces) are not present
   in this listing. */
280 open_active_file (struct dataset *ds)
282 add_case_limit_trns (ds);
283 add_filter_trns (ds);
285 /* Finalize transformations. */
286 trns_chain_finalize (ds->cur_trns_chain);
288 /* Make permanent_dict refer to the dictionary right before
289 data reaches the sink. */
290 if (ds->permanent_dict == NULL)
291 ds->permanent_dict = ds->dict;
293 /* Figure out whether to compact. */
295 (dict_compacting_would_shrink (ds->permanent_dict)
296 ? dict_make_compactor (ds->permanent_dict)
300 if (ds->proc_sink == NULL)
301 ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
302 if (ds->proc_sink->class->open != NULL)
303 ds->proc_sink->class->open (ds->proc_sink);
305 /* Allocate memory for lag queue. */
312 ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
313 for (i = 0; i < ds->n_lag; i++)
314 case_nullify (&ds->lag_queue[i]);
318 /* Transforms trns_case and writes it to the replacement active
319 file if advisable. Returns true if more cases can be
320 accepted, false otherwise. Do not call this function again
321 after it has returned false once.
   The order is: permanent transformations, lag queue, sink (using the
   compacted copy in sink_case when a compactor exists), temporary
   transformations, then the procedure callback.  The case is cleared
   for the next round before returning.
   NOTE(review): the jump/cleanup lines between these steps (original
   lines 335-336, 362-363, 368-370) are not present in this listing. */
323 write_case (struct write_case_data *wc_data)
325 enum trns_result retval;
328 struct dataset *ds = wc_data->dataset;
330 /* Execute permanent transformations. */
331 case_nr = wc_data->cases_written + 1;
332 retval = trns_chain_execute (ds->permanent_trns_chain,
333 &wc_data->trns_case, &case_nr);
334 if (retval != TRNS_CONTINUE)
337 /* Write case to LAG queue. */
339 lag_case (ds, &wc_data->trns_case);
341 /* Write case to replacement active file. */
342 wc_data->cases_written++;
343 if (ds->proc_sink->class->write != NULL)
345 if (ds->compactor != NULL)
347 dict_compactor_compact (ds->compactor, &wc_data->sink_case,
348 &wc_data->trns_case);
349 ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
352 ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
355 /* Execute temporary transformations.  cases_written has already
   been incremented, so it equals the case number used above. */
356 if (ds->temporary_trns_chain != NULL)
358 retval = trns_chain_execute (ds->temporary_trns_chain,
360 &wc_data->cases_written);
361 if (retval != TRNS_CONTINUE)
365 /* Pass case to procedure. */
366 if (wc_data->proc != NULL)
367 if (!wc_data->proc (&wc_data->trns_case, wc_data->aux, ds))
371 clear_case (ds, &wc_data->trns_case);
372 return retval != TRNS_ERROR;
375 /* Add C to the lag queue.
   The slot at lag_head is released and replaced by a clone of C, and
   lag_head then advances; the queue holds n_lag slots and is used as
   a ring.
   NOTE(review): the statements executed under the two `if' lines
   (original lines 380 and 384) are not present in this listing. */
377 lag_case (struct dataset *ds, const struct ccase *c)
379 if (ds->lag_count < ds->n_lag)
381 case_destroy (&ds->lag_queue[ds->lag_head]);
382 case_clone (&ds->lag_queue[ds->lag_head], c);
383 if (++ds->lag_head >= ds->n_lag)
387 /* Clears the variables in C that need to be cleared between
390 clear_case (const struct dataset *ds, struct ccase *c)
392 size_t var_cnt = dict_get_var_cnt (ds->dict);
395 for (i = 0; i < var_cnt; i++)
397 struct variable *v = dict_get_var (ds->dict, i);
400 if (v->type == NUMERIC)
401 case_data_rw (c, v->fv)->f = SYSMIS;
403 memset (case_data_rw (c, v->fv)->s, ' ', v->width);
408 /* Closes the active file after a procedure has run: releases the lag
   queue, drops temporary transformations, finishes compaction, frees
   the old source, turns the sink into the next source when the sink
   class supports that, and cancels all remaining transformations.
   Returns the result of proc_cancel_all_transformations(). */
410 close_active_file (struct dataset *ds)
412 /* Free memory for lag queue, and turn off lagging. */
417 for (i = 0; i < ds->n_lag; i++)
418 case_destroy (&ds->lag_queue[i]);
419 free (ds->lag_queue);
423 /* Dictionary from before TEMPORARY becomes permanent. */
424 proc_cancel_temporary_transformations (ds);
426 /* Finish compacting. */
427 if (ds->compactor != NULL)
429 dict_compactor_destroy (ds->compactor);
430 dict_compact_values (ds->dict);
431 ds->compactor = NULL;
434 /* Free data source. */
435 free_case_source (ds->proc_source);
436 ds->proc_source = NULL;
438 /* Old data sink becomes new data source. */
439 if (ds->proc_sink->class->make_source != NULL)
440 ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
441 free_case_sink (ds->proc_sink);
442 ds->proc_sink = NULL;
444 dict_clear_vectors (ds->dict);
445 ds->permanent_dict = NULL;
446 return proc_cancel_all_transformations (ds);
449 /* Returns a pointer to the lagged case from N_BEFORE cases before the
450 current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be between 1 and the dataset's n_lag, inclusive.
   NOTE(review): the lines that adjust a negative index and return
   NULL (original lines 460-461, 463-466) are not present in this
   listing. */
452 lagged_case (const struct dataset *ds, int n_before)
454 assert (n_before >= 1 );
455 assert (n_before <= ds->n_lag);
457 if (n_before <= ds->lag_count)
459 int index = ds->lag_head - n_before;
462 return &ds->lag_queue[index];
468 /* Procedure that separates the data into SPLIT FILE groups. */
470 /* Represents auxiliary data for handling SPLIT FILE in
   procedure_with_splits().
   NOTE(review): the callback members used by the code (begin, proc,
   end, func_aux; original lines 477-480) are not present in this
   listing. */
471 struct split_aux_data
473 struct dataset *dataset; /* The dataset */
474 struct ccase prev_case; /* Data in previous case; holds the case that started the current group. */
476 /* Callback functions. */
/* Forward declarations for the SPLIT FILE helpers defined below. */
483 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
484 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
485 static bool split_procedure_end_func (void *, const struct dataset *);
487 /* Like procedure(), but it automatically breaks the case stream
488 into SPLIT FILE break groups. Before each group of cases with
489 identical SPLIT FILE variable values, BEGIN_FUNC is called
490 with the first case in the group.
491 Then PROC_FUNC is called for each case in the group (including
492 the first).
493 END_FUNC is called when the group is finished. FUNC_AUX is
494 passed to each of the functions as auxiliary data.
496 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
497 and END_FUNC will be called at all.
499 If SPLIT FILE is not in effect, then there is one break group
500 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
501 will be called once.
503 Returns true if successful, false if an I/O error occurred.
   NOTE(review): the rest of the parameter list and a few short
   statements (original lines 506-510, 512-513, 517, 525-526) are not
   present in this listing; the callbacks are stored in split_aux as
   begin, proc and func_aux below. */
505 procedure_with_splits (struct dataset *ds,
511 struct split_aux_data split_aux;
514 case_nullify (&split_aux.prev_case);
515 split_aux.begin = begin;
516 split_aux.proc = proc;
518 split_aux.func_aux = func_aux;
519 split_aux.dataset = ds;
521 ok = internal_procedure (ds, split_procedure_case_func,
522 split_procedure_end_func, &split_aux);
524 case_destroy (&split_aux.prev_case);
529 /* Case callback used by procedure_with_splits().
   When C starts a new group (no previous case yet, or its split
   values are not equal to those of the recorded case), the end
   callback is run for the previous group, C is recorded as the
   group's first case, and the begin callback is run with it.  The
   per-case callback, if any, is then run for C and its result is
   returned; with no per-case callback the result is true. */
531 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
533 struct split_aux_data *split_aux = split_aux_;
535 /* Start a new series if needed. */
536 if (case_is_null (&split_aux->prev_case)
537 || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
/* Finish the previous group first, while prev_case still belongs
   to it. */
539 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
540 split_aux->end (split_aux->func_aux, ds);
542 case_destroy (&split_aux->prev_case);
543 case_clone (&split_aux->prev_case, c);
545 if (split_aux->begin != NULL)
546 split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
549 return (split_aux->proc == NULL
550 || split_aux->proc (c, split_aux->func_aux, ds));
553 /* End-of-file callback used by procedure_with_splits().
   Runs the end callback for the last group, provided at least one
   case was seen and an end callback was supplied.
   NOTE(review): the return statement (original line 561) is not
   present in this listing. */
555 split_procedure_end_func (void *split_aux_, const struct dataset *ds)
557 struct split_aux_data *split_aux = split_aux_;
559 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
560 split_aux->end (split_aux->func_aux, ds);
564 /* Compares the SPLIT FILE variables in cases A and B and returns
565 nonzero only if they are equal, that is, only if case_compare()
   returns 0 for the split variables of DS's dictionary. */
567 equal_splits (const struct ccase *a, const struct ccase *b,
568 const struct dataset *ds)
570 return case_compare (a, b,
571 dict_get_split_vars (ds->dict),
572 dict_get_split_cnt (ds->dict)) == 0;
575 /* Multipass procedure that separates the data into SPLIT FILE
576 groups. */
578 /* Represents auxiliary data for handling SPLIT FILE in a
579 multipass procedure. */
580 struct multipass_split_aux_data
582 struct dataset *dataset; /* The dataset of the split */
583 struct ccase prev_case; /* Data in previous case. */
584 struct casefile *casefile; /* Accumulates data for a split. */
585 split_func *split; /* Function to call with the accumulated
586 data. */
587 void *func_aux; /* Auxiliary data. */
/* Forward declarations for the multipass SPLIT FILE helpers defined
   below. */
590 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
591 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
592 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
594 /* Runs a procedure over DS through multipass_split_case_func() and
   multipass_split_end_func(), which collect each SPLIT FILE group
   into a casefile and hand it to the split callback.
   Returns true if successful, false if an I/O error occurred.
   NOTE(review): the rest of the parameter list and the short
   assignments that initialise the other members of aux (original
   lines 597-599, 604-605, 607) are not present in this listing. */
596 multipass_procedure_with_splits (struct dataset *ds,
600 struct multipass_split_aux_data aux;
603 case_nullify (&aux.prev_case);
606 aux.func_aux = func_aux;
609 ok = internal_procedure (ds, multipass_split_case_func,
610 multipass_split_end_func, &aux);
611 case_destroy (&aux.prev_case);
616 /* Case callback used by multipass_procedure_with_splits().
   Appends C to the casefile of the current SPLIT FILE group.  When C
   starts a new group, the casefile collected for the previous group
   is first handed to the split callback, and only afterwards is C
   recorded in prev_case, so the callback receives the case that
   belongs to the group it is given rather than the first case of the
   following group.
   Returns true only if both the split callback (when run) and
   casefile_append() returned true. */
618 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
620 struct multipass_split_aux_data *aux = aux_;
623 /* Start a new series if needed. */
624 if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
626 /* Pass any cases to split_func.  This has to come before prev_case
   is replaced, because multipass_split_output() passes prev_case
   along with the casefile. */
627 if (aux->casefile != NULL)
628 ok = multipass_split_output (aux, ds);
630 /* Record split values. */
631 case_destroy (&aux->prev_case);
632 case_clone (&aux->prev_case, c);
634 /* Start a new casefile. */
636 fastfile_create (dict_get_next_value_idx (ds->dict));
639 return casefile_append (aux->casefile, c) && ok;
642 /* End-of-file callback used by multipass_procedure_with_splits().
   Outputs the casefile of the last group, if one is pending, and
   returns the result; returns true when nothing is pending. */
644 multipass_split_end_func (void *aux_, const struct dataset *ds)
646 struct multipass_split_aux_data *aux = aux_;
647 return (aux->casefile == NULL || multipass_split_output (aux, ds));
/* Hands the accumulated casefile in AUX to the split callback,
   together with prev_case, the auxiliary data and DS, then destroys
   the casefile and clears the pointer so that a fresh one is started
   for the next group.  A casefile must be pending.
   NOTE(review): the declaration of `ok' and the return statement are
   not present in this listing. */
651 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
655 assert (aux->casefile != NULL);
656 ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
657 casefile_destroy (aux->casefile);
658 aux->casefile = NULL;
663 /* Discards all the current state in preparation for a data-input
664 command like DATA LIST or GET: clears the dictionary, resets the
   default file handle, frees the data source, and cancels all
   transformations.
   NOTE(review): original lines 670-672 are not present in this
   listing. */
666 discard_variables (struct dataset *ds)
668 dict_clear (ds->dict);
669 fh_set_default_handle (NULL);
673 free_case_source (ds->proc_source);
674 ds->proc_source = NULL;
676 proc_cancel_all_transformations (ds);
679 /* Returns the current set of permanent transformations,
680 and clears the permanent transformations.
681 For use by INPUT PROGRAM.
   Must not be called while temporary transformations are active.  A
   new, empty chain replaces the captured one and becomes the current
   chain.
   NOTE(review): the return statement is not present in this
   listing. */
683 proc_capture_transformations (struct dataset *ds)
685 struct trns_chain *chain;
687 assert (ds->temporary_trns_chain == NULL);
688 chain = ds->permanent_trns_chain;
689 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
693 /* Adds a transformation that processes a case with PROC and
694 frees itself with FREE to the current set of transformations.
695 The functions are passed AUX as auxiliary data.
   The transformation is appended to DS's current chain with no
   finalizer. */
697 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
699 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
702 /* Adds a transformation that processes a case with PROC and
703 frees itself with FREE to the current set of transformations.
704 When parsing of the block of transformations is complete,
705 FINALIZE will be called.
706 The functions are passed AUX as auxiliary data.
   The transformation is appended to DS's current chain. */
708 add_transformation_with_finalizer (struct dataset *ds,
709 trns_finalize_func *finalize,
710 trns_proc_func *proc,
711 trns_free_func *free, void *aux)
713 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
716 /* Returns the index of the next transformation.
717 This value can be returned by a transformation procedure
718 function to indicate a "jump" to that transformation.
   The index is obtained from DS's current chain. */
720 next_transformation (const struct dataset *ds)
722 return trns_chain_next (ds->cur_trns_chain);
725 /* Returns true if the next call to add_transformation() will add
726 a temporary transformation, false if it will add a permanent
727 transformation.  Temporary mode is in effect exactly while DS has
   a temporary transformation chain. */
729 proc_in_temporary_transformations (const struct dataset *ds)
731 return ds->temporary_trns_chain != NULL;
734 /* Marks the start of temporary transformations.
735 Further calls to add_transformation() will add temporary
736 transformations.  Does nothing if temporary transformations are
   already active.  Otherwise the case-limit transformation is added
   while the permanent chain is still current, a copy of the
   dictionary is kept as permanent_dict, the permanent chain is
   finalized, and a new empty chain becomes both the temporary and the
   current chain. */
738 proc_start_temporary_transformations (struct dataset *ds)
740 if (!proc_in_temporary_transformations (ds))
742 add_case_limit_trns (ds);
744 ds->permanent_dict = dict_clone (ds->dict);
745 trns_chain_finalize (ds->permanent_trns_chain);
746 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
750 /* Converts all the temporary transformations, if any, to
751 permanent transformations. Further transformations will be
753 Returns true if anything changed, false otherwise. */
755 proc_make_temporary_transformations_permanent (struct dataset *ds)
757 if (proc_in_temporary_transformations (ds))
759 trns_chain_finalize (ds->temporary_trns_chain);
760 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
761 ds->temporary_trns_chain = NULL;
763 dict_destroy (ds->permanent_dict);
764 ds->permanent_dict = NULL;
772 /* Cancels all temporary transformations, if any. Further
773 transformations will be permanent.
774 Returns true if anything changed, false otherwise. */
776 proc_cancel_temporary_transformations (struct dataset *ds)
778 if (proc_in_temporary_transformations (ds))
780 dict_destroy (ds->dict);
781 ds->dict = ds->permanent_dict;
782 ds->permanent_dict = NULL;
784 trns_chain_destroy (ds->temporary_trns_chain);
785 ds->temporary_trns_chain = NULL;
793 /* Cancels all transformations, if any.
794 Returns true if successful, false on I/O error.
   Both chains are destroyed; a new empty chain then becomes the
   permanent and current chain, and the temporary chain is cleared.
   NOTE(review): the declaration of `ok' and the return statement are
   not present in this listing. */
796 proc_cancel_all_transformations (struct dataset *ds)
799 ok = trns_chain_destroy (ds->permanent_trns_chain);
800 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
801 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
802 ds->temporary_trns_chain = NULL;
806 /* Initializes procedure handling: allocates a zero-filled dataset,
   gives it a new dictionary, and calls
   proc_cancel_all_transformations() so that it starts with an empty
   permanent chain as its current chain.
   NOTE(review): the return statement is not present in this
   listing. */
808 create_dataset (void)
810 struct dataset *ds = xzalloc (sizeof(*ds));
811 ds->dict = dict_create ();
812 proc_cancel_all_transformations (ds);
816 /* Finishes up procedure handling: discards DS's state, then destroys
   its dictionary and its permanent transformation chain.
   NOTE(review): original line 823 is not present in this listing, so
   whether DS itself is freed here cannot be seen. */
818 destroy_dataset (struct dataset *ds)
820 discard_variables (ds);
821 dict_destroy (ds->dict);
822 trns_chain_destroy (ds->permanent_trns_chain);
826 /* Sets SINK as the destination for procedure output from the
827 next procedure.  No sink may already be set. */
829 proc_set_sink (struct dataset *ds, struct case_sink *sink)
831 assert (ds->proc_sink == NULL);
832 ds->proc_sink = sink;
835 /* Sets SOURCE as the source for procedure input for the next
836 procedure.  No source may already be set. */
838 proc_set_source (struct dataset *ds, struct case_source *source)
840 assert (ds->proc_source == NULL);
841 ds->proc_source = source;
844 /* Returns true if a source for the next procedure has been
845 configured (DS's proc_source is non-null), false otherwise. */
847 proc_has_source (const struct dataset *ds)
849 return ds->proc_source != NULL;
852 /* Returns the output from the previous procedure.
853 For use only immediately after executing a procedure.
854 The returned casefile is owned by the caller; it will not be
855 automatically used for the next procedure's input.
   DS is left without a data source afterwards.
   NOTE(review): the return statement is not present in this
   listing. */
857 proc_capture_output (struct dataset *ds)
859 struct casefile *casefile;
861 /* Try to make sure that this function is called immediately
862 after procedure() or a similar function. */
863 assert (ds->proc_source != NULL);
864 assert (case_source_is_class (ds->proc_source, &storage_source_class));
865 assert (trns_chain_is_empty (ds->permanent_trns_chain));
866 assert (!proc_in_temporary_transformations (ds));
868 casefile = storage_source_decapsulate (ds->proc_source);
869 ds->proc_source = NULL;
/* Callbacks of the case-limit transformation, defined below. */
874 static trns_proc_func case_limit_trns_proc;
875 static trns_free_func case_limit_trns_free;
877 /* Adds a transformation that limits the number of cases that may
878 pass through, if DS->DICT has a case limit.
   The limit is copied into a heap-allocated counter that the
   transformation owns (case_limit_trns_free() releases it), and the
   dictionary's case limit is then reset to 0.
   NOTE(review): the condition guarding these statements and the last
   argument of the add_transformation() call are not present in this
   listing. */
880 add_case_limit_trns (struct dataset *ds)
882 size_t case_limit = dict_get_case_limit (ds->dict);
885 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
886 *cases_remaining = case_limit;
887 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
889 dict_set_case_limit (ds->dict, 0);
893 /* Limits the maximum number of cases processed to
894 *CASES_REMAINING.  While the counter is positive it is decremented
   and the case continues (TRNS_CONTINUE); once it has reached zero
   every further case is dropped (TRNS_DROP_CASE).  C and CASE_NR are
   not used. */
896 case_limit_trns_proc (void *cases_remaining_,
897 struct ccase *c UNUSED, casenumber case_nr UNUSED)
899 size_t *cases_remaining = cases_remaining_;
900 if (*cases_remaining > 0)
902 (*cases_remaining)--;
903 return TRNS_CONTINUE;
906 return TRNS_DROP_CASE;
909 /* Frees the data associated with a case limit transformation, that
   is, the counter passed as CASES_REMAINING_.
   NOTE(review): the return statement is not present in this
   listing. */
911 case_limit_trns_free (void *cases_remaining_)
913 size_t *cases_remaining = cases_remaining_;
914 free (cases_remaining);
/* Callback of the FILTER transformation, defined below. */
918 static trns_proc_func filter_trns_proc;
920 /* Adds a temporary transformation to filter data according to
921 the variable specified on FILTER, if any.
   Temporary transformations are started first, so the filter is
   appended to the temporary chain; the filter variable itself is
   passed as the transformation's auxiliary data, and no free function
   is registered for it. */
923 add_filter_trns (struct dataset *ds)
925 struct variable *filter_var = dict_get_filter (ds->dict);
926 if (filter_var != NULL)
928 proc_start_temporary_transformations (ds);
929 add_transformation (ds, filter_trns_proc, NULL, filter_var);
933 /* FILTER transformation.
   FILTER_VAR_ is the filter variable.  Case C continues
   (TRNS_CONTINUE) when its value for that variable is nonzero and
   mv_is_num_missing() does not report the value as missing for the
   variable; otherwise the case is dropped (TRNS_DROP_CASE).
   CASE_NR is not used.  C is read, so it is not marked UNUSED. */
935 filter_trns_proc (void *filter_var_,
936 struct ccase *c, casenumber case_nr UNUSED)
939 struct variable *filter_var = filter_var_;
940 double f = case_num (c, filter_var->fv);
941 return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
942 ? TRNS_CONTINUE : TRNS_DROP_CASE);
947 dataset_dict (const struct dataset *ds)
954 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
960 dataset_n_lag (const struct dataset *ds)
966 dataset_set_n_lag (struct dataset *ds, int n_lag)