1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License as
6 published by the Free Software Foundation; either version 2 of the
7 License, or (at your option) any later version.
9 This program is distributed in the hope that it will be useful, but
10 WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA. */
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <data/fastfile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
43 /* An abstract factory which creates casefiles */
44 struct casefile_factory *cf_factory;
46 /* Callback which occurs when a procedure provides a new source for
the dataset; proc_set_source() invokes it with the new source. */
48 replace_source_callback *replace_source ;
50 /* Callback which occurs whenever the DICT is replaced by a new one */
51 replace_dictionary_callback *replace_dict;
53 /* Cases are read from proc_source,
54 pass through permanent_trns_chain (which transforms them into
55 the format described by permanent_dict),
56 are written to proc_sink,
57 pass through temporary_trns_chain (which transforms them into
58 the format described by dict),
59 and are finally passed to the procedure. */
60 struct case_source *proc_source;
61 struct trns_chain *permanent_trns_chain;
62 struct dictionary *permanent_dict;
63 struct case_sink *proc_sink;
64 struct trns_chain *temporary_trns_chain;
65 struct dictionary *dict;
67 /* The transformation chain that the next transformation will be
added to (see add_transformation()). */
69 struct trns_chain *cur_trns_chain;
71 /* The compactor used to compact a case, if necessary;
72 otherwise a null pointer. */
73 struct dict_compactor *compactor;
75 /* Time at which proc was last invoked. */
76 time_t last_proc_invocation;
79 int n_lag; /* Number of cases to lag. */
80 int lag_count; /* Number of cases in lag_queue so far. */
81 int lag_head; /* Index where next case will be added. */
82 struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
85 bool is_open; /* Procedure open? */
86 struct ccase trns_case; /* Case used for transformations. */
87 struct ccase sink_case; /* Case written to sink, if
88 compacting is necessary. */
89 size_t cases_written; /* Cases output so far. */
91 }; /* struct dataset */
/* Forward declarations of file-local helpers defined further down. */
94 static void add_case_limit_trns (struct dataset *ds);
95 static void add_filter_trns (struct dataset *ds);
/* NOTE(review): the remaining parameters of this prototype are not
   visible in this copy of the file. */
97 static bool internal_procedure (struct dataset *ds, case_func *,
100 static void update_last_proc_invocation (struct dataset *ds);
101 static void create_trns_case (struct ccase *, struct dictionary *);
102 static void open_active_file (struct dataset *ds);
103 static void lag_case (struct dataset *ds, const struct ccase *c);
104 static void clear_case (const struct dataset *ds, struct ccase *c);
105 static bool close_active_file (struct dataset *ds);
107 /* Public functions. */
109 /* Returns the last time the data was read. */
/* If the stored time is still 0, it is first set to the current
   time by update_last_proc_invocation(), so the value returned
   here comes from time(). */
111 time_of_last_procedure (struct dataset *ds)
113 if (ds->last_proc_invocation == 0)
114 update_last_proc_invocation (ds);
115 return ds->last_proc_invocation;
118 /* Regular procedure. */
122 /* Reads the data from the input program and writes it to a new
123 active file. For each case we read from the input program, we
do the following:
126 1. Execute permanent transformations. If these drop the case,
127 start the next case from step 1.
129 2. Write case to replacement active file.
131 3. Execute temporary transformations. If these drop the case,
132 start the next case from step 1.
134 4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
136 Returns true if successful, false if an I/O error occurred. */
138 procedure (struct dataset *ds, case_func *cf, void *aux)
140 update_last_proc_invocation (ds);
142 /* Optimize the trivial case where we're not going to do
143 anything with the data, by not reading the data at all. */
/* The conditions visible below require a storage-class source, no
   sink, no (or an empty) temporary chain and an empty permanent
   chain.  NOTE(review): the opening of this condition is not
   visible in this copy. */
145 && case_source_is_class (ds->proc_source, &storage_source_class)
146 && ds->proc_sink == NULL
147 && (ds->temporary_trns_chain == NULL
148 || trns_chain_is_empty (ds->temporary_trns_chain))
149 && trns_chain_is_empty (ds->permanent_trns_chain))
/* The shortcut still resets the case limit and the vectors. */
152 dict_set_case_limit (ds->dict, 0);
153 dict_clear_vectors (ds->dict);
/* General path: CF is the per-case callback; the argument that the
   multipass and split variants use for their end-of-data callback
   is NULL here. */
157 return internal_procedure (ds, cf, NULL, aux);
160 /* Multipass procedure. */
/* Auxiliary data that multipass_procedure() hands to
   internal_procedure() for its two callbacks. */
162 struct multipass_aux_data
/* Casefile to which multipass_case_func() appends every case. */
164 struct casefile *casefile;
/* Called by multipass_end_func() with the accumulated casefile;
   may be null, in which case nothing is called. */
166 bool (*proc_func) (const struct casefile *, void *aux);
170 /* Case processing function for multipass_procedure().
Appends C to the casefile held in AUX_DATA_ (a struct
multipass_aux_data) and returns the result of casefile_append().
DS is not used. */
172 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
174 struct multipass_aux_data *aux_data = aux_data_;
175 return casefile_append (aux_data->casefile, c);
178 /* End-of-file function for multipass_procedure().
Passes the accumulated casefile and the caller's auxiliary data to
the stored proc_func.  Returns true if proc_func is null, otherwise
whatever proc_func returns.  DS is not used. */
180 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
182 struct multipass_aux_data *aux_data = aux_data_;
183 return (aux_data->proc_func == NULL
184 || aux_data->proc_func (aux_data->casefile, aux_data->aux));
187 /* Procedure that allows multiple passes over the input data.
188 The entire active file is passed to PROC_FUNC, with the given
189 AUX as auxiliary data, as a unit. */
191 multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
193 struct multipass_aux_data aux_data;
/* The dataset's casefile factory builds the casefile, sized by
   dict_get_next_value_idx() of the current dictionary. */
197 ds->cf_factory->create_casefile (ds->cf_factory,
198 dict_get_next_value_idx (ds->dict));
200 aux_data.proc_func = proc_func;
/* Cases are collected by multipass_case_func(); PROC_FUNC itself
   runs from multipass_end_func(). */
203 ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
/* An error recorded on the casefile also counts as failure. */
204 ok = !casefile_error (aux_data.casefile) && ok;
206 casefile_destroy (aux_data.casefile);
212 /* Procedure implementation. */
214 /* Executes a procedure.
215 Passes each case to CASE_FUNC.
216 Calls END_FUNC after the last case.
217 Returns true if successful, false if an I/O error occurred (or
218 if CASE_FUNC or END_FUNC ever returned false). */
220 internal_procedure (struct dataset *ds, case_func *proc,
/* Reading stops as soon as a callback has failed (OK false) or
   proc_read() reports that there are no more cases. */
228 while (ok && proc_read (ds, &c))
230 ok = proc (c, aux, ds) && ok;
232 ok = end (aux, ds) && ok;
/* proc_close() is called before OK is tested, so it runs even
   after a failure. */
234 if ( proc_close (ds) && ok )
243 /* Opens dataset DS for reading cases with proc_read.
244 proc_close must be called when done. */
246 proc_open (struct dataset *ds)
/* Preconditions: a source has been set and DS is not already open. */
248 assert (ds->proc_source != NULL);
249 assert (!ds->is_open);
251 update_last_proc_invocation (ds);
253 open_active_file (ds);
/* Allocate the working case used by the transformations and the
   case used for compacted output, and restart the case counter. */
256 create_trns_case (&ds->trns_case, ds->dict);
257 case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
258 ds->cases_written = 0;
262 /* Reads the next case from dataset DS, which must have been
263 opened for reading with proc_open.
264 Returns true if successful, in which case a pointer to the
265 case is stored in *C.
266 Returns false at end of file or if a read error occurs. In
267 this case a null pointer is stored in *C. */
269 proc_read (struct dataset *ds, struct ccase **c)
271 enum trns_result retval = TRNS_DROP_CASE;
273 assert (ds->is_open);
/* Here the previous attempt, if any, either dropped its case or
   failed; an error is handled separately just below. */
279 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
280 if (retval == TRNS_ERROR)
285 /* Read a case from proc_source. */
286 clear_case (ds, &ds->trns_case);
287 if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
290 /* Execute permanent transformations. */
/* They are given the 1-based number of the case about to be
   written. */
291 case_nr = ds->cases_written + 1;
292 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
293 &ds->trns_case, &case_nr);
294 if (retval != TRNS_CONTINUE)
297 /* Write case to LAG queue. */
299 lag_case (ds, &ds->trns_case);
301 /* Write case to replacement active file. */
/* The sink's write hook is optional.  When a compactor is in use
   the case is compacted into sink_case and that is written;
   otherwise the working case is written as is. */
303 if (ds->proc_sink->class->write != NULL)
305 if (ds->compactor != NULL)
307 dict_compactor_compact (ds->compactor, &ds->sink_case,
309 ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
312 ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
315 /* Execute temporary transformations. */
316 if (ds->temporary_trns_chain != NULL)
318 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
319 &ds->trns_case, &ds->cases_written);
320 if (retval != TRNS_CONTINUE)
329 /* Closes dataset DS for reading.
330 Returns true if successful, false if an I/O error occurred
331 while reading or closing the data set.
332 If DS has not been opened, returns true without doing
anything. */
335 proc_close (struct dataset *ds)
340 /* Drain any remaining cases. */
344 if (!proc_read (ds, &c))
/* Free the source and detach it from the dataset; a failure is
   folded into ds->ok rather than returned immediately. */
347 ds->ok = free_case_source (ds->proc_source) && ds->ok;
348 proc_set_source (ds, NULL);
/* Release the two cases created by proc_open(). */
350 case_destroy (&ds->sink_case);
351 case_destroy (&ds->trns_case);
353 ds->ok = close_active_file (ds) && ds->ok;
359 /* Updates last_proc_invocation to the current time, as returned
by time(). */
361 update_last_proc_invocation (struct dataset *ds)
363 ds->last_proc_invocation = time (NULL);
366 /* Creates TRNS_CASE, with room for dict_get_next_value_idx (DICT)
values, and gives every variable in DICT its initial value.
Nothing is returned; the case is stored through TRNS_CASE.
A numeric variable starts at 0 if it is a LEAVE variable and at
SYSMIS otherwise; the memset below fills a value's string data
with spaces over the variable's width. */
370 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
372 size_t var_cnt = dict_get_var_cnt (dict);
375 case_create (trns_case, dict_get_next_value_idx (dict));
376 for (i = 0; i < var_cnt; i++)
378 struct variable *v = dict_get_var (dict, i);
379 union value *value = case_data_rw (trns_case, v);
381 if (var_is_numeric (v))
382 value->f = var_get_leave (v) ? 0.0 : SYSMIS;
/* NOTE(review): the line between these two assignments is not
   visible in this copy; presumably an else for string variables. */
384 memset (value->s, ' ', var_get_width (v));
388 /* Makes all preparations for reading from the data source and writing
to the data sink. */
391 open_active_file (struct dataset *ds)
/* Turn a pending case limit and FILTER variable, if any, into
   transformations before the chain is finalized. */
393 add_case_limit_trns (ds);
394 add_filter_trns (ds);
396 /* Finalize transformations. */
397 trns_chain_finalize (ds->cur_trns_chain);
399 /* Make permanent_dict refer to the dictionary right before
400 data reaches the sink. */
/* permanent_dict is already set when temporary transformations
   are in effect (see proc_start_temporary_transformations()). */
401 if (ds->permanent_dict == NULL)
402 ds->permanent_dict = ds->dict;
404 /* Figure out whether to compact. */
406 (dict_compacting_would_shrink (ds->permanent_dict)
407 ? dict_make_compactor (ds->permanent_dict)
/* No sink was supplied through proc_set_sink(): use a storage
   sink. */
411 if (ds->proc_sink == NULL)
412 ds->proc_sink = create_case_sink (&storage_sink_class,
/* The sink's open hook is optional. */
416 if (ds->proc_sink->class->open != NULL)
417 ds->proc_sink->class->open (ds->proc_sink);
419 /* Allocate memory for lag queue. */
/* Every slot starts out as a null case. */
426 ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
427 for (i = 0; i < ds->n_lag; i++)
428 case_nullify (&ds->lag_queue[i]);
432 /* Add C to the lag queue.
A clone of C replaces whatever case occupies slot lag_head, and
lag_head then advances; the test against n_lag below detects when
it has run off the end of the array. */
434 lag_case (struct dataset *ds, const struct ccase *c)
436 if (ds->lag_count < ds->n_lag)
438 case_destroy (&ds->lag_queue[ds->lag_head]);
439 case_clone (&ds->lag_queue[ds->lag_head], c);
440 if (++ds->lag_head >= ds->n_lag)
444 /* Clears the variables in C that need to be cleared between
processing cases. */
447 clear_case (const struct dataset *ds, struct ccase *c)
449 size_t var_cnt = dict_get_var_cnt (ds->dict);
452 for (i = 0; i < var_cnt; i++)
454 struct variable *v = dict_get_var (ds->dict, i);
/* LEAVE variables keep their value from one case to the next;
   only the others are reset. */
455 if (!var_get_leave (v))
/* Numeric values are reset to SYSMIS; the memset fills string
   data with spaces over the variable's width. */
457 if (var_is_numeric (v))
458 case_data_rw (c, v)->f = SYSMIS;
460 memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
465 /* Closes the active file.
Frees the lag queue, cancels temporary transformations, finishes
compaction, turns the sink into the source for the next procedure
and cancels all remaining transformations.
Returns the result of proc_cancel_all_transformations(). */
467 close_active_file (struct dataset *ds)
469 /* Free memory for lag queue, and turn off lagging. */
474 for (i = 0; i < ds->n_lag; i++)
475 case_destroy (&ds->lag_queue[i]);
476 free (ds->lag_queue);
480 /* Dictionary from before TEMPORARY becomes permanent. */
481 proc_cancel_temporary_transformations (ds);
483 /* Finish compacting. */
/* The dictionary is compacted as well, so that it matches the
   compacted cases that were written to the sink. */
484 if (ds->compactor != NULL)
486 dict_compactor_destroy (ds->compactor);
487 dict_compact_values (ds->dict);
488 ds->compactor = NULL;
491 /* Old data sink becomes new data source. */
/* make_source is optional; without it no new source is set here. */
492 if (ds->proc_sink->class->make_source != NULL)
493 proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
494 free_case_sink (ds->proc_sink);
495 ds->proc_sink = NULL;
497 dict_clear_vectors (ds->dict);
498 ds->permanent_dict = NULL;
499 return proc_cancel_all_transformations (ds);
502 /* Returns a pointer to the lagged case from N_BEFORE cases before the
503 current one, or NULL if there haven't been that many cases yet.
N_BEFORE must be between 1 and the dataset's n_lag, inclusive. */
505 lagged_case (const struct dataset *ds, int n_before)
507 assert (n_before >= 1 );
508 assert (n_before <= ds->n_lag);
510 if (n_before <= ds->lag_count)
/* Count back N_BEFORE slots from the position where the next case
   will be stored. */
512 int index = ds->lag_head - n_before;
515 return &ds->lag_queue[index];
521 /* Procedure that separates the data into SPLIT FILE groups. */
523 /* Represents auxiliary data for handling SPLIT FILE. */
524 struct split_aux_data
526 struct dataset *dataset; /* The dataset */
527 struct ccase prev_case; /* Clone of the first case of the
current split group; null before
the first case is seen. */
529 /* Callback functions. */
/* Forward declarations for the SPLIT FILE helper and callbacks
   defined below. */
536 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
537 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
538 static bool split_procedure_end_func (void *, const struct dataset *);
540 /* Like procedure(), but it automatically breaks the case stream
541 into SPLIT FILE break groups. Before each group of cases with
542 identical SPLIT FILE variable values, BEGIN_FUNC is called
543 with the first case in the group.
544 Then PROC_FUNC is called for each case in the group (including
the first one).
546 END_FUNC is called when the group is finished. FUNC_AUX is
547 passed to each of the functions as auxiliary data.
549 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
550 and END_FUNC will be called at all.
552 If SPLIT FILE is not in effect, then there is one break group
553 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
are each called once.
556 Returns true if successful, false if an I/O error occurred. */
558 procedure_with_splits (struct dataset *ds,
564 struct split_aux_data split_aux;
/* A null prev_case tells the case callback that no group has been
   started yet. */
567 case_nullify (&split_aux.prev_case);
568 split_aux.begin = begin;
569 split_aux.proc = proc;
571 split_aux.func_aux = func_aux;
572 split_aux.dataset = ds;
574 ok = internal_procedure (ds, split_procedure_case_func,
575 split_procedure_end_func, &split_aux);
577 case_destroy (&split_aux.prev_case);
582 /* Case callback used by procedure_with_splits().
Returns true if there is no per-case function, otherwise that
function's result for C. */
584 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
586 struct split_aux_data *split_aux = split_aux_;
588 /* Start a new series if needed. */
589 if (case_is_null (&split_aux->prev_case)
590 || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
/* Finish the previous group first, unless this is the very first
   case. */
592 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
593 split_aux->end (split_aux->func_aux, ds);
/* Remember C as the case that defines the new group. */
595 case_destroy (&split_aux->prev_case);
596 case_clone (&split_aux->prev_case, c);
598 if (split_aux->begin != NULL)
599 split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
602 return (split_aux->proc == NULL
603 || split_aux->proc (c, split_aux->func_aux, ds));
606 /* End-of-file callback used by procedure_with_splits().
Calls the end function for the last group, provided that at least
one case was seen (prev_case is not null) and an end function was
supplied. */
608 split_procedure_end_func (void *split_aux_, const struct dataset *ds)
610 struct split_aux_data *split_aux = split_aux_;
612 if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
613 split_aux->end (split_aux->func_aux, ds);
617 /* Compares the SPLIT FILE variables in cases A and B and returns
618 nonzero only if they are all equal, that is, only if
case_compare() over DS's split variables returns 0. */
620 equal_splits (const struct ccase *a, const struct ccase *b,
621 const struct dataset *ds)
623 return case_compare (a, b,
624 dict_get_split_vars (ds->dict),
625 dict_get_split_cnt (ds->dict)) == 0;
628 /* Multipass procedure that separates the data into SPLIT FILE
groups. */
631 /* Represents auxiliary data for handling SPLIT FILE in a
632 multipass procedure. */
633 struct multipass_split_aux_data
635 struct dataset *dataset; /* The dataset of the split */
636 struct ccase prev_case; /* Data in previous case. */
637 struct casefile *casefile; /* Accumulates data for a split. */
638 split_func *split; /* Function to call with the accumulated
data. */
640 void *func_aux; /* Auxiliary data. */
/* Forward declarations for the callbacks and helper used by
   multipass_procedure_with_splits(). */
643 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
644 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
645 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
/* Runs a procedure whose callbacks collect the cases of each SPLIT
   FILE group into a casefile and hand each casefile to the split
   function (see multipass_split_case_func() and
   multipass_split_output()).  FUNC_AUX is passed through as
   auxiliary data. */
647 /* Returns true if successful, false if an I/O error occurred. */
649 multipass_procedure_with_splits (struct dataset *ds,
653 struct multipass_split_aux_data aux;
656 case_nullify (&aux.prev_case);
659 aux.func_aux = func_aux;
662 ok = internal_procedure (ds, multipass_split_case_func,
663 multipass_split_end_func, &aux);
664 case_destroy (&aux.prev_case);
669 /* Case callback used by multipass_procedure_with_splits().
Appends C to the current group's casefile, first closing the
previous group and opening a new casefile when C's split values
differ from those recorded in prev_case. */
671 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
673 struct multipass_split_aux_data *aux = aux_;
676 /* Start a new series if needed. */
677 if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
679 /* Record split values. */
/* NOTE(review): prev_case is replaced by C before the previous
   group's casefile is output below, so multipass_split_output()
   hands the split function the NEW group's first case together
   with the OLD group's casefile -- confirm this is intended. */
680 case_destroy (&aux->prev_case);
681 case_clone (&aux->prev_case, c);
683 /* Pass any cases to split_func. */
684 if (aux->casefile != NULL)
685 ok = multipass_split_output (aux, ds);
687 /* Start a new casefile. */
689 ds->cf_factory->create_casefile (ds->cf_factory,
690 dict_get_next_value_idx (ds->dict));
693 return casefile_append (aux->casefile, c) && ok;
696 /* End-of-file callback used by multipass_procedure_with_splits().
Outputs the last group's casefile, if there is one.  Returns true
when there is no casefile, otherwise the result of
multipass_split_output(). */
698 multipass_split_end_func (void *aux_, const struct dataset *ds)
700 struct multipass_split_aux_data *aux = aux_;
701 return (aux->casefile == NULL || multipass_split_output (aux, ds));
/* Passes AUX's prev_case and accumulated casefile, plus the
   auxiliary data, to the split function, then destroys the casefile
   and clears the pointer so that a new group can be started.
   AUX must hold a casefile. */
705 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
709 assert (aux->casefile != NULL);
710 ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
711 casefile_destroy (aux->casefile);
712 aux->casefile = NULL;
717 /* Discards all the current state in preparation for a data-input
718 command like DATA LIST or GET.
The visible steps clear the dictionary, reset the default file
handle, free and detach the case source, and cancel all
transformations. */
720 discard_variables (struct dataset *ds)
722 dict_clear (ds->dict);
723 fh_set_default_handle (NULL);
/* The result of free_case_source() is not checked here. */
727 free_case_source (ds->proc_source);
728 proc_set_source (ds, NULL);
730 proc_cancel_all_transformations (ds);
733 /* Returns the current set of permanent transformations,
734 and clears the permanent transformations.
735 For use by INPUT PROGRAM.
Must not be called while temporary transformations are in
effect. */
737 proc_capture_transformations (struct dataset *ds)
739 struct trns_chain *chain;
741 assert (ds->temporary_trns_chain == NULL);
/* Keep the old chain in CHAIN and install a fresh, empty chain as
   both the permanent and the current chain. */
742 chain = ds->permanent_trns_chain;
743 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
747 /* Adds a transformation that processes a case with PROC and
748 frees itself with FREE to the current set of transformations.
749 The functions are passed AUX as auxiliary data.
No finalizer is registered; see
add_transformation_with_finalizer() for that. */
751 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
753 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
756 /* Adds a transformation that processes a case with PROC and
757 frees itself with FREE to the current set of transformations.
758 When parsing of the block of transformations is complete,
759 FINALIZE will be called.
760 The functions are passed AUX as auxiliary data.
The transformation is appended to DS's current chain. */
762 add_transformation_with_finalizer (struct dataset *ds,
763 trns_finalize_func *finalize,
764 trns_proc_func *proc,
765 trns_free_func *free, void *aux)
767 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
770 /* Returns the index of the next transformation.
771 This value can be returned by a transformation procedure
772 function to indicate a "jump" to that transformation.
The index is taken from DS's current chain with
trns_chain_next(). */
774 next_transformation (const struct dataset *ds)
776 return trns_chain_next (ds->cur_trns_chain);
779 /* Returns true if the next call to add_transformation() will add
780 a temporary transformation, false if it will add a permanent
transformation. */
783 proc_in_temporary_transformations (const struct dataset *ds)
/* A temporary chain exists only after
   proc_start_temporary_transformations() has created one and
   before it is made permanent or cancelled. */
785 return ds->temporary_trns_chain != NULL;
788 /* Marks the start of temporary transformations.
789 Further calls to add_transformation() will add temporary
transformations. */
792 proc_start_temporary_transformations (struct dataset *ds)
/* Does nothing if temporary transformations are already in
   effect. */
794 if (!proc_in_temporary_transformations (ds))
/* A pending case limit is added to the permanent chain, before
   the switch to the temporary chain below. */
796 add_case_limit_trns (ds);
/* permanent_dict keeps a copy of the dictionary as it is now;
   later changes are made to ds->dict. */
798 ds->permanent_dict = dict_clone (ds->dict);
800 trns_chain_finalize (ds->permanent_trns_chain);
801 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
805 /* Converts all the temporary transformations, if any, to
806 permanent transformations. Further transformations will be
permanent.
808 Returns true if anything changed, false otherwise. */
810 proc_make_temporary_transformations_permanent (struct dataset *ds)
812 if (proc_in_temporary_transformations (ds))
/* Finalize the temporary chain, then splice it onto the
   permanent chain. */
814 trns_chain_finalize (ds->temporary_trns_chain);
815 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
816 ds->temporary_trns_chain = NULL;
/* The dictionary copy made when the temporary transformations
   started is no longer needed; ds->dict stays in effect. */
818 dict_destroy (ds->permanent_dict);
819 ds->permanent_dict = NULL;
827 /* Cancels all temporary transformations, if any. Further
828 transformations will be permanent.
829 Returns true if anything changed, false otherwise. */
831 proc_cancel_temporary_transformations (struct dataset *ds)
833 if (proc_in_temporary_transformations (ds))
/* Reinstate the dictionary saved when the temporary
   transformations started; dataset_set_dict() destroys the one
   it replaces. */
835 dataset_set_dict (ds, ds->permanent_dict);
836 ds->permanent_dict = NULL;
838 trns_chain_destroy (ds->temporary_trns_chain);
839 ds->temporary_trns_chain = NULL;
847 /* Cancels all transformations, if any.
848 Returns true if successful, false on I/O error.
Both chains are destroyed and a new, empty permanent chain becomes
the current chain. */
850 proc_cancel_all_transformations (struct dataset *ds)
853 ok = trns_chain_destroy (ds->permanent_trns_chain);
/* temporary_trns_chain may be null at this point.
   NOTE(review): this relies on trns_chain_destroy() accepting a
   null chain -- confirm. */
854 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
855 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
856 ds->temporary_trns_chain = NULL;
860 /* Initializes procedure handling.
Allocates a zero-filled dataset, gives it a new dictionary, records
the casefile factory FACT and the callbacks RPS (source replaced)
and RDS (dictionary replaced), and sets up an empty transformation
chain through proc_cancel_all_transformations(). */
862 create_dataset (struct casefile_factory *fact,
863 replace_source_callback *rps,
864 replace_dictionary_callback *rds
867 struct dataset *ds = xzalloc (sizeof(*ds));
868 ds->dict = dict_create ();
869 ds->cf_factory = fact;
870 ds->replace_source = rps;
871 ds->replace_dict = rds;
872 proc_cancel_all_transformations (ds);
876 /* Finishes up procedure handling.
Discards the dataset's state, then destroys its dictionary and its
permanent transformation chain.
NOTE(review): no statement freeing DS itself is visible in this
copy of the file -- confirm against the full source. */
878 destroy_dataset (struct dataset *ds)
880 discard_variables (ds);
881 dict_destroy (ds->dict);
882 trns_chain_destroy (ds->permanent_trns_chain);
886 /* Sets SINK as the destination for procedure output from the
next procedure. */
889 proc_set_sink (struct dataset *ds, struct case_sink *sink)
/* A sink may only be set when none is pending. */
891 assert (ds->proc_sink == NULL);
892 ds->proc_sink = sink;
895 /* Sets SOURCE as the source for procedure input for the next
procedure. */
898 proc_set_source (struct dataset *ds, struct case_source *source)
900 ds->proc_source = source;
/* Tell the registered callback, if any, about the new source;
   SOURCE may be null when the source is being cleared. */
902 if ( ds->replace_source )
903 ds->replace_source (ds->proc_source);
906 /* Returns true if a source for the next procedure has been
907 configured, false otherwise; that is, whether DS's proc_source
is non-null. */
909 proc_has_source (const struct dataset *ds)
911 return ds->proc_source != NULL;
914 /* Returns the output from the previous procedure.
915 For use only immediately after executing a procedure.
916 The returned casefile is owned by the caller; it will not be
917 automatically used for the next procedure's input. */
919 proc_capture_output (struct dataset *ds)
921 struct casefile *casefile;
923 /* Try to make sure that this function is called immediately
924 after procedure() or a similar function. */
925 assert (ds->proc_source != NULL);
926 assert (case_source_is_class (ds->proc_source, &storage_source_class));
927 assert (trns_chain_is_empty (ds->permanent_trns_chain));
928 assert (!proc_in_temporary_transformations (ds));
/* Take the casefile out of the storage source, then detach the
   source from the dataset so that the next procedure does not
   read from it. */
930 casefile = storage_source_decapsulate (ds->proc_source);
931 proc_set_source (ds, NULL);
/* Callbacks of the case limit transformation, defined below. */
936 static trns_proc_func case_limit_trns_proc;
937 static trns_free_func case_limit_trns_free;
939 /* Adds a transformation that limits the number of cases that may
940 pass through, if DS->DICT has a case limit. */
942 add_case_limit_trns (struct dataset *ds)
944 size_t case_limit = dict_get_case_limit (ds->dict);
/* The remaining-case counter is allocated on the heap and handed
   to the transformation; case_limit_trns_free() releases it. */
947 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
948 *cases_remaining = case_limit;
949 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
/* The limit is now carried by the transformation, so it is
   cleared in the dictionary. */
951 dict_set_case_limit (ds->dict, 0);
955 /* Limits the maximum number of cases processed to
*CASES_REMAINING. */
958 case_limit_trns_proc (void *cases_remaining_,
959 struct ccase *c UNUSED, casenumber case_nr UNUSED)
961 size_t *cases_remaining = cases_remaining_;
/* While the counter is positive, use up one unit and let the case
   through; once it has reached zero every case is dropped. */
962 if (*cases_remaining > 0)
964 (*cases_remaining)--;
965 return TRNS_CONTINUE;
968 return TRNS_DROP_CASE;
971 /* Frees the data associated with a case limit transformation,
namely the counter allocated by add_case_limit_trns(). */
973 case_limit_trns_free (void *cases_remaining_)
975 size_t *cases_remaining = cases_remaining_;
976 free (cases_remaining);
/* Callback of the FILTER transformation, defined below. */
980 static trns_proc_func filter_trns_proc;
982 /* Adds a temporary transformation to filter data according to
983 the variable specified on FILTER, if any. */
985 add_filter_trns (struct dataset *ds)
987 struct variable *filter_var = dict_get_filter (ds->dict);
988 if (filter_var != NULL)
/* Starting temporary transformations first is what makes the
   filter temporary.  No free function is registered because the
   auxiliary data is the filter variable itself. */
990 proc_start_temporary_transformations (ds);
991 add_transformation (ds, filter_trns_proc, NULL, filter_var);
995 /* FILTER transformation.
Lets case C through only if its value for the filter variable is
nonzero and not missing (tested with MV_ANY); otherwise the case
is dropped.
NOTE(review): C is marked UNUSED in the parameter list but is read
by case_num() below. */
997 filter_trns_proc (void *filter_var_,
998 struct ccase *c UNUSED, casenumber case_nr UNUSED)
1001 struct variable *filter_var = filter_var_;
1002 double f = case_num (c, filter_var);
1003 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
1004 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Accessor for DS's dictionary.
   NOTE(review): the body is not visible in this copy of the file;
   presumably it returns ds->dict -- confirm. */
1009 dataset_dict (const struct dataset *ds)
1015 /* Set or replace dataset DS's dictionary with DICT.
1016 The old dictionary is destroyed.
Before that, the old dictionary's callbacks are copied to DICT and
the replace_dict callback, if one is registered, is called with
DICT. */
1018 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
1020 struct dictionary *old_dict = ds->dict;
1022 dict_copy_callbacks (dict, ds->dict);
1025 if ( ds->replace_dict )
1026 ds->replace_dict (dict);
1028 dict_destroy (old_dict);
/* Accessor for the number of cases DS lags.
   NOTE(review): the body is not visible in this copy of the file;
   presumably it returns ds->n_lag -- confirm. */
1032 dataset_n_lag (const struct dataset *ds)
/* Setter for the number of cases DS lags.
   NOTE(review): the body is not visible in this copy of the file;
   presumably it stores N_LAG in ds->n_lag -- confirm. */
1038 dataset_set_n_lag (struct dataset *ds, int n_lag)
/* Returns the casefile factory that DS was created with (see
   create_dataset()). */
1044 struct casefile_factory *
1045 dataset_get_casefile_factory (const struct dataset *ds)
1047 return ds->cf_factory;