1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
48 /* A dataset is usually part of a session. Within a session its name must
49 be unique. The name must either be a valid PSPP identifier or the empty
50 string. (It must be unique within the session even if it is the empty
51 string; that is, there may only be a single dataset within a session with
52 the empty string as its name.) */
53 struct session *session;
55 enum dataset_display display;
57 /* Cases are read from source,
58 their transformation variables are initialized,
59 pass through permanent_trns_chain (which transforms them into
60 the format described by permanent_dict),
62 pass through temporary_trns_chain (which transforms them into
63 the format described by dict),
64 and are finally passed to the procedure. */
65 struct casereader *source;
66 struct caseinit *caseinit;
67 struct trns_chain *permanent_trns_chain;
68 struct dictionary *permanent_dict;
69 struct casewriter *sink;
70 struct trns_chain *temporary_trns_chain;
71 struct dictionary *dict;
73 /* If true, cases are discarded instead of being written to
77 /* The transformation chain that the next transformation will be
79 struct trns_chain *cur_trns_chain;
81 /* The case map used to compact a case, if necessary;
82 otherwise a null pointer. */
83 struct case_map *compactor;
85 /* Time at which proc was last invoked. */
86 time_t last_proc_invocation;
88 /* Cases just before ("lagging") the current one. */
89 int n_lag; /* Number of cases to lag. */
90 struct deque lag; /* Deque of lagged cases. */
91 struct ccase **lag_cases; /* Lagged cases managed by deque. */
96 PROC_COMMITTED, /* No procedure in progress. */
97 PROC_OPEN, /* proc_open called, casereader still open. */
98 PROC_CLOSED /* casereader from proc_open destroyed,
99 but proc_commit not yet called. */
102 casenumber cases_written; /* Cases output so far. */
103 bool ok; /* Error status. */
104 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
106 const struct dataset_callbacks *callbacks;
109 /* Uniquely distinguishes datasets. */
113 static void dataset_changed__ (struct dataset *);
114 static void dataset_transformations_changed__ (struct dataset *,
117 static void add_case_limit_trns (struct dataset *ds);
118 static void add_filter_trns (struct dataset *ds);
120 static void update_last_proc_invocation (struct dataset *ds);
123 dict_callback (struct dictionary *d UNUSED, void *ds_)
125 struct dataset *ds = ds_;
126 dataset_changed__ (ds);
130 dataset_create_finish__ (struct dataset *ds, struct session *session)
132 static unsigned int seqno;
134 dict_set_change_callback (ds->dict, dict_callback, ds);
135 proc_cancel_all_transformations (ds);
136 dataset_set_session (ds, session);
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it. If
141 SESSION already contains a dataset named NAME, it is deleted and replaced.
142 The dataset initially has an empty dictionary and no data source. */
144 dataset_create (struct session *session, const char *name)
148 ds = xzalloc (sizeof *ds);
149 ds->name = xstrdup (name);
150 ds->display = DATASET_FRONT;
151 ds->dict = dict_create (get_default_encoding ());
153 ds->caseinit = caseinit_create ();
155 dataset_create_finish__ (ds, session);
160 /* Creates a new dataset named NAME that has the same data and dictionary as
161 OLD, adds it to the same session as OLD, and returns the new dataset.
162 If OLD's session already contains a dataset named NAME, it is deleted and replaced.
165 OLD must not have any active transformations or temporary state and must
166 not be in the middle of a procedure.
168 Callbacks are not cloned. */
170 dataset_clone (struct dataset *old, const char *name)
174 assert (old->proc_state == PROC_COMMITTED);
175 assert (trns_chain_is_empty (old->permanent_trns_chain));
176 assert (old->permanent_dict == NULL);
177 assert (old->sink == NULL);
178 assert (old->temporary_trns_chain == NULL);
180 new = xzalloc (sizeof *new);
181 new->name = xstrdup (name);
182 new->display = DATASET_FRONT;
183 new->source = casereader_clone (old->source);
184 new->dict = dict_clone (old->dict);
185 new->caseinit = caseinit_clone (old->caseinit);
186 new->last_proc_invocation = old->last_proc_invocation;
189 dataset_create_finish__ (new, old->session);
196 dataset_destroy (struct dataset *ds)
200 dataset_set_session (ds, NULL);
202 dict_destroy (ds->dict);
203 caseinit_destroy (ds->caseinit);
204 trns_chain_destroy (ds->permanent_trns_chain);
205 dataset_transformations_changed__ (ds, false);
210 /* Discards the active dataset's dictionary, data, and transformations. */
212 dataset_clear (struct dataset *ds)
214 assert (ds->proc_state == PROC_COMMITTED);
216 dict_clear (ds->dict);
217 fh_set_default_handle (NULL);
221 casereader_destroy (ds->source);
224 proc_cancel_all_transformations (ds);
228 dataset_name (const struct dataset *ds)
234 dataset_set_name (struct dataset *ds, const char *name)
236 struct session *session = ds->session;
241 active = session_active_dataset (session) == ds;
243 session_set_active_dataset (session, NULL);
244 dataset_set_session (ds, NULL);
248 ds->name = xstrdup (name);
252 dataset_set_session (ds, session);
254 session_set_active_dataset (session, ds);
259 dataset_session (const struct dataset *ds)
265 dataset_set_session (struct dataset *ds, struct session *session)
267 if (session != ds->session)
269 if (ds->session != NULL)
270 session_remove_dataset (ds->session, ds);
272 session_add_dataset (session, ds);
276 /* Returns the dictionary within DS. This is always nonnull, although it
277 might not contain any variables. */
279 dataset_dict (const struct dataset *ds)
284 /* Replaces DS's dictionary by DICT, discarding any source and
287 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
289 assert (ds->proc_state == PROC_COMMITTED);
290 assert (ds->dict != dict);
294 dict_destroy (ds->dict);
296 dict_set_change_callback (ds->dict, dict_callback, ds);
299 /* Returns the casereader that will be read when a procedure is executed on
300 DS. This can be NULL if none has been set up yet. */
301 const struct casereader *
302 dataset_source (const struct dataset *ds)
/* Returns true if DS has a data source, that is, if dataset_source() yields a
   non-null casereader for it; returns false otherwise. */
bool
dataset_has_source (const struct dataset *ds)
{
  return dataset_source (ds) != NULL;
}
314 /* Replaces the active dataset's data by READER. READER's cases must have an
315 appropriate format for DS's dictionary. */
317 dataset_set_source (struct dataset *ds, struct casereader *reader)
319 casereader_destroy (ds->source);
322 caseinit_clear (ds->caseinit);
323 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
325 return reader == NULL || !casereader_error (reader);
328 /* Returns the data source from DS and removes it from DS. Returns a null
329 pointer if DS has no data source. */
331 dataset_steal_source (struct dataset *ds)
333 struct casereader *reader = ds->source;
339 /* Returns a number unique to DS. It can be used to distinguish one dataset
340 from any other within a given program run, even datasets that do not exist
343 dataset_seqno (const struct dataset *ds)
349 dataset_set_callbacks (struct dataset *ds,
350 const struct dataset_callbacks *callbacks,
353 ds->callbacks = callbacks;
354 ds->cb_data = cb_data;
358 dataset_get_display (const struct dataset *ds)
364 dataset_set_display (struct dataset *ds, enum dataset_display display)
366 ds->display = display;
369 /* Returns the last time the data was read. */
371 time_of_last_procedure (struct dataset *ds)
373 if (ds->last_proc_invocation == 0)
374 update_last_proc_invocation (ds);
375 return ds->last_proc_invocation;
378 /* Regular procedure. */
380 /* Executes any pending transformations, if necessary.
381 This is not identical to the EXECUTE command in that it won't
382 always read the source data. This can be important when the
383 source data is given inline within BEGIN DATA...END FILE. */
385 proc_execute (struct dataset *ds)
389 if ((ds->temporary_trns_chain == NULL
390 || trns_chain_is_empty (ds->temporary_trns_chain))
391 && trns_chain_is_empty (ds->permanent_trns_chain))
394 ds->discard_output = false;
395 dict_set_case_limit (ds->dict, 0);
396 dict_clear_vectors (ds->dict);
400 ok = casereader_destroy (proc_open (ds));
401 return proc_commit (ds) && ok;
404 static const struct casereader_class proc_casereader_class;
406 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
407 cases filtered out with FILTER BY will not be included in the casereader
408 (which is usually desirable). If FILTER is false, all cases will be
409 included regardless of FILTER BY settings.
411 proc_commit must be called when done. */
413 proc_open_filtering (struct dataset *ds, bool filter)
415 struct casereader *reader;
417 assert (ds->source != NULL);
418 assert (ds->proc_state == PROC_COMMITTED);
420 update_last_proc_invocation (ds);
422 caseinit_mark_for_init (ds->caseinit, ds->dict);
424 /* Finish up the collection of transformations. */
425 add_case_limit_trns (ds);
427 add_filter_trns (ds);
428 trns_chain_finalize (ds->cur_trns_chain);
430 /* Make permanent_dict refer to the dictionary right before
431 data reaches the sink. */
432 if (ds->permanent_dict == NULL)
433 ds->permanent_dict = ds->dict;
436 if (!ds->discard_output)
438 struct dictionary *pd = ds->permanent_dict;
439 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
440 if (compacted_value_cnt < dict_get_next_value_idx (pd))
442 struct caseproto *compacted_proto;
443 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
444 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
445 ds->sink = autopaging_writer_create (compacted_proto);
446 caseproto_unref (compacted_proto);
450 ds->compactor = NULL;
451 ds->sink = autopaging_writer_create (dict_get_proto (pd));
456 ds->compactor = NULL;
460 /* Allocate memory for lagged cases. */
461 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
463 ds->proc_state = PROC_OPEN;
464 ds->cases_written = 0;
467 /* FIXME: use taint in dataset in place of `ok'? */
468 /* FIXME: for trivial cases we can just return a clone of
471 /* Create casereader and insert a shim on top. The shim allows us to
472 arbitrarily extend the casereader's lifetime, by slurping the cases into
473 the shim's buffer in proc_commit(). That is especially useful when output
474 table_items are generated directly from the procedure casereader (e.g. by
475 the LIST procedure) when we are using an output driver that keeps a
476 reference to the output items passed to it (e.g. the GUI output driver in
478 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
480 &proc_casereader_class, ds);
481 ds->shim = casereader_shim_insert (reader);
/* Opens dataset DS for reading cases with proc_read, leaving out cases that
   FILTER BY filters out; this is proc_open_filtering() with filtering
   enabled.
   proc_commit must be called when done. */
struct casereader *
proc_open (struct dataset *ds)
{
  return proc_open_filtering (ds, true);
}
493 /* Returns true if a procedure is in progress, that is, if
494 proc_open has been called but proc_commit has not. */
496 proc_is_open (const struct dataset *ds)
498 return ds->proc_state != PROC_COMMITTED;
501 /* "read" function for procedure casereader. */
502 static struct ccase *
503 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
505 struct dataset *ds = ds_;
506 enum trns_result retval = TRNS_DROP_CASE;
509 assert (ds->proc_state == PROC_OPEN);
510 for (; ; case_unref (c))
514 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
515 if (retval == TRNS_ERROR)
520 /* Read a case from source. */
521 c = casereader_read (ds->source);
524 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
525 caseinit_init_vars (ds->caseinit, c);
527 /* Execute permanent transformations. */
528 case_nr = ds->cases_written + 1;
529 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
531 caseinit_update_left_vars (ds->caseinit, c);
532 if (retval != TRNS_CONTINUE)
535 /* Write case to collection of lagged cases. */
538 while (deque_count (&ds->lag) >= ds->n_lag)
539 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
540 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
543 /* Write case to replacement dataset. */
545 if (ds->sink != NULL)
546 casewriter_write (ds->sink,
547 case_map_execute (ds->compactor, case_ref (c)));
549 /* Execute temporary transformations. */
550 if (ds->temporary_trns_chain != NULL)
552 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
553 &c, ds->cases_written);
554 if (retval != TRNS_CONTINUE)
562 /* "destroy" function for procedure casereader. */
564 proc_casereader_destroy (struct casereader *reader, void *ds_)
566 struct dataset *ds = ds_;
569 /* We are always the subreader for a casereader_buffer, so if we're being
570 destroyed then it's because the casereader_buffer has read all the cases
571 that it ever will. */
574 /* Make sure transformations happen for every input case, in
575 case they have side effects, and ensure that the replacement
576 active dataset gets all the cases it should. */
577 while ((c = casereader_read (reader)) != NULL)
580 ds->proc_state = PROC_CLOSED;
581 ds->ok = casereader_destroy (ds->source) && ds->ok;
583 dataset_set_source (ds, NULL);
586 /* Must return false if the source casereader, a transformation,
587 or the sink casewriter signaled an error. (If a temporary
588 transformation signals an error, then the return value is
589 false, but the replacement active dataset may still be
592 proc_commit (struct dataset *ds)
594 if (ds->shim != NULL)
595 casereader_shim_slurp (ds->shim);
597 assert (ds->proc_state == PROC_CLOSED);
598 ds->proc_state = PROC_COMMITTED;
600 dataset_changed__ (ds);
602 /* Free memory for lagged cases. */
603 while (!deque_is_empty (&ds->lag))
604 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
605 free (ds->lag_cases);
607 /* Dictionary from before TEMPORARY becomes permanent. */
608 proc_cancel_temporary_transformations (ds);
610 if (!ds->discard_output)
612 /* Finish compacting. */
613 if (ds->compactor != NULL)
615 case_map_destroy (ds->compactor);
616 ds->compactor = NULL;
618 dict_delete_scratch_vars (ds->dict);
619 dict_compact_values (ds->dict);
622 /* Old data sink becomes new data source. */
623 if (ds->sink != NULL)
624 ds->source = casewriter_make_reader (ds->sink);
629 ds->discard_output = false;
633 caseinit_clear (ds->caseinit);
634 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
636 dict_clear_vectors (ds->dict);
637 ds->permanent_dict = NULL;
638 return proc_cancel_all_transformations (ds) && ds->ok;
641 /* Casereader class for procedure execution. */
642 static const struct casereader_class proc_casereader_class =
644 proc_casereader_read,
645 proc_casereader_destroy,
650 /* Updates last_proc_invocation. */
652 update_last_proc_invocation (struct dataset *ds)
654 ds->last_proc_invocation = time (NULL);
657 /* Returns a pointer to the lagged case from N_BEFORE cases before the
658 current one, or NULL if there haven't been that many cases yet. */
660 lagged_case (const struct dataset *ds, int n_before)
662 assert (n_before >= 1);
663 assert (n_before <= ds->n_lag);
665 if (n_before <= deque_count (&ds->lag))
666 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
671 /* Returns the current set of permanent transformations,
672 and clears the permanent transformations.
673 For use by INPUT PROGRAM. */
675 proc_capture_transformations (struct dataset *ds)
677 struct trns_chain *chain;
679 assert (ds->temporary_trns_chain == NULL);
680 chain = ds->permanent_trns_chain;
681 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
682 dataset_transformations_changed__ (ds, false);
687 /* Adds a transformation that processes a case with PROC and
688 frees itself with FREE to the current set of transformations.
689 The functions are passed AUX as auxiliary data. */
691 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
693 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
694 dataset_transformations_changed__ (ds, true);
697 /* Adds a transformation that processes a case with PROC and
698 frees itself with FREE to the current set of transformations.
699 When parsing of the block of transformations is complete,
700 FINALIZE will be called.
701 The functions are passed AUX as auxiliary data. */
703 add_transformation_with_finalizer (struct dataset *ds,
704 trns_finalize_func *finalize,
705 trns_proc_func *proc,
706 trns_free_func *free, void *aux)
708 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
709 dataset_transformations_changed__ (ds, true);
712 /* Returns the index of the next transformation.
713 This value can be returned by a transformation procedure
714 function to indicate a "jump" to that transformation. */
716 next_transformation (const struct dataset *ds)
718 return trns_chain_next (ds->cur_trns_chain);
721 /* Returns true if the next call to add_transformation() will add
722 a temporary transformation, false if it will add a permanent
725 proc_in_temporary_transformations (const struct dataset *ds)
727 return ds->temporary_trns_chain != NULL;
730 /* Marks the start of temporary transformations.
731 Further calls to add_transformation() will add temporary
734 proc_start_temporary_transformations (struct dataset *ds)
736 if (!proc_in_temporary_transformations (ds))
738 add_case_limit_trns (ds);
740 ds->permanent_dict = dict_clone (ds->dict);
742 trns_chain_finalize (ds->permanent_trns_chain);
743 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
744 dataset_transformations_changed__ (ds, true);
748 /* Converts all the temporary transformations, if any, to
749 permanent transformations. Further transformations will be
751 Returns true if anything changed, false otherwise. */
753 proc_make_temporary_transformations_permanent (struct dataset *ds)
755 if (proc_in_temporary_transformations (ds))
757 trns_chain_finalize (ds->temporary_trns_chain);
758 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
759 ds->temporary_trns_chain = NULL;
761 dict_destroy (ds->permanent_dict);
762 ds->permanent_dict = NULL;
770 /* Cancels all temporary transformations, if any. Further
771 transformations will be permanent.
772 Returns true if anything changed, false otherwise. */
774 proc_cancel_temporary_transformations (struct dataset *ds)
776 if (proc_in_temporary_transformations (ds))
778 dict_destroy (ds->dict);
779 ds->dict = ds->permanent_dict;
780 ds->permanent_dict = NULL;
782 trns_chain_destroy (ds->temporary_trns_chain);
783 ds->temporary_trns_chain = NULL;
784 dataset_transformations_changed__ (
785 ds, !trns_chain_is_empty (ds->permanent_trns_chain));
792 /* Cancels all transformations, if any.
793 Returns true if successful, false on I/O error. */
795 proc_cancel_all_transformations (struct dataset *ds)
798 assert (ds->proc_state == PROC_COMMITTED);
799 ok = trns_chain_destroy (ds->permanent_trns_chain);
800 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
801 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
802 ds->temporary_trns_chain = NULL;
803 dataset_transformations_changed__ (ds, false);
808 /* Causes output from the next procedure to be discarded, instead
809 of being preserved for use as input for the next procedure. */
811 proc_discard_output (struct dataset *ds)
813 ds->discard_output = true;
817 /* Checks whether DS has a corrupted active dataset. If so,
818 discards it and returns false. If not, returns true without
821 dataset_end_of_command (struct dataset *ds)
823 if (ds->source != NULL)
825 if (casereader_error (ds->source))
832 const struct taint *taint = casereader_get_taint (ds->source);
833 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
834 assert (!taint_has_tainted_successor (taint));
840 static trns_proc_func case_limit_trns_proc;
841 static trns_free_func case_limit_trns_free;
843 /* Adds a transformation that limits the number of cases that may
844 pass through, if DS->DICT has a case limit. */
846 add_case_limit_trns (struct dataset *ds)
848 casenumber case_limit = dict_get_case_limit (ds->dict);
851 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
852 *cases_remaining = case_limit;
853 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
855 dict_set_case_limit (ds->dict, 0);
859 /* Limits the maximum number of cases processed to
862 case_limit_trns_proc (void *cases_remaining_,
863 struct ccase **c UNUSED, casenumber case_nr UNUSED)
865 size_t *cases_remaining = cases_remaining_;
866 if (*cases_remaining > 0)
868 (*cases_remaining)--;
869 return TRNS_CONTINUE;
872 return TRNS_DROP_CASE;
875 /* Frees the data associated with a case limit transformation. */
877 case_limit_trns_free (void *cases_remaining_)
879 size_t *cases_remaining = cases_remaining_;
880 free (cases_remaining);
884 static trns_proc_func filter_trns_proc;
886 /* Adds a temporary transformation to filter data according to
887 the variable specified on FILTER, if any. */
889 add_filter_trns (struct dataset *ds)
891 struct variable *filter_var = dict_get_filter (ds->dict);
892 if (filter_var != NULL)
894 proc_start_temporary_transformations (ds);
895 add_transformation (ds, filter_trns_proc, NULL, filter_var);
899 /* FILTER transformation. */
901 filter_trns_proc (void *filter_var_,
902 struct ccase **c UNUSED, casenumber case_nr UNUSED)
905 struct variable *filter_var = filter_var_;
906 double f = case_num (*c, filter_var);
907 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
908 ? TRNS_CONTINUE : TRNS_DROP_CASE);
913 dataset_need_lag (struct dataset *ds, int n_before)
915 ds->n_lag = MAX (ds->n_lag, n_before);
919 dataset_changed__ (struct dataset *ds)
921 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
922 ds->callbacks->changed (ds->cb_data);
926 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
928 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
929 ds->callbacks->transformations_changed (non_empty, ds->cb_data);
932 /* Private interface for use by session code. */
935 dataset_set_session__ (struct dataset *ds, struct session *session)
937 ds->session = session;