1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
48 /* A dataset is usually part of a session. Within a session its name must
49 be unique. The name must either be a valid PSPP identifier or the empty
50 string. (It must be unique within the session even if it is the empty
51 string; that is, there may only be a single dataset within a session with
52 the empty string as its name.) */
53 struct session *session;
55 enum dataset_display display;
57 /* Cases are read from source,
58 their transformation variables are initialized,
59 pass through permanent_trns_chain (which transforms them into
60 the format described by permanent_dict),
62 pass through temporary_trns_chain (which transforms them into
63 the format described by dict),
64 and are finally passed to the procedure. */
65 struct casereader *source;
66 struct caseinit *caseinit;
67 struct trns_chain *permanent_trns_chain;
68 struct dictionary *permanent_dict;
69 struct casewriter *sink;
70 struct trns_chain *temporary_trns_chain;
71 struct dictionary *dict;
73 /* If true, cases are discarded instead of being written to
77 /* The transformation chain that the next transformation will be
79 struct trns_chain *cur_trns_chain;
81 /* The case map used to compact a case, if necessary;
82 otherwise a null pointer. */
83 struct case_map *compactor;
85 /* Time at which a procedure was last opened on this dataset, or 0 if
   none has been opened yet (see time_of_last_procedure). */
86 time_t last_proc_invocation;
88 /* Cases just before ("lagging") the current one. */
89 int n_lag; /* Number of cases to lag. */
90 struct deque lag; /* Deque of lagged cases. */
91 struct ccase **lag_cases; /* Lagged cases managed by deque. */
96 PROC_COMMITTED, /* No procedure in progress. */
97 PROC_OPEN, /* proc_open called, casereader still open. */
98 PROC_CLOSED /* casereader from proc_open destroyed,
99 but proc_commit not yet called. */
102 casenumber cases_written; /* Cases output so far. */
103 bool ok; /* Error status. */
104 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
106 const struct dataset_callbacks *callbacks;
109 /* Uniquely distinguishes datasets. */
/* Forward declarations of static helpers that are defined later in this
   file. */
113 static void dataset_changed__ (struct dataset *);
114 static void dataset_transformations_changed__ (struct dataset *,
117 static void add_case_limit_trns (struct dataset *ds);
118 static void add_filter_trns (struct dataset *ds);
120 static void update_last_proc_invocation (struct dataset *ds);
/* Change callback that dataset_create_finish__ and dataset_set_dict install
   on a dataset's dictionary.  DS_ is the owning dataset.  Any change to the
   dictionary is reported through dataset_changed__. */
123 dict_callback (struct dictionary *d UNUSED, void *ds_)
125 struct dataset *ds = ds_;
126 dataset_changed__ (ds);
/* Common final step of dataset_create and dataset_clone.  It hooks DS's
   dictionary up to dict_callback, gives DS an empty permanent
   transformation chain (via proc_cancel_all_transformations), and adds DS
   to SESSION. */
130 dataset_create_finish__ (struct dataset *ds, struct session *session)
/* Counter shared by all datasets.  NOTE(review): presumably this is used to
   assign DS the sequence number returned by dataset_seqno; confirm. */
132 static unsigned int seqno;
134 dict_set_change_callback (ds->dict, dict_callback, ds);
135 proc_cancel_all_transformations (ds);
136 dataset_set_session (ds, session);
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it. If
141 SESSION already contains a dataset named NAME, it is deleted and replaced.
142 The dataset initially has an empty dictionary and no data source. */
144 dataset_create (struct session *session, const char *name)
/* XZALLOC zero-fills the new dataset, so members not set below (source,
   sink, chains, callbacks, ...) start out null or zero.  This includes
   proc_state, whose zero value is PROC_COMMITTED. */
146 struct dataset *ds = XZALLOC (struct dataset);
147 ds->name = xstrdup (name);
148 ds->display = DATASET_FRONT;
149 ds->dict = dict_create (get_default_encoding ());
151 ds->caseinit = caseinit_create ();
153 dataset_create_finish__ (ds, session);
158 /* Creates and returns a new dataset that has the same data and dictionary as
159 OLD named NAME, adds it to the same session as OLD, and returns the new
160 dataset. If OLD's session already contains a dataset named NAME, it is deleted
163 OLD must not have any active transformations or temporary state and must
164 not be in the middle of a procedure.
166 Callbacks are not cloned. */
168 dataset_clone (struct dataset *old, const char *name)
/* Check the preconditions documented above. */
172 assert (old->proc_state == PROC_COMMITTED);
173 assert (trns_chain_is_empty (old->permanent_trns_chain));
174 assert (old->permanent_dict == NULL);
175 assert (old->sink == NULL);
176 assert (old->temporary_trns_chain == NULL);
/* Start from a zeroed dataset.  Copy over only the display setting, the
   data source, the dictionary, the case initializer, and the
   last-procedure time.  dataset_create_finish__ then gives NEW an empty
   transformation chain and adds it to OLD's session. */
178 new = xzalloc (sizeof *new);
179 new->name = xstrdup (name);
180 new->display = DATASET_FRONT;
181 new->source = casereader_clone (old->source);
182 new->dict = dict_clone (old->dict);
183 new->caseinit = caseinit_clone (old->caseinit);
184 new->last_proc_invocation = old->last_proc_invocation;
187 dataset_create_finish__ (new, old->session);
/* Destroys DS.  It first detaches DS from its session.  It then releases
   DS's dictionaries, case initializer, and permanent transformation chain,
   and tells the transformations_changed callback that DS no longer has
   transformations. */
194 dataset_destroy (struct dataset *ds)
198 dataset_set_session (ds, NULL);
200 dict_unref (ds->dict);
/* permanent_dict is normally null outside TEMPORARY.  NOTE(review): this
   assumes dict_unref accepts a null pointer; confirm. */
201 dict_unref (ds->permanent_dict);
202 caseinit_destroy (ds->caseinit);
203 trns_chain_destroy (ds->permanent_trns_chain);
204 dataset_transformations_changed__ (ds, false);
210 /* Discards the dictionary, data, and transformations of dataset DS. */
212 dataset_clear (struct dataset *ds)
/* Not allowed while a procedure is in progress. */
214 assert (ds->proc_state == PROC_COMMITTED);
216 dict_clear (ds->dict);
/* Also reset the default file handle to none. */
217 fh_set_default_handle (NULL);
221 casereader_destroy (ds->source);
224 proc_cancel_all_transformations (ds);
/* Returns the name of DS. */
228 dataset_name (const struct dataset *ds)
/* Renames DS to a copy of NAME.  A dataset's name must be unique within its
   session, so DS is taken out of its session first and deactivated if it
   was the active dataset.  It is then renamed, added back, and reactivated
   if it had been active. */
234 dataset_set_name (struct dataset *ds, const char *name)
236 struct session *session = ds->session;
241 active = session_active_dataset (session) == ds;
243 session_set_active_dataset (session, NULL);
244 dataset_set_session (ds, NULL);
248 ds->name = xstrdup (name);
252 dataset_set_session (ds, session);
254 session_set_active_dataset (session, ds);
/* Returns the session that DS belongs to. */
259 dataset_session (const struct dataset *ds)
/* Moves DS into SESSION, first removing it from the session it currently
   belongs to, if any.  Does nothing if DS already belongs to SESSION.
   SESSION may be null to only detach DS; dataset_destroy and
   dataset_set_name do this. */
265 dataset_set_session (struct dataset *ds, struct session *session)
267 if (session != ds->session)
269 if (ds->session != NULL)
270 session_remove_dataset (ds->session, ds);
272 session_add_dataset (session, ds);
276 /* Returns the dictionary within DS. This is always nonnull, although it
277 might not contain any variables.  DS keeps its own reference to the
   dictionary, which dataset_destroy releases. */
279 dataset_dict (const struct dataset *ds)
/* Preconditions (asserted): no procedure may be in progress, and DICT must
   differ from DS's current dictionary.  DS drops its reference to the old
   dictionary and installs dict_callback on the new one. */
284 /* Replaces DS's dictionary by DICT, discarding any source and
287 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
289 assert (ds->proc_state == PROC_COMMITTED);
290 assert (ds->dict != dict);
294 dict_unref (ds->dict);
296 dict_set_change_callback (ds->dict, dict_callback, ds);
299 /* Returns the casereader that will be read when a procedure is executed on
300 DS. This can be NULL if none has been set up yet.  DS keeps ownership;
   use dataset_steal_source to take the reader away from DS. */
301 const struct casereader *
302 dataset_source (const struct dataset *ds)
307 /* Returns true if DS has a data source, false otherwise. */
309 dataset_has_source (const struct dataset *ds)
311 return dataset_source (ds) != NULL;
314 /* Replaces the active dataset's data by READER. READER's cases must have an
315 appropriate format for DS's dictionary. */
317 dataset_set_source (struct dataset *ds, struct casereader *reader)
/* The previous source, if any, is destroyed; READER takes its place. */
319 casereader_destroy (ds->source);
/* Reset the case initializer for the new source's cases.  NOTE(review):
   the "preinited" semantics are inferred from the callee's name; confirm. */
322 caseinit_clear (ds->caseinit);
323 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
/* Fails only if READER already carries an error.  A null READER, which
   just discards the data, always succeeds. */
325 return reader == NULL || !casereader_error (reader);
328 /* Returns the data source from DS and removes it from DS. Returns a null
329 pointer if DS has no data source. */
331 dataset_steal_source (struct dataset *ds)
333 struct casereader *reader = ds->source;
/* The caller takes ownership of READER; DS is left without a source. */
/* Simple accessors.  The callbacks stored by dataset_set_callbacks are
   invoked through dataset_changed__ and dataset_transformations_changed__,
   with CB_DATA as their auxiliary argument, and dataset_clone does not copy
   them.  New and cloned datasets start with display DATASET_FRONT. */
339 /* Returns a number unique to DS. It can be used to distinguish one dataset
340 from any other within a given program run, even datasets that do not exist
343 dataset_seqno (const struct dataset *ds)
349 dataset_set_callbacks (struct dataset *ds,
350 const struct dataset_callbacks *callbacks,
353 ds->callbacks = callbacks;
354 ds->cb_data = cb_data;
358 dataset_get_display (const struct dataset *ds)
364 dataset_set_display (struct dataset *ds, enum dataset_display display)
366 ds->display = display;
369 /* Returns the time at which a procedure was last opened on DS.  If none
   has been opened yet, the current time is recorded first and returned. */
371 time_of_last_procedure (struct dataset *ds)
373 if (ds->last_proc_invocation == 0)
374 update_last_proc_invocation (ds);
375 return ds->last_proc_invocation;
378 /* Regular procedure. */
380 /* Executes any pending transformations, if necessary.
381 This is not identical to the EXECUTE command in that it won't
382 always read the source data. This can be important when the
383 source data is given inline within BEGIN DATA...END FILE. */
385 proc_execute (struct dataset *ds)
/* If no temporary or permanent transformations are pending, there is
   nothing to execute, so the source is not read.  Only the per-procedure
   settings are reset: discard_output, the case limit, and the vectors. */
389 if ((ds->temporary_trns_chain == NULL
390 || trns_chain_is_empty (ds->temporary_trns_chain))
391 && trns_chain_is_empty (ds->permanent_trns_chain))
394 ds->discard_output = false;
395 dict_set_case_limit (ds->dict, 0);
396 dict_clear_vectors (ds->dict);
/* Otherwise, open a procedure and destroy its reader immediately.  The
   destroy callback drains every case, so all transformations run.  Then
   commit the procedure. */
400 ok = casereader_destroy (proc_open (ds));
401 return proc_commit (ds) && ok;
/* Forward declaration.  The class, with its read and destroy callbacks, is
   defined after proc_commit. */
404 static const struct casereader_class proc_casereader_class;
406 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
407 cases filtered out with FILTER BY will not be included in the casereader
408 (which is usually desirable). If FILTER is false, all cases will be
409 included regardless of FILTER BY settings.
411 proc_commit must be called when done. */
413 proc_open_filtering (struct dataset *ds, bool filter)
415 struct casereader *reader;
417 assert (ds->source != NULL);
418 assert (ds->proc_state == PROC_COMMITTED);
/* Record the start time reported by time_of_last_procedure. */
420 update_last_proc_invocation (ds);
422 caseinit_mark_for_init (ds->caseinit, ds->dict);
424 /* Finish up the collection of transformations. */
425 add_case_limit_trns (ds);
427 add_filter_trns (ds);
429 /* Make permanent_dict refer to the dictionary right before
430 data reaches the sink. */
431 if (ds->permanent_dict == NULL)
432 ds->permanent_dict = ds->dict;
/* Without TEMPORARY, permanent_dict only aliases dict; no extra reference is
   taken.  proc_commit resets it to NULL.

   Unless output is being discarded, cases leaving the permanent
   transformations are saved into a new sink, and proc_commit turns that
   sink into DS's next data source.  When scratch variables (DC_SCRATCH)
   occupy values in permanent_dict, a compactor strips them from each case
   before it is written.  In that case the sink uses the compacted case
   prototype. */
435 if (!ds->discard_output)
437 struct dictionary *pd = ds->permanent_dict;
438 size_t compacted_n_values = dict_count_values (pd, 1u << DC_SCRATCH);
439 if (compacted_n_values < dict_get_next_value_idx (pd))
441 struct caseproto *compacted_proto;
442 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
443 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
444 ds->sink = autopaging_writer_create (compacted_proto);
445 caseproto_unref (compacted_proto);
449 ds->compactor = NULL;
450 ds->sink = autopaging_writer_create (dict_get_proto (pd));
455 ds->compactor = NULL;
459 /* Allocate memory for lagged cases. */
460 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
/* From here on, proc_casereader_read may be called (it asserts PROC_OPEN). */
462 ds->proc_state = PROC_OPEN;
463 ds->cases_written = 0;
466 /* FIXME: use taint in dataset in place of `ok'? */
467 /* FIXME: for trivial cases we can just return a clone of
470 /* Create casereader and insert a shim on top. The shim allows us to
471 arbitrarily extend the casereader's lifetime, by slurping the cases into
472 the shim's buffer in proc_commit(). That is especially useful when output
473 table_items are generated directly from the procedure casereader (e.g. by
474 the LIST procedure) when we are using an output driver that keeps a
475 reference to the output items passed to it (e.g. the GUI output driver in
477 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
479 &proc_casereader_class, ds);
480 ds->shim = casereader_shim_insert (reader);
484 /* Opens dataset DS for reading cases with proc_read.
485 proc_commit must be called when done.
   Equivalent to proc_open_filtering (DS, true), so cases excluded by
   FILTER BY are omitted. */
487 proc_open (struct dataset *ds)
489 return proc_open_filtering (ds, true);
492 /* Returns true if a procedure is in progress, that is, if
493 proc_open has been called but proc_commit has not. */
495 proc_is_open (const struct dataset *ds)
/* Both PROC_OPEN and PROC_CLOSED count as "in progress". */
497 return ds->proc_state != PROC_COMMITTED;
500 /* "read" function for procedure casereader.  It reads cases from DS's
   source and runs them through the permanent transformations.  Each case
   that passes them goes to the lag buffer and the sink.  A case reaches
   the procedure only if it also passes any temporary transformations. */
501 static struct ccase *
502 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
504 struct dataset *ds = ds_;
505 enum trns_result retval = TRNS_DROP_CASE;
508 assert (ds->proc_state == PROC_OPEN);
/* Each iteration handles one source case.  A case that is not handed back
   is released by the loop's case_unref.  RETVAL starts as TRNS_DROP_CASE,
   so the first iteration satisfies the assertion below. */
509 for (; ; case_unref (c))
513 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
514 if (retval == TRNS_ERROR)
519 /* Read a case from source. */
520 c = casereader_read (ds->source);
/* Unshare C and size it for DS's current dictionary.  Then let the case
   initializer fill in the variables it manages. */
523 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
524 caseinit_init_vars (ds->caseinit, c);
526 /* Execute permanent transformations.  They see 1-based case numbers. */
527 case_nr = ds->cases_written + 1;
528 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
530 caseinit_update_left_vars (ds->caseinit, c);
531 if (retval != TRNS_CONTINUE)
534 /* Write case to collection of lagged cases.  At most n_lag cases are
   kept, with the newest at the front, where lagged_case looks for it. */
537 while (deque_count (&ds->lag) >= ds->n_lag)
538 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
539 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
542 /* Write case to replacement dataset.  This happens before the temporary
   transformations run, so TEMPORARY changes never reach the sink.
   NOTE(review): case_map_execute is also called when compactor is NULL, so
   it must treat a null map as "no change"; confirm. */
544 if (ds->sink != NULL)
545 casewriter_write (ds->sink,
546 case_map_execute (ds->compactor, case_ref (c)));
548 /* Execute temporary transformations.  NOTE(review): these receive
   ds->cases_written, while the permanent chain got cases_written + 1.  The
   two agree only if cases_written is incremented after the sink write;
   confirm. */
549 if (ds->temporary_trns_chain != NULL)
551 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
552 &c, ds->cases_written);
553 if (retval != TRNS_CONTINUE)
561 /* "destroy" function for procedure casereader. */
563 proc_casereader_destroy (struct casereader *reader, void *ds_)
565 struct dataset *ds = ds_;
568 /* We are always the subreader for the casereader_shim that
   proc_open_filtering inserts on top of us, so if we're being
569 destroyed then it's because the shim has read all the cases
570 that it ever will. */
573 /* Make sure transformations happen for every input case, in
574 case they have side effects, and ensure that the replacement
575 active dataset gets all the cases it should. */
576 while ((c = casereader_read (reader)) != NULL)
/* The procedure's casereader is finished, so proc_commit may now be
   called.  Fold the source's error status into DS->ok. */
579 ds->proc_state = PROC_CLOSED;
580 ds->ok = casereader_destroy (ds->source) && ds->ok;
/* NOTE(review): dataset_set_source destroys DS's current source, so this
   relies on ds->source having been reset after the destroy above. */
582 dataset_set_source (ds, NULL);
585 /* Must return false if the source casereader, a transformation,
586 or the sink casewriter signaled an error. (If a temporary
587 transformation signals an error, then the return value is
588 false, but the replacement active dataset may still be
591 proc_commit (struct dataset *ds)
593 if (ds->shim != NULL)
594 casereader_shim_slurp (ds->shim);
596 assert (ds->proc_state == PROC_CLOSED);
597 ds->proc_state = PROC_COMMITTED;
599 dataset_changed__ (ds);
601 /* Free memory for lagged cases. */
602 while (!deque_is_empty (&ds->lag))
603 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
604 free (ds->lag_cases);
606 /* Dictionary from before TEMPORARY becomes permanent. */
607 proc_cancel_temporary_transformations (ds);
609 if (!ds->discard_output)
611 /* Finish compacting. */
612 if (ds->compactor != NULL)
614 case_map_destroy (ds->compactor);
615 ds->compactor = NULL;
/* Make the dictionary match the compacted cases in the sink by removing
   scratch variables and compacting its value layout. */
617 dict_delete_scratch_vars (ds->dict);
618 dict_compact_values (ds->dict);
621 /* Old data sink becomes new data source. */
622 if (ds->sink != NULL)
623 ds->source = casewriter_make_reader (ds->sink);
628 ds->discard_output = false;
/* Same case-initializer reset that dataset_set_source performs for a new
   source. */
632 caseinit_clear (ds->caseinit);
633 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
635 dict_clear_vectors (ds->dict);
/* Any pre-TEMPORARY snapshot was consumed by
   proc_cancel_temporary_transformations above.  Otherwise permanent_dict
   only aliased dict (see proc_open_filtering), so it is just cleared, not
   unreffed. */
636 ds->permanent_dict = NULL;
637 return proc_cancel_all_transformations (ds) && ds->ok;
640 /* Casereader class for procedure execution.  proc_casereader_read runs
   each source case through the transformations.  proc_casereader_destroy
   drains any remaining cases and closes the source. */
641 static const struct casereader_class proc_casereader_class =
643 proc_casereader_read,
644 proc_casereader_destroy,
649 /* Sets DS's last_proc_invocation to the current time. */
651 update_last_proc_invocation (struct dataset *ds)
653 ds->last_proc_invocation = time (NULL);
656 /* Returns a pointer to the lagged case from N_BEFORE cases before the
657 current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be between 1 and the lag depth requested through
   dataset_need_lag. */
659 lagged_case (const struct dataset *ds, int n_before)
661 assert (n_before >= 1);
662 assert (n_before <= ds->n_lag);
/* proc_casereader_read pushes cases at the front, so the most recent one
   is at index 0. */
664 if (n_before <= deque_count (&ds->lag))
665 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
670 /* Returns the current set of permanent transformations,
671 and clears the permanent transformations.
672 For use by INPUT PROGRAM.  The caller becomes responsible for the
   returned chain, since DS switches to a new, empty one. */
674 proc_capture_transformations (struct dataset *ds)
676 struct trns_chain *chain;
/* Not allowed while TEMPORARY is in effect. */
678 assert (ds->temporary_trns_chain == NULL);
679 chain = ds->permanent_trns_chain;
680 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
681 dataset_transformations_changed__ (ds, false);
686 /* Adds a transformation that processes a case with PROC and
687 frees itself with FREE to the current set of transformations.
688 The functions are passed AUX as auxiliary data. */
690 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
/* cur_trns_chain is the temporary chain while TEMPORARY is in effect, and
   the permanent chain otherwise. */
692 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
693 dataset_transformations_changed__ (ds, true);
/* TEMPORARY support.  DS is in temporary mode exactly when
   temporary_trns_chain is non-null.  Entering temporary mode (when not
   already in it) takes three steps.  First, any pending case-limit
   transformation is added to the still-current permanent chain.  Then a
   copy of the dictionary is saved in permanent_dict.  Finally, further
   transformations are directed to a new temporary chain. */
696 /* Returns true if the next call to add_transformation() will add
697 a temporary transformation, false if it will add a permanent
700 proc_in_temporary_transformations (const struct dataset *ds)
702 return ds->temporary_trns_chain != NULL;
705 /* Marks the start of temporary transformations.
706 Further calls to add_transformation() will add temporary
709 proc_start_temporary_transformations (struct dataset *ds)
711 if (!proc_in_temporary_transformations (ds))
713 add_case_limit_trns (ds);
715 ds->permanent_dict = dict_clone (ds->dict);
717 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
718 dataset_transformations_changed__ (ds, true);
722 /* Converts all the temporary transformations, if any, to permanent
723 transformations. Further transformations will be permanent.
725 The FILTER command is implemented as a temporary transformation, so a
726 procedure that uses this function should usually use proc_open_filtering()
727 with FILTER false, instead of plain proc_open().
729 Returns true if anything changed, false otherwise. */
731 proc_make_temporary_transformations_permanent (struct dataset *ds)
733 if (proc_in_temporary_transformations (ds))
/* Move the temporary transformations onto the end of the permanent chain.
   NOTE(review): this presumes trns_chain_splice takes over (and disposes
   of) the temporary chain, since it is only forgotten here; confirm. */
735 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
736 ds->temporary_trns_chain = NULL;
738 ds->cur_trns_chain = ds->permanent_trns_chain;
/* The dictionary as modified since TEMPORARY becomes permanent, so the
   snapshot taken by proc_start_temporary_transformations is dropped. */
740 dict_unref (ds->permanent_dict);
741 ds->permanent_dict = NULL;
749 /* Cancels all temporary transformations, if any. Further
750 transformations will be permanent.
751 Returns true if anything changed, false otherwise. */
753 proc_cancel_temporary_transformations (struct dataset *ds)
755 if (proc_in_temporary_transformations (ds))
/* Go back to the dictionary saved by proc_start_temporary_transformations,
   discarding the one modified since TEMPORARY.  NOTE(review): dict_callback
   is not re-installed on the restored dictionary here (contrast
   dataset_set_dict); confirm that dict_clone preserves the change
   callback. */
757 dict_unref (ds->dict);
758 ds->dict = ds->permanent_dict;
759 ds->permanent_dict = NULL;
/* NOTE(review): unlike proc_make_temporary_transformations_permanent, the
   code here does not point cur_trns_chain back at the permanent chain.
   Adding a transformation before proc_cancel_all_transformations runs would
   target the destroyed chain; confirm. */
761 trns_chain_destroy (ds->temporary_trns_chain);
762 ds->temporary_trns_chain = NULL;
763 dataset_transformations_changed__ (
764 ds, !trns_chain_is_empty (ds->permanent_trns_chain));
771 /* Cancels all transformations, if any.
772 Returns true if successful, false on I/O error. */
774 proc_cancel_all_transformations (struct dataset *ds)
777 assert (ds->proc_state == PROC_COMMITTED);
/* Destroy both chains, remembering whether either reported failure, then
   start over with a single empty permanent chain.  NOTE(review): dict and
   permanent_dict are left untouched.  If this is called while TEMPORARY is
   in effect (e.g. via dataset_clear), the pre-TEMPORARY snapshot stays in
   permanent_dict; confirm that this is intended. */
778 ok = trns_chain_destroy (ds->permanent_trns_chain);
779 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
780 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
781 ds->temporary_trns_chain = NULL;
782 dataset_transformations_changed__ (ds, false);
/* Transformation that stores CASE_NUM in the numeric variable VAR_.
   add_permanent_ordering_transformation uses it for $ORDER.  The case is
   unshared first because it is about to be modified.  It always lets the
   case continue. */
787 static enum trns_result
788 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
790 struct variable *var = var_;
792 *cc = case_unshare (*cc);
793 *case_num_rw (*cc, var) = case_num;
795 return TRNS_CONTINUE;
798 /* Add a variable which we can sort by to get back the original order. */
800 add_permanent_ordering_transformation (struct dataset *ds)
802 struct variable *temp_var;
/* Width 0: $ORDER is numeric, matching store_case_num's case_num_rw. */
804 temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
/* The numbering must happen in the permanent chain.  Under TEMPORARY, the
   variable is therefore also created in the pre-TEMPORARY dictionary, and
   the transformation is appended to the permanent chain directly.
   Otherwise cur_trns_chain already is the permanent chain. */
805 if (proc_in_temporary_transformations (ds))
807 struct variable *perm_var;
809 perm_var = dict_clone_var_in_place_assert (ds->permanent_dict, temp_var);
810 trns_chain_append (ds->permanent_trns_chain, NULL, store_case_num,
814 add_transformation (ds, store_case_num, NULL, temp_var);
819 /* Causes output from the next procedure to be discarded, instead
820 of being preserved for use as input for the next procedure.
   proc_commit clears the flag again, as does proc_execute when it has
   nothing to execute. */
822 proc_discard_output (struct dataset *ds)
824 ds->discard_output = true;
/* NOTE: when the source is healthy, this also resets the successor taint
   of the source's taint object and asserts that nothing downstream is
   left tainted. */
828 /* Checks whether DS has a corrupted active dataset. If so,
829 discards it and returns false. If not, returns true without
832 dataset_end_of_command (struct dataset *ds)
834 if (ds->source != NULL)
836 if (casereader_error (ds->source))
843 const struct taint *taint = casereader_get_taint (ds->source);
844 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
845 assert (!taint_has_tainted_successor (taint));
851 static trns_proc_func case_limit_trns_proc;
852 static trns_free_func case_limit_trns_free;
854 /* Adds a transformation that limits the number of cases that may
855 pass through, if DS->DICT has a case limit. */
857 add_case_limit_trns (struct dataset *ds)
859 casenumber case_limit = dict_get_case_limit (ds->dict);
/* The counter of cases still allowed through is heap-allocated as a
   casenumber and owned by the transformation.  case_limit_trns_proc
   decrements it and case_limit_trns_free frees it.  Once the transformation
   enforces the limit, the limit is cleared from the dictionary.  This keeps
   a later call (proc_start_temporary_transformations and
   proc_open_filtering both call this) from adding a second limit. */
862 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
863 *cases_remaining = case_limit;
864 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
866 dict_set_case_limit (ds->dict, 0);
870 /* Limits the maximum number of cases processed to
872 static enum trns_result
873 case_limit_trns_proc (void *cases_remaining_,
874 struct ccase **c UNUSED, casenumber case_nr UNUSED)
/* CASES_REMAINING_ points to the counter that add_case_limit_trns
   allocated as a casenumber, so it must be accessed as a casenumber.
   Accessing it through a size_t pointer reads and writes the wrong number
   of bytes, and breaks C's effective-type rules, wherever size_t and
   casenumber differ in width (e.g. LLP64 targets where casenumber is a
   32-bit long).  Each passing case uses up one unit of the limit.  Once
   the counter reaches zero, every further case is dropped. */
876 casenumber *cases_remaining = cases_remaining_;
877 if (*cases_remaining > 0)
879 (*cases_remaining)--;
880 return TRNS_CONTINUE;
883 return TRNS_DROP_CASE;
886 /* Frees the casenumber counter of a case limit transformation, which
   add_case_limit_trns allocated.  The pointer has the same type used when
   it was allocated and decremented. */
888 case_limit_trns_free (void *cases_remaining_)
890 casenumber *cases_remaining = cases_remaining_;
891 free (cases_remaining);
895 static trns_proc_func filter_trns_proc;
897 /* Adds a temporary transformation to filter data according to
898 the variable specified on FILTER, if any. */
900 add_filter_trns (struct dataset *ds)
902 struct variable *filter_var = dict_get_filter (ds->dict);
/* Filtering is done as a temporary transformation.  The replacement
   dataset is fed before temporary transformations run, so filtered-out
   cases are still kept there; they are only hidden from the procedure. */
903 if (filter_var != NULL)
905 proc_start_temporary_transformations (ds);
906 add_transformation (ds, filter_trns_proc, NULL, filter_var);
910 /* FILTER transformation.  A case continues only if its value of the filter
   variable is nonzero and not missing (any kind of missing value, MV_ANY).
   Otherwise the case is dropped. */
911 static enum trns_result
912 filter_trns_proc (void *filter_var_,
913 struct ccase **c, casenumber case_nr UNUSED)
916 struct variable *filter_var = filter_var_;
917 double f = case_num (*c, filter_var);
918 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
919 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Ensures that the next procedure keeps at least N_BEFORE previous cases
   available to lagged_case.  This function only ever increases the lag
   depth. */
924 dataset_need_lag (struct dataset *ds, int n_before)
926 ds->n_lag = MAX (ds->n_lag, n_before);
/* Invokes DS's "changed" callback, if DS has callbacks and that one is set. */
930 dataset_changed__ (struct dataset *ds)
932 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
933 ds->callbacks->changed (ds->cb_data);
/* Invokes DS's "transformations_changed" callback, if set.  NON_EMPTY tells
   the callback whether DS now has any transformations. */
937 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
939 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
940 ds->callbacks->transformations_changed (non_empty, ds->cb_data);
943 /* Private interface for use by session code.  It only records SESSION as
   the session that DS belongs to.  Unlike dataset_set_session, it calls
   neither session_add_dataset nor session_remove_dataset. */
946 dataset_set_session__ (struct dataset *ds, struct session *session)
948 ds->session = session;