1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License as
6 published by the Free Software Foundation; either version 2 of the
7 License, or (at your option) any later version.
9 This program is distributed in the hope that it will be useful, but
10 WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA. */
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <data/fastfile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
43 /* An abstract factory which creates casefiles */
44 struct casefile_factory *cf_factory;
46 /* Callback which occurs when a procedure provides a new source for
48 replace_source_callback *replace_source ;
50 /* Callback which occurs whenever the DICT is replaced by a new one */
51 replace_dictionary_callback *replace_dict;
53 /* Cases are read from proc_source,
54 pass through permanent_trns_chain (which transforms them into
55 the format described by permanent_dict),
56 are written to proc_sink,
57 pass through temporary_trns_chain (which transforms them into
58 the format described by dict),
59 and are finally passed to the procedure. */
60 struct case_source *proc_source;
61 struct trns_chain *permanent_trns_chain;
62 struct dictionary *permanent_dict;
63 struct case_sink *proc_sink;
64 struct trns_chain *temporary_trns_chain;
65 struct dictionary *dict;
67 /* The transformation chain that the next transformation will be
69 struct trns_chain *cur_trns_chain;
71 /* The compactor used to compact a case, if necessary;
72 otherwise a null pointer. */
73 struct dict_compactor *compactor;
75 /* Time at which proc was last invoked. */
76 time_t last_proc_invocation;
79 int n_lag; /* Number of cases to lag. */
80 int lag_count; /* Number of cases in lag_queue so far. */
81 int lag_head; /* Index where next case will be added. */
82 struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
85 bool is_open; /* Procedure open? */
86 struct ccase trns_case; /* Case used for transformations. */
87 struct ccase sink_case; /* Case written to sink, if
88 compacting is necessary. */
89 size_t cases_written; /* Cases output so far. */
91 }; /* struct dataset */
/* Forward declarations for the file-local helpers defined later in
   this file. */
94 static void add_case_limit_trns (struct dataset *ds);
95 static void add_filter_trns (struct dataset *ds);
/* NOTE(review): the rest of this parameter list is not present in this
   listing.  Callers in this file pass four arguments: the dataset, a
   case callback, an end-of-data callback and auxiliary data. */
97 static bool internal_procedure (struct dataset *ds, case_func *,
100 static void update_last_proc_invocation (struct dataset *ds);
101 static void create_trns_case (struct ccase *, struct dictionary *);
102 static void open_active_file (struct dataset *ds);
103 static void lag_case (struct dataset *ds, const struct ccase *c);
104 static void clear_case (const struct dataset *ds, struct ccase *c);
105 static bool close_active_file (struct dataset *ds);
107 /* Public functions. */
109 /* Returns the time at which a procedure was last invoked on DS.
   If no invocation has been recorded yet (the stored time is still
   0), the stored time is first set to the current time. */
111 time_of_last_procedure (struct dataset *ds)
113 if (ds->last_proc_invocation == 0)
114 update_last_proc_invocation (ds);
115 return ds->last_proc_invocation;
118 /* Regular procedure. */
122 /* Reads the data from the input program and writes it to a new
   active file.  For each case read from the input program, the
   following steps are carried out:

   1. Execute permanent transformations.  If these drop the case,
      start the next case from step 1.

   2. Write case to replacement active file.

   3. Execute temporary transformations.  If these drop the case,
      start the next case from step 1.

   4. Pass case to CF, passing AUX as auxiliary data.

   Returns true if successful, false if an I/O error occurred. */
138 procedure (struct dataset *ds, case_func *cf, void *aux)
140 update_last_proc_invocation (ds);
142 /* Optimize the trivial case where we're not going to do
   anything with the data, by not reading the data at all.
   The visible operands require a storage-class source, no sink, and
   empty (or absent) temporary and permanent transformation chains.
   NOTE(review): the opening of this condition, including its first
   operand, is not present in this listing. */
145 && case_source_is_class (ds->proc_source, &storage_source_class)
146 && ds->proc_sink == NULL
147 && (ds->temporary_trns_chain == NULL
148 || trns_chain_is_empty (ds->temporary_trns_chain))
149 && trns_chain_is_empty (ds->permanent_trns_chain))
/* Even when nothing is read, the case limit is reset to 0 and the
   vectors are cleared from the dictionary. */
152 dict_set_case_limit (ds->dict, 0);
153 dict_clear_vectors (ds->dict);
/* General path: no end-of-data callback is supplied, hence the NULL. */
157 return internal_procedure (ds, cf, NULL, aux);
160 /* Multipass procedure. */
/* Auxiliary data threaded through internal_procedure() by
   multipass_procedure(). */
struct multipass_aux_data
  {
    struct casefile *casefile;  /* Receives every case of the active file. */

    /* Called with the complete casefile after the last case;
       may be null, in which case nothing is called. */
    bool (*proc_func) (const struct casefile *, void *aux);
    void *aux;                  /* Auxiliary data handed to PROC_FUNC. */
  };
170 /* Case processing function for multipass_procedure().
   AUX_DATA_ points to a struct multipass_aux_data; case C is appended
   to its casefile and the result of casefile_append() is returned. */
172 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
174 struct multipass_aux_data *aux_data = aux_data_;
175 return casefile_append (aux_data->casefile, c);
178 /* End-of-file function for multipass_procedure().
   Hands the accumulated casefile and the caller's auxiliary data to
   the stored PROC_FUNC and returns its result; returns true when no
   PROC_FUNC was supplied. */
180 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
182 struct multipass_aux_data *aux_data = aux_data_;
183 return (aux_data->proc_func == NULL
184 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
187 /* Procedure that allows multiple passes over the input data.
   Each case is appended to a casefile obtained from DS's casefile
   factory; once the last case has been read, the entire active file
   is passed to PROC_FUNC, with the given AUX as auxiliary data, as a
   unit.  The casefile is destroyed afterwards. */
191 multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
193 struct multipass_aux_data aux_data;
/* Ask the factory for a casefile, passing the dictionary's next value
   index.  NOTE(review): the assignment target of this call is not
   present in this listing. */
197 ds->cf_factory->create_casefile (ds->cf_factory,
198 dict_get_next_value_idx (ds->dict));
200 aux_data.proc_func = proc_func;
203 ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
/* A casefile error counts as failure even if every callback succeeded. */
204 ok = !casefile_error (aux_data.casefile) && ok;
206 casefile_destroy (aux_data.casefile);
212 /* Procedure implementation. */
214 /* Executes a procedure.
   Passes each case obtained from proc_read() to PROC.
   Calls END after the last case.
   Returns true if successful, false if an I/O error occurred (or
   if PROC or END ever returned false). */
220 internal_procedure (struct dataset *ds, case_func *proc,
/* Keep reading until the data runs out or a callback reports failure. */
228 while (ok && proc_read (ds, &c))
230 ok = proc (c, aux, ds) && ok;
/* NOTE(review): procedure() passes a null END, so this call is
   presumably guarded by a null check that this listing does not show. */
232 ok = end (aux, ds) && ok;
/* Only when the close succeeds and OK is still true is the
   replace_source callback, if any, told about DS's current source. */
234 if ( proc_close (ds) && ok )
236 if ( ds->replace_source )
237 ds->replace_source (ds->proc_source);
245 /* Opens dataset DS for reading cases with proc_read.
   DS must already have a source and must not be open.
   proc_close must be called when done. */
248 proc_open (struct dataset *ds)
250 assert (ds->proc_source != NULL);
251 assert (!ds->is_open);
253 update_last_proc_invocation (ds);
/* Adds the case-limit and filter transformations, finalizes the
   current chain, and prepares the compactor, sink and lag queue. */
255 open_active_file (ds);
/* TRNS_CASE is sized by the dictionary's next value index, SINK_CASE
   by its compacted value count. */
258 create_trns_case (&ds->trns_case, ds->dict);
259 case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
260 ds->cases_written = 0;
264 /* Reads the next case from dataset DS, which must have been
   opened for reading with proc_open.
   Returns true if successful, in which case a pointer to the
   case is stored in *C.
   Returns false at end of file or if a read error occurs.  In
   this case a null pointer is stored in *C.
   NOTE(review): this listing omits several lines of the body (the
   enclosing loop, branch bodies and return statements), so the
   comments below describe only the statements that are visible. */
271 proc_read (struct dataset *ds, struct ccase **c)
273 enum trns_result retval = TRNS_DROP_CASE;
275 assert (ds->is_open);
/* RETVAL holds the outcome of the most recent transformation chain;
   only a dropped case or an error is expected at this point. */
281 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
282 if (retval == TRNS_ERROR)
287 /* Read a case from proc_source. */
288 clear_case (ds, &ds->trns_case);
289 if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
292 /* Execute permanent transformations.  The case number handed to the
   chain is one more than the count of cases written so far. */
293 case_nr = ds->cases_written + 1;
294 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
295 &ds->trns_case, &case_nr);
296 if (retval != TRNS_CONTINUE)
299 /* Write case to LAG queue. */
301 lag_case (ds, &ds->trns_case);
303 /* Write case to replacement active file.  The sink class's write
   hook is optional. */
305 if (ds->proc_sink->class->write != NULL)
/* With a compactor, the case is compacted into SINK_CASE and that
   copy is what gets written. */
307 if (ds->compactor != NULL)
309 dict_compactor_compact (ds->compactor, &ds->sink_case,
311 ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
/* Write of TRNS_CASE itself; presumably the no-compactor branch (the
   `else' line is not present in this listing). */
314 ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
317 /* Execute temporary transformations. */
318 if (ds->temporary_trns_chain != NULL)
320 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
321 &ds->trns_case, &ds->cases_written);
322 if (retval != TRNS_CONTINUE)
331 /* Closes dataset DS for reading.
   Returns true if successful, false if an I/O error occurred
   while reading or closing the data set.
   If DS has not been opened, returns true without doing
   anything else. */
/* Finishes reading from DS: drains unread cases, frees the source and
   both working cases, and closes the active file.  The results of
   freeing the source and of closing the active file are ANDed into
   DS->ok. */
337 proc_close (struct dataset *ds)
342 /* Drain any remaining cases. */
346 if (!proc_read (ds, &c))
/* The source has been consumed; free it and forget it. */
349 ds->ok = free_case_source (ds->proc_source) && ds->ok;
350 ds->proc_source = NULL;
352 case_destroy (&ds->sink_case);
353 case_destroy (&ds->trns_case);
/* close_active_file() may install a new proc_source made from the
   sink. */
355 ds->ok = close_active_file (ds) && ds->ok;
361 /* Updates last_proc_invocation: stores the current time, as reported
   by time(), in DS. */
363 update_last_proc_invocation (struct dataset *ds)
365 ds->last_proc_invocation = time (NULL);
368 /* Creates TRNS_CASE, sized by dict_get_next_value_idx (DICT), and
   gives every variable in DICT its starting value: a numeric variable
   starts at 0.0 if var_get_leave() is true for it and at SYSMIS
   otherwise; string values are filled with spaces across the
   variable's width (see the note on the memset() below). */
372 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
374 size_t var_cnt = dict_get_var_cnt (dict);
377 case_create (trns_case, dict_get_next_value_idx (dict));
378 for (i = 0; i < var_cnt; i++)
380 struct variable *v = dict_get_var (dict, i);
381 union value *value = case_data_rw (trns_case, v);
383 if (var_is_numeric (v))
384 value->f = var_get_leave (v) ? 0.0 : SYSMIS;
/* NOTE(review): presumably the else branch of the test above; the
   `else' line itself is not present in this listing. */
386 memset (value->s, ' ', var_get_width (v));
390 /* Makes all preparations for reading from the data source and writing
   to the data sink. */
/* Prepares DS for a pass over the data: adds the case-limit and
   filter transformations, finalizes the current transformation chain,
   settles PERMANENT_DICT, decides on compaction, makes sure a sink
   exists and is open, and allocates the lag queue. */
393 open_active_file (struct dataset *ds)
395 add_case_limit_trns (ds);
396 add_filter_trns (ds);
398 /* Finalize transformations. */
399 trns_chain_finalize (ds->cur_trns_chain);
401 /* Make permanent_dict refer to the dictionary right before
   data reaches the sink.  When it has not been set already, that is
   simply DICT. */
403 if (ds->permanent_dict == NULL)
404 ds->permanent_dict = ds->dict;
406 /* Figure out whether to compact.  A compactor is made only when
   compacting PERMANENT_DICT would shrink it.
   NOTE(review): the assignment target and the alternative value of
   this conditional expression are not present in this listing. */
408 (dict_compacting_would_shrink (ds->permanent_dict)
409 ? dict_make_compactor (ds->permanent_dict)
/* Fall back to a storage-class sink when none has been set. */
413 if (ds->proc_sink == NULL)
414 ds->proc_sink = create_case_sink (&storage_sink_class,
/* The sink class's open hook is optional. */
418 if (ds->proc_sink->class->open != NULL)
419 ds->proc_sink->class->open (ds->proc_sink);
421 /* Allocate memory for lag queue. */
428 ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
/* Every slot starts out as a null case. */
429 for (i = 0; i < ds->n_lag; i++)
430 case_nullify (&ds->lag_queue[i]);
434 /* Adds C to the lag queue: the case currently stored at LAG_HEAD is
   destroyed and replaced by a clone of C, and LAG_HEAD is then
   incremented.
   NOTE(review): the statements controlled by the two `if' tests (most
   likely incrementing LAG_COUNT and wrapping LAG_HEAD back to the
   start) are not present in this listing. */
436 lag_case (struct dataset *ds, const struct ccase *c)
438 if (ds->lag_count < ds->n_lag)
440 case_destroy (&ds->lag_queue[ds->lag_head]);
441 case_clone (&ds->lag_queue[ds->lag_head], c);
442 if (++ds->lag_head >= ds->n_lag)
446 /* Clears the variables in C that need to be cleared between
   processing cases. */
/* Resets, in C, every variable of DS's dictionary for which
   var_get_leave() is false: numeric values become SYSMIS, and the
   memset() below fills a value with spaces across the variable's
   width. */
449 clear_case (const struct dataset *ds, struct ccase *c)
451 size_t var_cnt = dict_get_var_cnt (ds->dict);
454 for (i = 0; i < var_cnt; i++)
456 struct variable *v = dict_get_var (ds->dict, i);
457 if (!var_get_leave (v))
459 if (var_is_numeric (v))
460 case_data_rw (c, v)->f = SYSMIS;
/* NOTE(review): presumably the else branch, for string variables; the
   `else' line itself is not present in this listing. */
462 memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
467 /* Closes the active file: releases the lag queue, cancels temporary
   transformations, finishes compaction, turns the sink into the next
   source, and cancels all transformations.
   Returns the result of proc_cancel_all_transformations(). */
469 close_active_file (struct dataset *ds)
471 /* Free memory for lag queue, and turn off lagging. */
476 for (i = 0; i < ds->n_lag; i++)
477 case_destroy (&ds->lag_queue[i]);
478 free (ds->lag_queue);
482 /* Dictionary from before TEMPORARY becomes permanent. */
483 proc_cancel_temporary_transformations (ds);
485 /* Finish compacting. */
486 if (ds->compactor != NULL)
488 dict_compactor_destroy (ds->compactor);
489 dict_compact_values (ds->dict);
490 ds->compactor = NULL;
493 /* Old data sink becomes new data source.  The sink class's
   make_source hook is optional. */
494 if (ds->proc_sink->class->make_source != NULL)
495 ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
496 free_case_sink (ds->proc_sink);
497 ds->proc_sink = NULL;
499 dict_clear_vectors (ds->dict);
500 ds->permanent_dict = NULL;
501 return proc_cancel_all_transformations (ds);
504 /* Returns a pointer to the lagged case from N_BEFORE cases before the
   current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be between 1 and DS's N_LAG, inclusive. */
507 lagged_case (const struct dataset *ds, int n_before)
509 assert (n_before >= 1 );
510 assert (n_before <= ds->n_lag);
512 if (n_before <= ds->lag_count)
/* NOTE(review): LAG_HEAD - N_BEFORE is negative whenever N_BEFORE
   exceeds LAG_HEAD; any wrap-around adjustment made before INDEX is
   used is not present in this listing. */
514 int index = ds->lag_head - n_before;
517 return &ds->lag_queue[index];
523 /* Procedure that separates the data into SPLIT FILE groups. */
525 /* Represents auxiliary data for handling SPLIT FILE.
   NOTE(review): procedure_with_splits() and its callbacks also use
   members named begin, proc, end and func_aux; their declarations
   (and the braces of this struct) are not present in this listing. */
526 struct split_aux_data
528 struct dataset *dataset; /* The dataset */
529 struct ccase prev_case; /* Data in previous case. */
531 /* Callback functions. */
/* Helpers for procedure_with_splits(): the split comparison and the
   two callbacks it hands to internal_procedure(). */
538 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
539 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
540 static bool split_procedure_end_func (void *, const struct dataset *);
542 /* Like procedure(), but it automatically breaks the case stream
   into SPLIT FILE break groups.  Before each group of cases with
   identical SPLIT FILE variable values, BEGIN is called
   with the first case in the group.
   Then PROC is called for each case in the group (including
   the first).
   The end callback is called when the group is finished.  FUNC_AUX is
   passed to each of the functions as auxiliary data.

   If the active file is empty, none of the callbacks will be called
   at all.

   If SPLIT FILE is not in effect, then there is one break group
   (if the active file is nonempty), and the begin and end callbacks
   are each called once.

   Returns true if successful, false if an I/O error occurred.

   NOTE(review): most of the parameter list, and the assignment of the
   end callback, are not present in this listing. */
560 procedure_with_splits (struct dataset *ds,
566 struct split_aux_data split_aux;
/* A null PREV_CASE marks that no group has been started yet. */
569 case_nullify (&split_aux.prev_case);
570 split_aux.begin = begin;
571 split_aux.proc = proc;
573 split_aux.func_aux = func_aux;
574 split_aux.dataset = ds;
576 ok = internal_procedure (ds, split_procedure_case_func,
577 split_procedure_end_func, &split_aux);
579 case_destroy (&split_aux.prev_case);
584 /* Case callback used by procedure_with_splits().
   When C starts a new SPLIT FILE group (there is no previous case, or
   equal_splits() says C differs from it), the end callback is run for
   the group just finished, C is remembered as the new previous case,
   and the begin callback is run.  The per-case callback is then run
   for C and its result returned; a null per-case callback yields
   true. */
586 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
588 struct split_aux_data *split_aux = split_aux_;
590 /* Start a new series if needed. */
591 if (case_is_null (&split_aux->prev_case)
592 || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
/* The end callback is skipped for the very first group, which has no
   predecessor to finish. */
594 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
595 split_aux->end (split_aux->func_aux, ds);
597 case_destroy (&split_aux->prev_case);
598 case_clone (&split_aux->prev_case, c);
600 if (split_aux->begin != NULL)
601 split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
604 return (split_aux->proc == NULL
605 || split_aux->proc (c, split_aux->func_aux, ds));
608 /* End-of-file callback used by procedure_with_splits().
   Runs the end callback for the final group, provided at least one
   case was seen (PREV_CASE is not null) and an end callback was
   supplied. */
610 split_procedure_end_func (void *split_aux_, const struct dataset *ds)
612 struct split_aux_data *split_aux = split_aux_;
614 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
615 split_aux->end (split_aux->func_aux, ds);
619 /* Compares the SPLIT FILE variables in cases A and B and returns
   nonzero only if they are equal, that is, only if case_compare()
   over DS's split variables yields 0. */
622 equal_splits (const struct ccase *a, const struct ccase *b,
623 const struct dataset *ds)
625 return case_compare (a, b,
626 dict_get_split_vars (ds->dict),
627 dict_get_split_cnt (ds->dict)) == 0;
630 /* Multipass procedure that separates the data into SPLIT FILE
   groups. */
633 /* Represents auxiliary data for handling SPLIT FILE in a
634 multipass procedure. */
635 struct multipass_split_aux_data
637 struct dataset *dataset; /* The dataset of the split */
638 struct ccase prev_case; /* Data in previous case. */
639 struct casefile *casefile; /* Accumulates data for a split. */
640 split_func *split; /* Function to call with the accumulated
642 void *func_aux; /* Auxiliary data. */
/* Callbacks and output helper used by
   multipass_procedure_with_splits(). */
645 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
646 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
647 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
649 /* Multipass counterpart of procedure_with_splits(): runs
   internal_procedure() with callbacks that gather the cases of each
   SPLIT FILE group into a casefile and pass the casefiles on one
   group at a time.
   Returns true if successful, false if an I/O error occurred.
   NOTE(review): most of the parameter list and some of the AUX
   initialization are not present in this listing. */
651 multipass_procedure_with_splits (struct dataset *ds,
655 struct multipass_split_aux_data aux;
658 case_nullify (&aux.prev_case);
661 aux.func_aux = func_aux;
664 ok = internal_procedure (ds, multipass_split_case_func,
665 multipass_split_end_func, &aux);
666 case_destroy (&aux.prev_case);
671 /* Case callback used by multipass_procedure_with_splits().
   Appends C to the casefile for the current SPLIT FILE group.  If C
   begins a new group, the previous group's casefile (if any) is first
   handed to multipass_split_output() and a new casefile is created.
   Returns false if appending or the split output failed. */
673 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
675 struct multipass_split_aux_data *aux = aux_;
678 /* Start a new series if needed. */
679 if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
681 /* Record split values.
   NOTE(review): PREV_CASE is overwritten with C before the finished
   group's casefile is output just below, so the split callback
   receives the new group's first case together with the old group's
   casefile; confirm that this ordering is intended. */
682 case_destroy (&aux->prev_case);
683 case_clone (&aux->prev_case, c);
685 /* Pass any cases to split_func. */
686 if (aux->casefile != NULL)
687 ok = multipass_split_output (aux, ds);
689 /* Start a new casefile. */
691 ds->cf_factory->create_casefile (ds->cf_factory,
692 dict_get_next_value_idx (ds->dict));
695 return casefile_append (aux->casefile, c) && ok;
698 /* End-of-file callback used by multipass_procedure_with_splits().
   Outputs the final group's casefile if one is pending; returns true
   when there is nothing to output. */
700 multipass_split_end_func (void *aux_, const struct dataset *ds)
702 struct multipass_split_aux_data *aux = aux_;
703 return (aux->casefile == NULL || multipass_split_output (aux, ds));
/* Passes the pending casefile, together with PREV_CASE and the
   caller's auxiliary data, to the split callback, then destroys the
   casefile and clears the pointer so that a new group can start.
   AUX must have a pending casefile. */
707 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
711 assert (aux->casefile != NULL);
712 ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
713 casefile_destroy (aux->casefile);
714 aux->casefile = NULL;
719 /* Discards all the current state in preparation for a data-input
   command like DATA LIST or GET: clears the dictionary, resets the
   default file handle, frees the current source, and cancels all
   transformations. */
722 discard_variables (struct dataset *ds)
724 dict_clear (ds->dict);
725 fh_set_default_handle (NULL);
729 free_case_source (ds->proc_source);
730 ds->proc_source = NULL;
/* The callback, if any, is told that the source is now null. */
731 if ( ds->replace_source )
732 ds->replace_source (ds->proc_source);
735 proc_cancel_all_transformations (ds);
738 /* Returns the current set of permanent transformations,
   and clears the permanent transformations.
   For use by INPUT PROGRAM.
   Must not be called while a temporary chain exists. */
742 proc_capture_transformations (struct dataset *ds)
744 struct trns_chain *chain;
746 assert (ds->temporary_trns_chain == NULL);
747 chain = ds->permanent_trns_chain;
/* A newly created chain becomes both the permanent and the current
   chain. */
748 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
752 /* Adds a transformation that processes a case with PROC and
   frees itself with FREE to the current set of transformations.
   The functions are passed AUX as auxiliary data.
   No finalize function is registered for the transformation. */
756 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
758 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
761 /* Adds a transformation that processes a case with PROC and
   frees itself with FREE to the current set of transformations.
   When parsing of the block of transformations is complete,
   FINALIZE will be called.
   The functions are passed AUX as auxiliary data. */
767 add_transformation_with_finalizer (struct dataset *ds,
768 trns_finalize_func *finalize,
769 trns_proc_func *proc,
770 trns_free_func *free, void *aux)
772 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
775 /* Returns the index of the next transformation, as reported by
   trns_chain_next() for DS's current chain.
   This value can be returned by a transformation procedure
   function to indicate a "jump" to that transformation. */
779 next_transformation (const struct dataset *ds)
781 return trns_chain_next (ds->cur_trns_chain);
784 /* Returns true if the next call to add_transformation() will add
   a temporary transformation, false if it will add a permanent
   transformation. */
/* Temporary transformations are in effect exactly when DS has a
   temporary chain. */
788 proc_in_temporary_transformations (const struct dataset *ds)
790 return ds->temporary_trns_chain != NULL;
793 /* Marks the start of temporary transformations.
   Further calls to add_transformation() will add temporary
   transformations. */
/* Begins temporary transformations unless they are already in effect:
   adds the case-limit transformation, stores a clone of the dictionary
   in PERMANENT_DICT, finalizes the permanent chain, and makes a newly
   created temporary chain the current one. */
797 proc_start_temporary_transformations (struct dataset *ds)
799 if (!proc_in_temporary_transformations (ds))
801 add_case_limit_trns (ds);
803 ds->permanent_dict = dict_clone (ds->dict);
805 trns_chain_finalize (ds->permanent_trns_chain);
806 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
810 /* Converts all the temporary transformations, if any, to
   permanent transformations.  Further transformations will be
   permanent.
   Returns true if anything changed, false otherwise. */
815 proc_make_temporary_transformations_permanent (struct dataset *ds)
817 if (proc_in_temporary_transformations (ds))
/* Finalize the temporary chain, then splice it into the permanent
   one. */
819 trns_chain_finalize (ds->temporary_trns_chain);
820 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
821 ds->temporary_trns_chain = NULL;
/* The dictionary clone made when temporary transformations began is
   no longer needed. */
823 dict_destroy (ds->permanent_dict);
824 ds->permanent_dict = NULL;
832 /* Cancels all temporary transformations, if any.  Further
   transformations will be permanent.
   Returns true if anything changed, false otherwise. */
836 proc_cancel_temporary_transformations (struct dataset *ds)
838 if (proc_in_temporary_transformations (ds))
/* PERMANENT_DICT becomes DS's dictionary again; dataset_set_dict()
   destroys the dictionary it replaces. */
840 dataset_set_dict (ds, ds->permanent_dict);
841 ds->permanent_dict = NULL;
843 trns_chain_destroy (ds->temporary_trns_chain);
844 ds->temporary_trns_chain = NULL;
852 /* Cancels all transformations, if any: destroys both chains and
   leaves DS with a newly created permanent chain as the current
   chain and no temporary chain.
   Returns true if successful, false on I/O error. */
855 proc_cancel_all_transformations (struct dataset *ds)
858 ok = trns_chain_destroy (ds->permanent_trns_chain);
859 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
860 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
861 ds->temporary_trns_chain = NULL;
865 /* Initializes procedure handling: allocates a dataset with
   xzalloc(), gives it a new dictionary, and records the casefile
   factory FACT and the callbacks RPS (stored as replace_source) and
   RDS (stored as replace_dict). */
867 create_dataset (struct casefile_factory *fact,
868 replace_source_callback *rps,
869 replace_dictionary_callback *rds
872 struct dataset *ds = xzalloc (sizeof(*ds));
873 ds->dict = dict_create ();
874 ds->cf_factory = fact;
875 ds->replace_source = rps;
876 ds->replace_dict = rds;
/* Called here because it installs the initial permanent (and current)
   transformation chain. */
877 proc_cancel_all_transformations (ds);
881 /* Finishes up procedure handling: discards DS's variables, source
   and transformations, then destroys its dictionary and the permanent
   chain that discard_variables() leaves installed. */
883 destroy_dataset (struct dataset *ds)
885 discard_variables (ds);
886 dict_destroy (ds->dict);
887 trns_chain_destroy (ds->permanent_trns_chain);
891 /* Sets SINK as the destination for procedure output from the
   next procedure. */
/* Stores SINK as DS's sink.  DS must not already have a sink. */
894 proc_set_sink (struct dataset *ds, struct case_sink *sink)
896 assert (ds->proc_sink == NULL);
897 ds->proc_sink = sink;
900 /* Sets SOURCE as the source for procedure input for the next
   procedure. */
/* Stores SOURCE as DS's source.  DS must not already have a source. */
903 proc_set_source (struct dataset *ds, struct case_source *source)
905 assert (ds->proc_source == NULL);
906 ds->proc_source = source;
909 /* Returns true if a source for the next procedure has been
   configured (DS's proc_source is non-null), false otherwise. */
912 proc_has_source (const struct dataset *ds)
914 return ds->proc_source != NULL;
917 /* Returns the output from the previous procedure.
   For use only immediately after executing a procedure.
   The returned casefile is owned by the caller; it will not be
   automatically used for the next procedure's input. */
922 proc_capture_output (struct dataset *ds)
924 struct casefile *casefile;
926 /* Try to make sure that this function is called immediately
   after procedure() or a similar function: there must be a
   storage-class source, an empty permanent chain, and no temporary
   transformations. */
928 assert (ds->proc_source != NULL);
929 assert (case_source_is_class (ds->proc_source, &storage_source_class));
930 assert (trns_chain_is_empty (ds->permanent_trns_chain));
931 assert (!proc_in_temporary_transformations (ds));
/* Take the casefile out of the storage source; DS has no source
   afterwards. */
933 casefile = storage_source_decapsulate (ds->proc_source);
934 ds->proc_source = NULL;
/* Callbacks of the case-limit transformation installed by
   add_case_limit_trns(). */
939 static trns_proc_func case_limit_trns_proc;
940 static trns_free_func case_limit_trns_free;
942 /* Adds a transformation that limits the number of cases that may
   pass through, if DS->DICT has a case limit.  The limit is copied
   into a heap-allocated counter owned by the transformation and then
   reset to 0 in the dictionary. */
945 add_case_limit_trns (struct dataset *ds)
947 size_t case_limit = dict_get_case_limit (ds->dict);
/* NOTE(review): the test on CASE_LIMIT that guards the following
   statements is not present in this listing. */
950 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
951 *cases_remaining = case_limit;
952 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
954 dict_set_case_limit (ds->dict, 0);
958 /* Limits the maximum number of cases processed to
   *CASES_REMAINING. */
/* Lets a case through (TRNS_CONTINUE) while the counter that
   CASES_REMAINING_ points to is positive, decrementing it each time;
   once the counter has reached zero, cases are dropped
   (TRNS_DROP_CASE). */
961 case_limit_trns_proc (void *cases_remaining_,
962 struct ccase *c UNUSED, casenumber case_nr UNUSED)
964 size_t *cases_remaining = cases_remaining_;
965 if (*cases_remaining > 0)
967 (*cases_remaining)--;
968 return TRNS_CONTINUE;
971 return TRNS_DROP_CASE;
974 /* Frees the data associated with a case limit transformation:
   CASES_REMAINING_ is the counter allocated by add_case_limit_trns(). */
976 case_limit_trns_free (void *cases_remaining_)
978 size_t *cases_remaining = cases_remaining_;
979 free (cases_remaining);
/* Callback of the FILTER transformation installed by
   add_filter_trns(). */
983 static trns_proc_func filter_trns_proc;
985 /* Adds a temporary transformation to filter data according to
   the variable specified on FILTER, if any.  Temporary
   transformations are started first, so the filter is appended to the
   temporary chain.  No free function is registered; the filter
   variable itself is passed as the auxiliary data. */
988 add_filter_trns (struct dataset *ds)
990 struct variable *filter_var = dict_get_filter (ds->dict);
991 if (filter_var != NULL)
993 proc_start_temporary_transformations (ds);
994 add_transformation (ds, filter_trns_proc, NULL, filter_var);
998 /* FILTER transformation.  Keeps case C when its value of the filter
   variable is nonzero and var_is_num_missing() with MV_ANY is false
   for it; drops the case otherwise.
   NOTE(review): C is marked UNUSED in the parameter list although it
   is read below. */
1000 filter_trns_proc (void *filter_var_,
1001 struct ccase *c UNUSED, casenumber case_nr UNUSED)
1004 struct variable *filter_var = filter_var_;
1005 double f = case_num (c, filter_var);
1006 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
1007 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Accessor for DS's dictionary.
   NOTE(review): the body is not present in this listing; presumably
   it returns DS->dict. */
1012 dataset_dict (const struct dataset *ds)
1018 /* Set or replace dataset DS's dictionary with DICT.
   dict_copy_callbacks() is applied to DICT and the old dictionary,
   the replace_dict callback (if any) is notified with DICT, and
   the old dictionary is destroyed.
   NOTE(review): the statement that stores DICT in DS is not present
   in this listing. */
1021 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
1023 struct dictionary *old_dict = ds->dict;
1025 dict_copy_callbacks (dict, ds->dict);
1028 if ( ds->replace_dict )
1029 ds->replace_dict (dict);
1031 dict_destroy (old_dict);
/* Accessor for DS's lag count.
   NOTE(review): the body is not present in this listing; presumably
   it returns DS->n_lag. */
1035 dataset_n_lag (const struct dataset *ds)
/* Setter for DS's lag count.
   NOTE(review): the body is not present in this listing; presumably
   it stores N_LAG in DS->n_lag. */
1041 dataset_set_n_lag (struct dataset *ds, int n_lag)
/* Returns DS's casefile factory. */
1047 struct casefile_factory *
1048 dataset_get_casefile_factory (const struct dataset *ds)
1050 return ds->cf_factory;