1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
48 /* A dataset is usually part of a session. Within a session its name must
49 be unique. The name must either be a valid PSPP identifier or the empty
50 string. (It must be unique within the session even if it is the empty
51 string; that is, there may only be a single dataset within a session with
52 the empty string as its name.) */
53 struct session *session;
55 enum dataset_display display;
57 /* Cases are read from source,
58 their transformation variables are initialized,
59 pass through permanent_trns_chain (which transforms them into
60 the format described by permanent_dict), are written to sink,
62 pass through temporary_trns_chain (which transforms them into
63 the format described by dict),
64 and are finally passed to the procedure. */
65 struct casereader *source;
66 struct caseinit *caseinit;
67 struct trns_chain permanent_trns_chain;
68 struct dictionary *permanent_dict;
69 struct casewriter *sink;
70 struct trns_chain temporary_trns_chain;
72 struct dictionary *dict;
74 /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
75 struct trns_chain *stack;
77 size_t allocated_stack;
79 /* If true, cases are discarded instead of being written to the sink. */
83 /* The case map used to compact a case, if necessary;
84 otherwise a null pointer. */
85 struct case_map *compactor;
87 /* Time at which proc was last invoked. */
88 time_t last_proc_invocation;
90 /* Cases just before ("lagging") the current one. */
91 int n_lag; /* Number of cases to lag. */
92 struct deque lag; /* Deque of lagged cases. */
93 struct ccase **lag_cases; /* Lagged cases managed by deque. */
98 PROC_COMMITTED, /* No procedure in progress. */
99 PROC_OPEN, /* proc_open called, casereader still open. */
100 PROC_CLOSED /* casereader from proc_open destroyed,
101 but proc_commit not yet called. */
104 casenumber cases_written; /* Cases output so far. */
105 bool ok; /* Error status. */
106 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
108 const struct dataset_callbacks *callbacks;
111 /* Uniquely distinguishes datasets. */
/* Forward declarations of file-local helpers that are defined further down
   in this file. */
115 static void dataset_changed__ (struct dataset *);
116 static void dataset_transformations_changed__ (struct dataset *,
119 static void add_case_limit_trns (struct dataset *ds);
120 static void add_filter_trns (struct dataset *ds);
122 static void update_last_proc_invocation (struct dataset *ds);
/* Dictionary change callback, registered with dict_set_change_callback().
   DS_ is the dataset that owns the dictionary; the notification is forwarded
   to dataset_changed__().  The dictionary argument itself is not used. */
125 dict_callback (struct dictionary *d UNUSED, void *ds_)
127 struct dataset *ds = ds_;
128 dataset_changed__ (ds);
/* Common tail of dataset_create() and dataset_clone(): registers
   dict_callback() on DS's dictionary, calls
   proc_cancel_all_transformations(), and puts DS into SESSION. */
132 dataset_create_finish__ (struct dataset *ds, struct session *session)
/* Counter that persists across calls.  NOTE(review): presumably the source of
   each dataset's sequence number; the statement that uses it is not visible
   in this listing. */
134 static unsigned int seqno;
136 dict_set_change_callback (ds->dict, dict_callback, ds);
137 proc_cancel_all_transformations (ds);
138 dataset_set_session (ds, session);
142 /* Creates a new dataset named NAME, adds it to SESSION, and returns it. If
143 SESSION already contains a dataset named NAME, it is deleted and replaced.
144 The dataset initially has an empty dictionary and no data source. */
146 dataset_create (struct session *session, const char *name)
148 struct dataset *ds = XMALLOC (struct dataset);
/* Members that are not named in the compound literal are zero-initialized.
   NAME is copied, so the caller keeps ownership of its string. */
149 *ds = (struct dataset) {
150 .name = xstrdup (name),
151 .display = DATASET_FRONT,
152 .dict = dict_create (get_default_encoding ()),
153 .caseinit = caseinit_create (),
/* Shared setup: dictionary callback and session membership. */
155 dataset_create_finish__ (ds, session);
160 /* Creates and returns a new dataset that has the same data and dictionary as
161 OLD named NAME, adds it to the same session as OLD, and returns the new
162 dataset. If SESSION already contains a dataset named NAME, it is deleted
and replaced.

165 OLD must not have any active transformations or temporary state and must
166 not be in the middle of a procedure.
168 Callbacks are not cloned. */
170 dataset_clone (struct dataset *old, const char *name)
/* Enforce the preconditions stated above: no open procedure, no permanent or
   temporary transformations, no separate permanent dictionary, no sink. */
174 assert (old->proc_state == PROC_COMMITTED);
175 assert (!old->permanent_trns_chain.n);
176 assert (old->permanent_dict == NULL);
177 assert (old->sink == NULL);
178 assert (!old->temporary);
179 assert (!old->temporary_trns_chain.n);
/* Only the members assigned below are carried over from OLD; the source,
   dictionary and case initializer are cloned rather than shared. */
181 new = xzalloc (sizeof *new);
182 new->name = xstrdup (name);
183 new->display = DATASET_FRONT;
184 new->source = casereader_clone (old->source);
185 new->dict = dict_clone (old->dict);
186 new->caseinit = caseinit_clone (old->caseinit);
187 new->last_proc_invocation = old->last_proc_invocation;
/* The clone joins OLD's session. */
190 dataset_create_finish__ (new, old->session);
/* Destroys DS: removes it from its session, releases both dictionaries, the
   case initializer, the permanent transformation chain and every chain on
   the stack, and reports to the callbacks that no transformations remain.
   NOTE(review): several statements of this function are not visible in this
   listing. */
197 dataset_destroy (struct dataset *ds)
201 dataset_set_session (ds, NULL);
203 dict_unref (ds->dict);
204 dict_unref (ds->permanent_dict);
205 caseinit_destroy (ds->caseinit);
206 trns_chain_uninit (&ds->permanent_trns_chain);
207 for (size_t i = 0; i < ds->n_stack; i++)
208 trns_chain_uninit (&ds->stack[i]);
210 dataset_transformations_changed__ (ds, false);
216 /* Discards the active dataset's dictionary, data, and transformations. */
218 dataset_clear (struct dataset *ds)
/* Must not be called while a procedure is in progress. */
220 assert (ds->proc_state == PROC_COMMITTED);
222 dict_clear (ds->dict);
223 fh_set_default_handle (NULL);
227 casereader_destroy (ds->source);
/* Drops the permanent and temporary transformations and any stacked
   chains. */
230 proc_cancel_all_transformations (ds);
/* Accessor for DS's name.  NOTE(review): the body is not visible in this
   listing; presumably it returns ds->name. */
234 dataset_name (const struct dataset *ds)
/* Renames DS to NAME (NAME is copied).  DS is taken out of its session while
   the name changes and put back afterward.  ACTIVE records whether DS was
   the session's active dataset beforehand so that it can be made active
   again at the end.
   NOTE(review): the conditions guarding these steps are not visible in this
   listing. */
240 dataset_set_name (struct dataset *ds, const char *name)
242 struct session *session = ds->session;
247 active = session_active_dataset (session) == ds;
249 session_set_active_dataset (session, NULL);
250 dataset_set_session (ds, NULL);
254 ds->name = xstrdup (name);
258 dataset_set_session (ds, session);
260 session_set_active_dataset (session, ds);
/* Accessor for the session that DS belongs to.  NOTE(review): the body is not
   visible in this listing; presumably it returns ds->session. */
265 dataset_session (const struct dataset *ds)
/* Moves DS into SESSION, first removing it from the session it currently
   belongs to, if any.  Does nothing if DS is already in SESSION.
   dataset_destroy() passes a null SESSION to detach DS.
   NOTE(review): the guard that presumably skips session_add_dataset() for a
   null SESSION is not visible in this listing. */
271 dataset_set_session (struct dataset *ds, struct session *session)
273 if (session != ds->session)
275 if (ds->session != NULL)
276 session_remove_dataset (ds->session, ds);
278 session_add_dataset (session, ds);
282 /* Returns the dictionary within DS. This is always nonnull, although it
283 might not contain any variables. */
/* NOTE(review): the body is not visible in this listing. */
285 dataset_dict (const struct dataset *ds)
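290 /* Replaces DS's dictionary by DICT, discarding any source and
    transformations.  (TODO confirm: the end of this comment was cut off in
    this listing and has been completed by analogy with dataset_clear().) */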
293 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
/* The dictionary may only be swapped between procedures, and DICT must be a
   different object from the one DS already has. */
295 assert (ds->proc_state == PROC_COMMITTED);
296 assert (ds->dict != dict);
/* Drop DS's reference to the old dictionary. */
300 dict_unref (ds->dict);
/* Route change notifications from the dictionary to this dataset.
   NOTE(review): the assignment of DICT to ds->dict presumably sits just
   before this call; it is not visible in this listing. */
302 dict_set_change_callback (ds->dict, dict_callback, ds);
305 /* Returns the casereader that will be read when a procedure is executed on
306 DS. This can be NULL if none has been set up yet. */
/* The reader is returned through a pointer-to-const; dataset_steal_source()
   is the way to take the reader away from DS. */
307 const struct casereader *
308 dataset_source (const struct dataset *ds)
313 /* Returns true if DS has a data source (that is, if dataset_source() is
    nonnull), false otherwise. */
315 dataset_has_source (const struct dataset *ds)
317 return dataset_source (ds) != NULL;
320 /* Replaces the active dataset's data by READER. READER's cases must have an
321 appropriate format for DS's dictionary.

    Any previous source is destroyed.  Returns true if READER is null or has
    not reported an error, false otherwise. */
323 dataset_set_source (struct dataset *ds, struct casereader *reader)
325 casereader_destroy (ds->source);
/* Start case initialization afresh for the new data, based on DS's current
   dictionary. */
328 caseinit_clear (ds->caseinit);
329 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
331 return reader == NULL || !casereader_error (reader);
334 /* Returns the data source from DS and removes it from DS. Returns a null
335 pointer if DS has no data source. */
337 dataset_steal_source (struct dataset *ds)
/* Remember the reader before DS lets go of it.  NOTE(review): the statements
   that clear ds->source and return READER are not visible in this listing. */
339 struct casereader *reader = ds->source;
345 /* Returns a number unique to DS. It can be used to distinguish one dataset
346 from any other within a given program run, even datasets that do not exist
    at the same time. */
/* NOTE(review): the body is not visible in this listing. */
349 dataset_seqno (const struct dataset *ds)
/* Installs CALLBACKS as DS's callback table and CB_DATA as the auxiliary
   pointer that is handed to each callback when it is invoked.  Only the
   pointers are stored.  CALLBACKS may be null, and so may individual members
   of the table: both are checked before a callback is invoked. */
355 dataset_set_callbacks (struct dataset *ds,
356 const struct dataset_callbacks *callbacks,
359 ds->callbacks = callbacks;
360 ds->cb_data = cb_data;
/* Accessor for DS's display state.  NOTE(review): the body is not visible in
   this listing; presumably it returns ds->display. */
364 dataset_get_display (const struct dataset *ds)
/* Sets DS's display state to DISPLAY. */
370 dataset_set_display (struct dataset *ds, enum dataset_display display)
372 ds->display = display;
375 /* Returns the last time the data was read.

    If no time has been recorded yet (the stored value is still 0), the
    current time is recorded first and that is what is returned. */
377 time_of_last_procedure (struct dataset *ds)
379 if (ds->last_proc_invocation == 0)
380 update_last_proc_invocation (ds);
381 return ds->last_proc_invocation;
384 /* Regular procedure. */
386 /* Executes any pending transformations, if necessary.
387 This is not identical to the EXECUTE command in that it won't
388 always read the source data. This can be important when the
389 source data is given inline within BEGIN DATA...END FILE. */
391 proc_execute (struct dataset *ds)
/* Nothing is pending when there are no permanent transformations and either
   TEMPORARY is not in effect or its chain is empty.  In that case only the
   per-procedure settings are reset.  NOTE(review): the early return that
   presumably ends this branch is not visible in this listing. */
395 if ((!ds->temporary || !ds->temporary_trns_chain.n)
396 && !ds->permanent_trns_chain.n)
399 ds->discard_output = false;
400 dict_set_case_limit (ds->dict, 0);
401 dict_clear_vectors (ds->dict);
/* Otherwise open a procedure and destroy its casereader right away:
   destroying it reads every remaining case (see proc_casereader_destroy()),
   which runs the transformations.  Both the reader's and the commit's status
   must be good. */
405 ok = casereader_destroy (proc_open (ds));
406 return proc_commit (ds) && ok;
/* Forward declaration; the definition follows the callbacks that it refers
   to, further down in this file. */
409 static const struct casereader_class proc_casereader_class;
411 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
412 cases filtered out with FILTER BY will not be included in the casereader
413 (which is usually desirable). If FILTER is false, all cases will be
414 included regardless of FILTER BY settings.
416 proc_commit must be called when done. */
418 proc_open_filtering (struct dataset *ds, bool filter)
420 struct casereader *reader;
/* DS must have a data source, and no other procedure may be in progress. */
422 assert (ds->source != NULL);
423 assert (ds->proc_state == PROC_COMMITTED);
425 update_last_proc_invocation (ds);
427 caseinit_mark_for_init (ds->caseinit, ds->dict);
429 /* Finish up the collection of transformations. */
430 add_case_limit_trns (ds);
/* NOTE(review): presumably guarded by `if (filter)'; the guard is not visible
   in this listing. */
432 add_filter_trns (ds);
434 /* Make permanent_dict refer to the dictionary right before
435 data reaches the sink. */
436 if (ds->permanent_dict == NULL)
437 ds->permanent_dict = ds->dict;
/* Unless output is to be discarded, create the sink that collects the
   replacement dataset.  If the value count obtained with the DC_SCRATCH class
   mask is smaller than the dictionary's next value index, a compactor is set
   up and the sink uses the compacted case prototype; otherwise cases are
   written with the dictionary's own prototype and no compactor. */
440 if (!ds->discard_output)
442 struct dictionary *pd = ds->permanent_dict;
443 size_t compacted_n_values = dict_count_values (pd, 1u << DC_SCRATCH);
444 if (compacted_n_values < dict_get_next_value_idx (pd))
446 struct caseproto *compacted_proto;
447 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
448 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
449 ds->sink = autopaging_writer_create (compacted_proto);
450 caseproto_unref (compacted_proto);
454 ds->compactor = NULL;
455 ds->sink = autopaging_writer_create (dict_get_proto (pd));
460 ds->compactor = NULL;
464 /* Allocate memory for lagged cases. */
465 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
/* From here on the procedure counts as open, with no cases written yet. */
467 ds->proc_state = PROC_OPEN;
468 ds->cases_written = 0;
471 /* FIXME: use taint in dataset in place of `ok'? */
472 /* FIXME: for trivial cases we can just return a clone of
475 /* Create casereader and insert a shim on top. The shim allows us to
476 arbitrarily extend the casereader's lifetime, by slurping the cases into
477 the shim's buffer in proc_commit(). That is especially useful when output
478 table_items are generated directly from the procedure casereader (e.g. by
479 the LIST procedure) when we are using an output driver that keeps a
480 reference to the output items passed to it (e.g. the GUI output driver in
482 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
484 &proc_casereader_class, ds);
485 ds->shim = casereader_shim_insert (reader);
489 /* Opens dataset DS for reading cases with proc_read.
490 proc_commit must be called when done.

    Equivalent to proc_open_filtering() with FILTER true, so cases excluded
    by FILTER BY are not included. */
492 proc_open (struct dataset *ds)
494 return proc_open_filtering (ds, true);
497 /* Returns true if a procedure is in progress, that is, if
498 proc_open has been called but proc_commit has not. */
500 proc_is_open (const struct dataset *ds)
/* Both PROC_OPEN and PROC_CLOSED count as "in progress". */
502 return ds->proc_state != PROC_COMMITTED;
505 /* "read" function for procedure casereader.

    DS_ is the dataset.  Each iteration takes one case from DS's source,
    resizes it to DS's dictionary and initializes its variables, runs the
    permanent transformations, records the case for LAG, writes it to the
    sink, and finally runs the temporary transformations.  A case that a
    transformation drops is released and the loop moves on to the next one.
    NOTE(review): the statements that return the case, detect end of input,
    and leave the loop early are not visible in this listing. */
506 static struct ccase *
507 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
509 struct dataset *ds = ds_;
510 enum trns_result retval = TRNS_DROP_CASE;
/* Reading is only valid while the procedure casereader is open. */
513 assert (ds->proc_state == PROC_OPEN);
514 for (; ; case_unref (c))
/* RETVAL holds the outcome of the previous iteration (or its initial value):
   only a dropped case or an error can bring control back to the top. */
516 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
517 if (retval == TRNS_ERROR)
522 /* Read a case from source. */
523 c = casereader_read (ds->source);
526 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
527 caseinit_init_vars (ds->caseinit, c);
529 /* Execute permanent transformations. */
/* The permanent chain is given the 1-based number that this case will have
   if it is written. */
530 casenumber case_nr = ds->cases_written + 1;
531 retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
532 caseinit_update_left_vars (ds->caseinit, c);
533 if (retval != TRNS_CONTINUE)
536 /* Write case to collection of lagged cases. */
/* The oldest cases are released from the back to make room before the new
   case is pushed at the front. */
539 while (deque_count (&ds->lag) >= ds->n_lag)
540 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
541 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
544 /* Write case to replacement dataset. */
/* An extra reference is passed to case_map_execute(), so C itself stays
   usable for the temporary transformations below. */
546 if (ds->sink != NULL)
547 casewriter_write (ds->sink,
548 case_map_execute (ds->compactor, case_ref (c)));
550 /* Execute temporary transformations. */
551 if (ds->temporary_trns_chain.n)
553 retval = trns_chain_execute (&ds->temporary_trns_chain,
554 ds->cases_written, &c);
555 if (retval != TRNS_CONTINUE)
563 /* "destroy" function for procedure casereader.

    Reads READER to its end, then marks the procedure as closed, destroys
    DS's source (folding its status into DS->ok), and leaves DS without a
    source. */
565 proc_casereader_destroy (struct casereader *reader, void *ds_)
567 struct dataset *ds = ds_;
570 /* We are always the subreader for a casereader_buffer, so if we're being
571 destroyed then it's because the casereader_buffer has read all the cases
572 that it ever will. */
575 /* Make sure transformations happen for every input case, in
576 case they have side effects, and ensure that the replacement
577 active dataset gets all the cases it should. */
578 while ((c = casereader_read (reader)) != NULL)
/* proc_commit() requires this state. */
581 ds->proc_state = PROC_CLOSED;
582 ds->ok = casereader_destroy (ds->source) && ds->ok;
584 dataset_set_source (ds, NULL);
587 /* Must return false if the source casereader, a transformation,
588 or the sink casewriter signaled an error. (If a temporary
589 transformation signals an error, then the return value is
590 false, but the replacement active dataset may still be
    untouched.) */
593 proc_commit (struct dataset *ds)
/* Have the shim buffer the cases, which is what lets the casereader handed
   out by proc_open() outlive this commit (see the comment on the shim in
   proc_open_filtering()). */
595 if (ds->shim != NULL)
596 casereader_shim_slurp (ds->shim);
/* The procedure casereader must have been destroyed by now:
   proc_casereader_destroy() is what sets PROC_CLOSED. */
598 assert (ds->proc_state == PROC_CLOSED);
599 ds->proc_state = PROC_COMMITTED;
601 dataset_changed__ (ds);
603 /* Free memory for lagged cases. */
604 while (!deque_is_empty (&ds->lag))
605 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
606 free (ds->lag_cases);
608 /* Dictionary from before TEMPORARY becomes permanent. */
609 proc_cancel_temporary_transformations (ds);
611 if (!ds->discard_output)
613 /* Finish compacting. */
614 if (ds->compactor != NULL)
616 case_map_destroy (ds->compactor);
617 ds->compactor = NULL;
619 dict_delete_scratch_vars (ds->dict);
620 dict_compact_values (ds->dict);
623 /* Old data sink becomes new data source. */
624 if (ds->sink != NULL)
625 ds->source = casewriter_make_reader (ds->sink);
/* Discarding output applies to a single procedure only. */
630 ds->discard_output = false;
/* Start case initialization afresh, based on the resulting dictionary. */
634 caseinit_clear (ds->caseinit);
635 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
637 dict_clear_vectors (ds->dict);
638 ds->permanent_dict = NULL;
/* The result combines the status of cancelling the transformations with the
   status accumulated in DS->ok. */
639 return proc_cancel_all_transformations (ds) && ds->ok;
642 /* Casereader class for procedure execution.

    Supplies proc_casereader_read() and proc_casereader_destroy() as the
    first two members.  NOTE(review): any remaining members of the
    initializer are not visible in this listing. */
643 static const struct casereader_class proc_casereader_class =
645 proc_casereader_read,
646 proc_casereader_destroy,
651 /* Updates last_proc_invocation. */
653 update_last_proc_invocation (struct dataset *ds)
655 ds->last_proc_invocation = time (NULL);
658 /* Returns a pointer to the lagged case from N_BEFORE cases before the
659 current one, or NULL if there haven't been that many cases yet. */
661 lagged_case (const struct dataset *ds, int n_before)
/* N_BEFORE must be at least 1 and within the number of lagged cases that DS
   keeps (see dataset_need_lag()). */
663 assert (n_before >= 1);
664 assert (n_before <= ds->n_lag);
/* New cases are pushed at the front of the deque, so offset N_BEFORE - 1 from
   the front is the case N_BEFORE cases back. */
666 if (n_before <= deque_count (&ds->lag))
667 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
672 /* Adds a transformation with class CLASS and auxiliary data AUX to the
    current set of transformations: the innermost chain on DS's stack if one
    has been pushed, otherwise the temporary chain if TEMPORARY is in effect,
    otherwise the permanent chain.  The callbacks are then told that
    transformations are present. */
674 add_transformation (struct dataset *ds,
675 const struct trns_class *class, void *aux)
677 struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
678 : ds->temporary ? &ds->temporary_trns_chain
679 : &ds->permanent_trns_chain);
680 struct transformation t = { .class = class, .aux = aux };
681 trns_chain_append (chain, &t);
682 dataset_transformations_changed__ (ds, true);
685 /* Returns true if the next call to add_transformation() will add
686 a temporary transformation, false if it will add a permanent
    transformation. */
689 proc_in_temporary_transformations (const struct dataset *ds)
/* The flag is set by proc_start_temporary_transformations(). */
691 return ds->temporary;
694 /* Marks the start of temporary transformations.
695 Further calls to add_transformation() will add temporary
    transformations. */
698 proc_start_temporary_transformations (struct dataset *ds)
/* Does nothing if temporary transformations have already been started. */
700 if (!proc_in_temporary_transformations (ds))
/* Any pending case limit is turned into a transformation while the temporary
   flag is still clear. */
702 add_case_limit_trns (ds);
/* Snapshot the dictionary: permanent_dict keeps the state from before
   TEMPORARY while ds->dict goes on to be modified. */
704 ds->permanent_dict = dict_clone (ds->dict);
706 ds->temporary = true;
707 dataset_transformations_changed__ (ds, true);
711 /* Converts all the temporary transformations, if any, to permanent
712 transformations. Further transformations will be permanent.
714 The FILTER command is implemented as a temporary transformation, so a
715 procedure that uses this function should usually use proc_open_filtering()
716 with FILTER false, instead of plain proc_open().
718 Returns true if anything changed, false otherwise. */
720 proc_make_temporary_transformations_permanent (struct dataset *ds)
722 if (proc_in_temporary_transformations (ds))
/* Move the temporary chain's transformations into the permanent chain. */
724 trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
726 ds->temporary = false;
/* The snapshot of the dictionary from before TEMPORARY is no longer needed:
   the current dictionary is now the permanent one. */
728 dict_unref (ds->permanent_dict);
729 ds->permanent_dict = NULL;
737 /* Cancels all temporary transformations, if any. Further
738 transformations will be permanent.
739 Returns true if anything changed, false otherwise. */
741 proc_cancel_temporary_transformations (struct dataset *ds)
743 if (proc_in_temporary_transformations (ds))
/* Go back to the dictionary snapshot taken when TEMPORARY began; the
   dictionary that was modified since then is released. */
745 dict_unref (ds->dict);
746 ds->dict = ds->permanent_dict;
747 ds->permanent_dict = NULL;
749 trns_chain_clear (&ds->temporary_trns_chain);
/* Tell the callbacks whether any (permanent) transformations remain. */
751 dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
758 /* Cancels all transformations, if any.
759 Returns true if successful, false on I/O error. */
761 proc_cancel_all_transformations (struct dataset *ds)
/* Not allowed while a procedure is in progress. */
764 assert (ds->proc_state == PROC_COMMITTED);
/* Every chain is cleared even if an earlier one reported failure; OK
   accumulates the overall status. */
765 ok = trns_chain_clear (&ds->permanent_trns_chain);
766 ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
767 ds->temporary = false;
768 for (size_t i = 0; i < ds->n_stack; i++)
769 ok = trns_chain_uninit (&ds->stack[i]) && ok;
771 dataset_transformations_changed__ (ds, false);
/* Pushes a newly initialized transformation chain onto DS's stack, growing
   the stack array if it is full.  While the chain is on top of the stack,
   add_transformation() appends to it instead of to the permanent or
   temporary chain. */
777 proc_push_transformations (struct dataset *ds)
779 if (ds->n_stack >= ds->allocated_stack)
780 ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
782 trns_chain_init (&ds->stack[ds->n_stack++]);
/* Pops the topmost transformation chain off DS's stack and stores it, by
   value, in *CHAIN.  The stack must not be empty. */
786 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
788 assert (ds->n_stack > 0);
789 *chain = ds->stack[--ds->n_stack];
792 static enum trns_result
793 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
795 struct variable *var = var_;
797 *cc = case_unshare (*cc);
798 *case_num_rw (*cc, var) = case_num;
800 return TRNS_CONTINUE;
803 /* Add a variable which we can sort by to get back the original order.

    The variable is named "$ORDER" and is filled in by store_case_num(). */
805 add_permanent_ordering_transformation (struct dataset *ds)
807 struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
/* While TEMPORARY is in effect the permanent dictionary is a separate object,
   so the variable is cloned into it as well.  NOTE(review): the other arm of
   this conditional is not visible in this listing. */
808 struct variable *order_var
809 = (proc_in_temporary_transformations (ds)
810 ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
813 static const struct trns_class trns_class = {
815 .execute = store_case_num
/* Appended directly to the permanent chain rather than through
   add_transformation(), which would pick the temporary chain or a stacked
   chain when one is current. */
817 const struct transformation t = { .class = &trns_class, .aux = order_var };
818 trns_chain_append (&ds->permanent_trns_chain, &t);
823 /* Causes output from the next procedure to be discarded, instead
824 of being preserved for use as input for the next procedure.

    The flag is consulted by proc_open_filtering() and proc_commit();
    proc_commit() clears it again. */
826 proc_discard_output (struct dataset *ds)
828 ds->discard_output = true;
832 /* Checks whether DS has a corrupted active dataset. If so,
833 discards it and returns false. If not, returns true without
    discarding anything. */
836 dataset_end_of_command (struct dataset *ds)
/* There is nothing to check when DS has no source. */
838 if (ds->source != NULL)
/* NOTE(review): the statements that handle a source in error are not visible
   in this listing. */
840 if (casereader_error (ds->source))
/* The taint is obtained through a pointer-to-const, hence the CONST_CAST for
   the reset.  Afterward no successor may remain tainted. */
847 const struct taint *taint = casereader_get_taint (ds->source);
848 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
849 assert (!taint_has_tainted_successor (taint));
855 /* Limits the maximum number of cases processed to
857 static enum trns_result
858 case_limit_trns_proc (void *cases_remaining_,
859 struct ccase **c UNUSED, casenumber case_nr UNUSED)
861 size_t *cases_remaining = cases_remaining_;
862 if (*cases_remaining > 0)
864 (*cases_remaining)--;
865 return TRNS_CONTINUE;
868 return TRNS_DROP_CASE;
871 /* Frees the data associated with a case limit transformation.

    CASES_REMAINING_ is the counter allocated by add_case_limit_trns().
    NOTE(review): the end of this function is not visible in this listing. */
873 case_limit_trns_free (void *cases_remaining_)
/* The pointer is only handed to free(), so the pointee type named here has no
   effect; note that add_case_limit_trns() allocates it as a casenumber. */
875 size_t *cases_remaining = cases_remaining_;
876 free (cases_remaining);
880 /* Adds a transformation that limits the number of cases that may
881 pass through, if DS->DICT has a case limit. */
883 add_case_limit_trns (struct dataset *ds)
885 casenumber case_limit = dict_get_case_limit (ds->dict);
/* NOTE(review): the test for "has a case limit" is not visible in this
   listing.  The counter is heap-allocated because it is handed to the
   transformation as its auxiliary data; case_limit_trns_free() releases
   it. */
888 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
889 *cases_remaining = case_limit;
891 static const struct trns_class trns_class = {
892 .name = "case limit",
893 .execute = case_limit_trns_proc,
894 .destroy = case_limit_trns_free,
896 add_transformation (ds, &trns_class, cases_remaining);
/* The limit now lives in the transformation, so clear it in the
   dictionary. */
898 dict_set_case_limit (ds->dict, 0);
903 /* FILTER transformation. */
904 static enum trns_result
905 filter_trns_proc (void *filter_var_,
906 struct ccase **c, casenumber case_nr UNUSED)
909 struct variable *filter_var = filter_var_;
910 double f = case_num (*c, filter_var);
911 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
912 ? TRNS_CONTINUE : TRNS_DROP_CASE);
915 /* Adds a temporary transformation to filter data according to
916 the variable specified on FILTER, if any. */
918 add_filter_trns (struct dataset *ds)
920 struct variable *filter_var = dict_get_filter (ds->dict);
921 if (filter_var != NULL)
/* Filtering must not become permanent, so make sure temporary mode is on
   before the transformation is added. */
923 proc_start_temporary_transformations (ds);
925 static const struct trns_class trns_class = {
927 .execute = filter_trns_proc,
929 add_transformation (ds, &trns_class, filter_var);
/* Makes sure that DS keeps at least N_BEFORE lagged cases.  The requirement
   only ever grows: a smaller N_BEFORE leaves the current value alone. */
934 dataset_need_lag (struct dataset *ds, int n_before)
936 ds->n_lag = MAX (ds->n_lag, n_before);
940 dataset_changed__ (struct dataset *ds)
942 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
943 ds->callbacks->changed (ds->cb_data);
947 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
949 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
950 ds->callbacks->transformations_changed (non_empty, ds->cb_data);
953 /* Private interface for use by session code.

    Only records SESSION in DS.  Unlike dataset_set_session(), it does not
    add DS to or remove DS from any session. */
956 dataset_set_session__ (struct dataset *ds, struct session *session)
958 ds->session = session;