1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/control-stack.h"
34 #include "data/dictionary.h"
35 #include "data/file-handle-def.h"
36 #include "data/session.h"
37 #include "data/transformations.h"
38 #include "data/variable.h"
39 #include "libpspp/deque.h"
40 #include "libpspp/misc.h"
41 #include "libpspp/str.h"
42 #include "libpspp/taint.h"
43 #include "libpspp/i18n.h"
45 #include "gl/minmax.h"
46 #include "gl/xalloc.h"
49 /* A dataset is usually part of a session. Within a session its name must
50 be unique. The name must either be a valid PSPP identifier or the empty
51 string. (It must be unique within the session even if it is the empty
52 string; that is, there may only be a single dataset within a session with
53 the empty string as its name.) */
54 struct session *session;
56 enum dataset_display display;
58 /* Cases are read from source,
59 their transformation variables are initialized,
60 pass through permanent_trns_chain (which transforms them into
61 the format described by permanent_dict),
63 pass through temporary_trns_chain (which transforms them into
64 the format described by dict),
65 and are finally passed to the procedure. */
66 struct casereader *source;
67 struct caseinit *caseinit;
68 struct trns_chain *permanent_trns_chain;
69 struct dictionary *permanent_dict;
70 struct casewriter *sink;
71 struct trns_chain *temporary_trns_chain;
72 struct dictionary *dict;
74 /* If true, cases are discarded instead of being written to the sink. */
78 /* The transformation chain that the next transformation will be added to. */
80 struct trns_chain *cur_trns_chain;
82 /* The case map used to compact a case, if necessary;
83 otherwise a null pointer. */
84 struct case_map *compactor;
86 /* Time at which proc was last invoked. */
87 time_t last_proc_invocation;
89 /* Cases just before ("lagging") the current one. */
90 int n_lag; /* Number of cases to lag. */
91 struct deque lag; /* Deque of lagged cases. */
92 struct ccase **lag_cases; /* Lagged cases managed by deque. */
97 PROC_COMMITTED, /* No procedure in progress. */
98 PROC_OPEN, /* proc_open called, casereader still open. */
99 PROC_CLOSED /* casereader from proc_open destroyed,
100 but proc_commit not yet called. */
103 casenumber cases_written; /* Cases output so far. */
104 bool ok; /* Error status. */
105 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
107 const struct dataset_callbacks *callbacks;
110 /* Uniquely distinguishes datasets. */
114 static void dataset_changed__ (struct dataset *);
115 static void dataset_transformations_changed__ (struct dataset *,
118 static void add_case_limit_trns (struct dataset *ds);
119 static void add_filter_trns (struct dataset *ds);
121 static void update_last_proc_invocation (struct dataset *ds);
124 dict_callback (struct dictionary *d UNUSED, void *ds_)
126 struct dataset *ds = ds_;
127 dataset_changed__ (ds);
131 dataset_create_finish__ (struct dataset *ds, struct session *session)
133 static unsigned int seqno;
135 dict_set_change_callback (ds->dict, dict_callback, ds);
136 proc_cancel_all_transformations (ds);
137 dataset_set_session (ds, session);
141 /* Creates a new dataset named NAME, adds it to SESSION, and returns it. If
142 SESSION already contains a dataset named NAME, it is deleted and replaced.
143 The dataset initially has an empty dictionary and no data source. */
145 dataset_create (struct session *session, const char *name)
149 ds = xzalloc (sizeof *ds);
150 ds->name = xstrdup (name);
151 ds->display = DATASET_FRONT;
152 ds->dict = dict_create (get_default_encoding ());
154 ds->caseinit = caseinit_create ();
156 dataset_create_finish__ (ds, session);
161 /* Creates and returns a new dataset that has the same data and dictionary as
162 OLD named NAME, adds it to the same session as OLD, and returns the new
163 dataset. If OLD's session already contains a dataset named NAME, it is deleted and replaced.
166 OLD must not have any active transformations or temporary state and must
167 not be in the middle of a procedure.
169 Callbacks are not cloned. */
171 dataset_clone (struct dataset *old, const char *name)
175 assert (old->proc_state == PROC_COMMITTED);
176 assert (trns_chain_is_empty (old->permanent_trns_chain));
177 assert (old->permanent_dict == NULL);
178 assert (old->sink == NULL);
179 assert (old->temporary_trns_chain == NULL);
181 new = xzalloc (sizeof *new);
182 new->name = xstrdup (name);
183 new->display = DATASET_FRONT;
184 new->source = casereader_clone (old->source);
185 new->dict = dict_clone (old->dict);
186 new->caseinit = caseinit_clone (old->caseinit);
187 new->last_proc_invocation = old->last_proc_invocation;
190 dataset_create_finish__ (new, old->session);
197 dataset_destroy (struct dataset *ds)
201 dataset_set_session (ds, NULL);
203 dict_destroy (ds->dict);
204 caseinit_destroy (ds->caseinit);
206 trns_chain_destroy (ds->permanent_trns_chain);
207 dataset_transformations_changed__ (ds, false);
213 /* Discards the active dataset's dictionary, data, and transformations. */
215 dataset_clear (struct dataset *ds)
217 assert (ds->proc_state == PROC_COMMITTED);
219 dict_clear (ds->dict);
220 fh_set_default_handle (NULL);
224 casereader_destroy (ds->source);
227 proc_cancel_all_transformations (ds);
231 dataset_name (const struct dataset *ds)
237 dataset_set_name (struct dataset *ds, const char *name)
239 struct session *session = ds->session;
244 active = session_active_dataset (session) == ds;
246 session_set_active_dataset (session, NULL);
247 dataset_set_session (ds, NULL);
251 ds->name = xstrdup (name);
255 dataset_set_session (ds, session);
257 session_set_active_dataset (session, ds);
262 dataset_session (const struct dataset *ds)
268 dataset_set_session (struct dataset *ds, struct session *session)
270 if (session != ds->session)
272 if (ds->session != NULL)
273 session_remove_dataset (ds->session, ds);
275 session_add_dataset (session, ds);
279 /* Returns the dictionary within DS. This is always nonnull, although it
280 might not contain any variables. */
282 dataset_dict (const struct dataset *ds)
287 /* Replaces DS's dictionary by DICT, discarding any source and transformations. */
290 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
292 assert (ds->proc_state == PROC_COMMITTED);
293 assert (ds->dict != dict);
297 dict_destroy (ds->dict);
299 dict_set_change_callback (ds->dict, dict_callback, ds);
302 /* Returns the casereader that will be read when a procedure is executed on
303 DS. This can be NULL if none has been set up yet. */
304 const struct casereader *
305 dataset_source (const struct dataset *ds)
310 /* Returns true if DS has a data source, false otherwise. */
312 dataset_has_source (const struct dataset *ds)
314 return dataset_source (ds) != NULL;
317 /* Replaces the active dataset's data by READER. READER's cases must have an
318 appropriate format for DS's dictionary. */
320 dataset_set_source (struct dataset *ds, struct casereader *reader)
322 casereader_destroy (ds->source);
325 caseinit_clear (ds->caseinit);
326 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
328 return reader == NULL || !casereader_error (reader);
331 /* Returns the data source from DS and removes it from DS. Returns a null
332 pointer if DS has no data source. */
334 dataset_steal_source (struct dataset *ds)
336 struct casereader *reader = ds->source;
342 /* Returns a number unique to DS. It can be used to distinguish one dataset
343 from any other within a given program run, even datasets that do not exist at the same time. */
346 dataset_seqno (const struct dataset *ds)
352 dataset_set_callbacks (struct dataset *ds,
353 const struct dataset_callbacks *callbacks,
356 ds->callbacks = callbacks;
357 ds->cb_data = cb_data;
361 dataset_get_display (const struct dataset *ds)
367 dataset_set_display (struct dataset *ds, enum dataset_display display)
369 ds->display = display;
372 /* Returns the last time the data was read. */
374 time_of_last_procedure (struct dataset *ds)
376 if (ds->last_proc_invocation == 0)
377 update_last_proc_invocation (ds);
378 return ds->last_proc_invocation;
381 /* Regular procedure. */
383 /* Executes any pending transformations, if necessary.
384 This is not identical to the EXECUTE command in that it won't
385 always read the source data. This can be important when the
386 source data is given inline within BEGIN DATA...END DATA. */
388 proc_execute (struct dataset *ds)
392 if ((ds->temporary_trns_chain == NULL
393 || trns_chain_is_empty (ds->temporary_trns_chain))
394 && trns_chain_is_empty (ds->permanent_trns_chain))
397 ds->discard_output = false;
398 dict_set_case_limit (ds->dict, 0);
399 dict_clear_vectors (ds->dict);
403 ok = casereader_destroy (proc_open (ds));
404 return proc_commit (ds) && ok;
407 static const struct casereader_class proc_casereader_class;
409 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
410 cases filtered out with FILTER BY will not be included in the casereader
411 (which is usually desirable). If FILTER is false, all cases will be
412 included regardless of FILTER BY settings.
414 proc_commit must be called when done. */
416 proc_open_filtering (struct dataset *ds, bool filter)
418 struct casereader *reader;
420 assert (ds->source != NULL);
421 assert (ds->proc_state == PROC_COMMITTED);
423 update_last_proc_invocation (ds);
425 caseinit_mark_for_init (ds->caseinit, ds->dict);
427 /* Finish up the collection of transformations. */
428 add_case_limit_trns (ds);
430 add_filter_trns (ds);
433 /* Make permanent_dict refer to the dictionary right before
434 data reaches the sink. */
435 if (ds->permanent_dict == NULL)
436 ds->permanent_dict = ds->dict;
439 if (!ds->discard_output)
441 struct dictionary *pd = ds->permanent_dict;
442 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
443 if (compacted_value_cnt < dict_get_next_value_idx (pd))
445 struct caseproto *compacted_proto;
446 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
447 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
448 ds->sink = autopaging_writer_create (compacted_proto);
449 caseproto_unref (compacted_proto);
453 ds->compactor = NULL;
454 ds->sink = autopaging_writer_create (dict_get_proto (pd));
459 ds->compactor = NULL;
463 /* Allocate memory for lagged cases. */
464 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
466 ds->proc_state = PROC_OPEN;
467 ds->cases_written = 0;
470 /* FIXME: use taint in dataset in place of `ok'? */
471 /* FIXME: for trivial cases we can just return a clone of ds->source? */
474 /* Create casereader and insert a shim on top. The shim allows us to
475 arbitrarily extend the casereader's lifetime, by slurping the cases into
476 the shim's buffer in proc_commit(). That is especially useful when output
477 table_items are generated directly from the procedure casereader (e.g. by
478 the LIST procedure) when we are using an output driver that keeps a
479 reference to the output items passed to it (e.g. the GUI output driver in PSPPIRE). */
481 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
483 &proc_casereader_class, ds);
484 ds->shim = casereader_shim_insert (reader);
488 /* Opens dataset DS for reading cases with proc_read.
489 proc_commit must be called when done. */
491 proc_open (struct dataset *ds)
493 return proc_open_filtering (ds, true);
496 /* Returns true if a procedure is in progress, that is, if
497 proc_open has been called but proc_commit has not. */
499 proc_is_open (const struct dataset *ds)
501 return ds->proc_state != PROC_COMMITTED;
504 /* "read" function for procedure casereader. */
505 static struct ccase *
506 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
508 struct dataset *ds = ds_;
509 enum trns_result retval = TRNS_DROP_CASE;
512 assert (ds->proc_state == PROC_OPEN);
513 for (; ; case_unref (c))
517 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
518 if (retval == TRNS_ERROR)
523 /* Read a case from source. */
524 c = casereader_read (ds->source);
527 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
528 caseinit_init_vars (ds->caseinit, c);
530 /* Execute permanent transformations. */
531 case_nr = ds->cases_written + 1;
532 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
534 caseinit_update_left_vars (ds->caseinit, c);
535 if (retval != TRNS_CONTINUE)
538 /* Write case to collection of lagged cases. */
541 while (deque_count (&ds->lag) >= ds->n_lag)
542 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
543 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
546 /* Write case to replacement dataset. */
548 if (ds->sink != NULL)
549 casewriter_write (ds->sink,
550 case_map_execute (ds->compactor, case_ref (c)));
552 /* Execute temporary transformations. */
553 if (ds->temporary_trns_chain != NULL)
555 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
556 &c, ds->cases_written);
557 if (retval != TRNS_CONTINUE)
565 /* "destroy" function for procedure casereader. */
567 proc_casereader_destroy (struct casereader *reader, void *ds_)
569 struct dataset *ds = ds_;
572 /* We are always the subreader for a casereader_buffer, so if we're being
573 destroyed then it's because the casereader_buffer has read all the cases
574 that it ever will. */
577 /* Make sure transformations happen for every input case, in
578 case they have side effects, and ensure that the replacement
579 active dataset gets all the cases it should. */
580 while ((c = casereader_read (reader)) != NULL)
583 ds->proc_state = PROC_CLOSED;
584 ds->ok = casereader_destroy (ds->source) && ds->ok;
586 dataset_set_source (ds, NULL);
589 /* Must return false if the source casereader, a transformation,
590 or the sink casewriter signaled an error. (If a temporary
591 transformation signals an error, then the return value is
592 false, but the replacement active dataset may still be untainted.) */
595 proc_commit (struct dataset *ds)
597 if (ds->shim != NULL)
598 casereader_shim_slurp (ds->shim);
600 assert (ds->proc_state == PROC_CLOSED);
601 ds->proc_state = PROC_COMMITTED;
603 dataset_changed__ (ds);
605 /* Free memory for lagged cases. */
606 while (!deque_is_empty (&ds->lag))
607 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
608 free (ds->lag_cases);
610 /* Dictionary from before TEMPORARY becomes permanent. */
611 proc_cancel_temporary_transformations (ds);
613 if (!ds->discard_output)
615 /* Finish compacting. */
616 if (ds->compactor != NULL)
618 case_map_destroy (ds->compactor);
619 ds->compactor = NULL;
621 dict_delete_scratch_vars (ds->dict);
622 dict_compact_values (ds->dict);
625 /* Old data sink becomes new data source. */
626 if (ds->sink != NULL)
627 ds->source = casewriter_make_reader (ds->sink);
632 ds->discard_output = false;
636 caseinit_clear (ds->caseinit);
637 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
639 dict_clear_vectors (ds->dict);
640 ds->permanent_dict = NULL;
641 return proc_cancel_all_transformations (ds) && ds->ok;
644 /* Casereader class for procedure execution. */
645 static const struct casereader_class proc_casereader_class =
647 proc_casereader_read,
648 proc_casereader_destroy,
653 /* Updates last_proc_invocation. */
655 update_last_proc_invocation (struct dataset *ds)
657 ds->last_proc_invocation = time (NULL);
660 /* Returns a pointer to the lagged case from N_BEFORE cases before the
661 current one, or NULL if there haven't been that many cases yet. */
663 lagged_case (const struct dataset *ds, int n_before)
665 assert (n_before >= 1);
666 assert (n_before <= ds->n_lag);
668 if (n_before <= deque_count (&ds->lag))
669 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
674 /* Returns the current set of permanent transformations,
675 and clears the permanent transformations.
676 For use by INPUT PROGRAM. */
678 proc_capture_transformations (struct dataset *ds)
680 struct trns_chain *chain;
682 assert (ds->temporary_trns_chain == NULL);
683 chain = ds->permanent_trns_chain;
684 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
685 dataset_transformations_changed__ (ds, false);
690 /* Adds a transformation that processes a case with PROC and
691 frees itself with FREE to the current set of transformations.
692 The functions are passed AUX as auxiliary data. */
694 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
696 trns_chain_append (ds->cur_trns_chain, proc, free, aux);
697 dataset_transformations_changed__ (ds, true);
700 /* Returns the index of the next transformation.
701 This value can be returned by a transformation procedure
702 function to indicate a "jump" to that transformation. */
704 next_transformation (const struct dataset *ds)
706 return trns_chain_next (ds->cur_trns_chain);
709 /* Returns true if the next call to add_transformation() will add
710 a temporary transformation, false if it will add a permanent transformation. */
713 proc_in_temporary_transformations (const struct dataset *ds)
715 return ds->temporary_trns_chain != NULL;
718 /* Marks the start of temporary transformations.
719 Further calls to add_transformation() will add temporary transformations. */
722 proc_start_temporary_transformations (struct dataset *ds)
724 if (!proc_in_temporary_transformations (ds))
726 add_case_limit_trns (ds);
728 ds->permanent_dict = dict_clone (ds->dict);
731 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
732 dataset_transformations_changed__ (ds, true);
736 /* Converts all the temporary transformations, if any, to permanent
737 transformations. Further transformations will be permanent.
739 The FILTER command is implemented as a temporary transformation, so a
740 procedure that uses this function should usually use proc_open_filtering()
741 with FILTER false, instead of plain proc_open().
743 Returns true if anything changed, false otherwise. */
745 proc_make_temporary_transformations_permanent (struct dataset *ds)
747 if (proc_in_temporary_transformations (ds))
750 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
751 ds->temporary_trns_chain = NULL;
753 ds->cur_trns_chain = ds->permanent_trns_chain;
755 dict_destroy (ds->permanent_dict);
756 ds->permanent_dict = NULL;
764 /* Cancels all temporary transformations, if any. Further
765 transformations will be permanent.
766 Returns true if anything changed, false otherwise. */
768 proc_cancel_temporary_transformations (struct dataset *ds)
770 if (proc_in_temporary_transformations (ds))
772 dict_destroy (ds->dict);
773 ds->dict = ds->permanent_dict;
774 ds->permanent_dict = NULL;
777 trns_chain_destroy (ds->temporary_trns_chain);
778 ds->temporary_trns_chain = NULL;
779 dataset_transformations_changed__ (
780 ds, !trns_chain_is_empty (ds->permanent_trns_chain));
787 /* Cancels all transformations, if any.
788 Returns true if successful, false on I/O error. */
790 proc_cancel_all_transformations (struct dataset *ds)
793 assert (ds->proc_state == PROC_COMMITTED);
795 ok = trns_chain_destroy (ds->permanent_trns_chain);
796 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
797 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
798 ds->temporary_trns_chain = NULL;
799 dataset_transformations_changed__ (ds, false);
805 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
807 struct variable *var = var_;
809 *cc = case_unshare (*cc);
810 case_data_rw (*cc, var)->f = case_num;
812 return TRNS_CONTINUE;
815 /* Add a variable which we can sort by to get back the original order. */
817 add_permanent_ordering_transformation (struct dataset *ds)
819 struct variable *temp_var;
821 temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
822 if (proc_in_temporary_transformations (ds))
824 struct variable *perm_var;
826 perm_var = dict_clone_var_in_place_assert (ds->permanent_dict, temp_var);
827 trns_chain_append (ds->permanent_trns_chain, NULL, store_case_num,
829 trns_chain_finalize (ds->permanent_trns_chain);
832 add_transformation (ds, store_case_num, NULL, temp_var);
837 /* Causes output from the next procedure to be discarded, instead
838 of being preserved for use as input for the next procedure. */
840 proc_discard_output (struct dataset *ds)
842 ds->discard_output = true;
846 /* Checks whether DS has a corrupted active dataset. If so,
847 discards it and returns false. If not, returns true without discarding it. */
850 dataset_end_of_command (struct dataset *ds)
852 if (ds->source != NULL)
854 if (casereader_error (ds->source))
861 const struct taint *taint = casereader_get_taint (ds->source);
862 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
863 assert (!taint_has_tainted_successor (taint));
869 static trns_proc_func case_limit_trns_proc;
870 static trns_free_func case_limit_trns_free;
872 /* Adds a transformation that limits the number of cases that may
873 pass through, if DS->DICT has a case limit. */
875 add_case_limit_trns (struct dataset *ds)
877 casenumber case_limit = dict_get_case_limit (ds->dict);
880 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
881 *cases_remaining = case_limit;
882 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
884 dict_set_case_limit (ds->dict, 0);
888 /* Limits the maximum number of cases processed to *CASES_REMAINING. */
891 case_limit_trns_proc (void *cases_remaining_,
892 struct ccase **c UNUSED, casenumber case_nr UNUSED)
894 size_t *cases_remaining = cases_remaining_;
895 if (*cases_remaining > 0)
897 (*cases_remaining)--;
898 return TRNS_CONTINUE;
901 return TRNS_DROP_CASE;
904 /* Frees the data associated with a case limit transformation. */
906 case_limit_trns_free (void *cases_remaining_)
908 size_t *cases_remaining = cases_remaining_;
909 free (cases_remaining);
913 static trns_proc_func filter_trns_proc;
915 /* Adds a temporary transformation to filter data according to
916 the variable specified on FILTER, if any. */
918 add_filter_trns (struct dataset *ds)
920 struct variable *filter_var = dict_get_filter (ds->dict);
921 if (filter_var != NULL)
923 proc_start_temporary_transformations (ds);
924 add_transformation (ds, filter_trns_proc, NULL, filter_var);
928 /* FILTER transformation. */
930 filter_trns_proc (void *filter_var_,
931 struct ccase **c UNUSED, casenumber case_nr UNUSED)
934 struct variable *filter_var = filter_var_;
935 double f = case_num (*c, filter_var);
936 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
937 ? TRNS_CONTINUE : TRNS_DROP_CASE);
942 dataset_need_lag (struct dataset *ds, int n_before)
944 ds->n_lag = MAX (ds->n_lag, n_before);
948 dataset_changed__ (struct dataset *ds)
950 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
951 ds->callbacks->changed (ds->cb_data);
955 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
957 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
958 ds->callbacks->transformations_changed (non_empty, ds->cb_data);
961 /* Private interface for use by session code. */
964 dataset_set_session__ (struct dataset *ds, struct session *session)
966 ds->session = session;