1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
/* A dataset is usually part of a session.  Within a session its name must
   be unique.  The name must either be a valid PSPP identifier or the empty
   string.  (It must be unique within the session even if it is the empty
   string; that is, there may only be a single dataset within a session with
   the empty string as its name.) */
55 struct session *session;
57 enum dataset_display display;
59 /* Cases are read from source,
60 their transformation variables are initialized,
61 pass through permanent_trns_chain (which transforms them into
62 the format described by permanent_dict),
64 pass through temporary_trns_chain (which transforms them into
65 the format described by dict),
66 and are finally passed to the procedure. */
67 struct casereader *source;
68 struct caseinit *caseinit;
69 struct trns_chain permanent_trns_chain;
70 struct dictionary *permanent_dict;
71 struct variable *order_var;
72 struct casewriter *sink;
73 struct trns_chain temporary_trns_chain;
75 struct dictionary *dict;
77 /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
78 struct trns_chain *stack;
80 size_t allocated_stack;
82 /* If true, cases are discarded instead of being written to
86 /* Time at which proc was last invoked. */
87 time_t last_proc_invocation;
89 /* Cases just before ("lagging") the current one. */
90 int n_lag; /* Number of cases to lag. */
91 struct deque lag; /* Deque of lagged cases. */
92 struct ccase **lag_cases; /* Lagged cases managed by deque. */
97 PROC_COMMITTED, /* No procedure in progress. */
98 PROC_OPEN, /* proc_open called, casereader still open. */
99 PROC_CLOSED /* casereader from proc_open destroyed,
100 but proc_commit not yet called. */
103 casenumber cases_written; /* Cases output so far. */
104 bool ok; /* Error status. */
105 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
107 const struct dataset_callbacks *callbacks;
110 /* Uniquely distinguishes datasets. */
114 static void dataset_changed__ (struct dataset *);
115 static void dataset_transformations_changed__ (struct dataset *,
118 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
119 static void cancel_measurement_level_trns (struct trns_chain *);
120 static void add_case_limit_trns (struct dataset *ds);
121 static void add_filter_trns (struct dataset *ds);
123 static void update_last_proc_invocation (struct dataset *ds);
126 dict_callback (struct dictionary *d UNUSED, void *ds_)
128 struct dataset *ds = ds_;
129 dataset_changed__ (ds);
133 dataset_create_finish__ (struct dataset *ds, struct session *session)
135 static unsigned int seqno;
137 dict_set_change_callback (ds->dict, dict_callback, ds);
138 proc_cancel_all_transformations (ds);
139 dataset_set_session (ds, session);
143 /* Creates a new dataset named NAME, adds it to SESSION, and returns it. If
144 SESSION already contains a dataset named NAME, it is deleted and replaced.
145 The dataset initially has an empty dictionary and no data source. */
147 dataset_create (struct session *session, const char *name)
149 struct dataset *ds = XMALLOC (struct dataset);
150 *ds = (struct dataset) {
151 .name = xstrdup (name),
152 .display = DATASET_FRONT,
153 .dict = dict_create (get_default_encoding ()),
154 .caseinit = caseinit_create (),
156 dataset_create_finish__ (ds, session);
161 /* Creates and returns a new dataset that has the same data and dictionary as
162 OLD named NAME, adds it to the same session as OLD, and returns the new
163 dataset. If SESSION already contains a dataset named NAME, it is deleted
166 OLD must not have any active transformations or temporary state and must
167 not be in the middle of a procedure.
169 Callbacks are not cloned. */
171 dataset_clone (struct dataset *old, const char *name)
175 assert (old->proc_state == PROC_COMMITTED);
176 assert (!old->permanent_trns_chain.n);
177 assert (old->permanent_dict == NULL);
178 assert (old->sink == NULL);
179 assert (!old->temporary);
180 assert (!old->temporary_trns_chain.n);
181 assert (!old->n_stack);
183 new = xzalloc (sizeof *new);
184 new->name = xstrdup (name);
185 new->display = DATASET_FRONT;
186 new->source = casereader_clone (old->source);
187 new->dict = dict_clone (old->dict);
188 new->caseinit = caseinit_clone (old->caseinit);
189 new->last_proc_invocation = old->last_proc_invocation;
192 dataset_create_finish__ (new, old->session);
199 dataset_destroy (struct dataset *ds)
203 dataset_set_session (ds, NULL);
205 dict_unref (ds->dict);
206 dict_unref (ds->permanent_dict);
207 caseinit_destroy (ds->caseinit);
208 trns_chain_uninit (&ds->permanent_trns_chain);
209 for (size_t i = 0; i < ds->n_stack; i++)
210 trns_chain_uninit (&ds->stack[i]);
212 dataset_transformations_changed__ (ds, false);
218 /* Discards the active dataset's dictionary, data, and transformations. */
220 dataset_clear (struct dataset *ds)
222 assert (ds->proc_state == PROC_COMMITTED);
224 dict_clear (ds->dict);
225 fh_set_default_handle (NULL);
229 casereader_destroy (ds->source);
232 proc_cancel_all_transformations (ds);
236 dataset_name (const struct dataset *ds)
242 dataset_set_name (struct dataset *ds, const char *name)
244 struct session *session = ds->session;
249 active = session_active_dataset (session) == ds;
251 session_set_active_dataset (session, NULL);
252 dataset_set_session (ds, NULL);
256 ds->name = xstrdup (name);
260 dataset_set_session (ds, session);
262 session_set_active_dataset (session, ds);
267 dataset_session (const struct dataset *ds)
273 dataset_set_session (struct dataset *ds, struct session *session)
275 if (session != ds->session)
277 if (ds->session != NULL)
278 session_remove_dataset (ds->session, ds);
280 session_add_dataset (session, ds);
284 /* Returns the dictionary within DS. This is always nonnull, although it
285 might not contain any variables. */
287 dataset_dict (const struct dataset *ds)
292 /* Replaces DS's dictionary by DICT, discarding any source and
295 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
297 assert (ds->proc_state == PROC_COMMITTED);
298 assert (ds->dict != dict);
302 dict_unref (ds->dict);
304 dict_set_change_callback (ds->dict, dict_callback, ds);
307 /* Returns the casereader that will be read when a procedure is executed on
308 DS. This can be NULL if none has been set up yet. */
309 const struct casereader *
310 dataset_source (const struct dataset *ds)
/* Reports whether a data source is attached to DS: true if
   dataset_source() yields a reader, false if it yields a null pointer. */
bool
dataset_has_source (const struct dataset *ds)
{
  const struct casereader *source = dataset_source (ds);
  return source != NULL;
}
322 /* Replaces the active dataset's data by READER. READER's cases must have an
323 appropriate format for DS's dictionary. */
325 dataset_set_source (struct dataset *ds, struct casereader *reader)
327 casereader_destroy (ds->source);
330 caseinit_clear (ds->caseinit);
331 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
333 return reader == NULL || !casereader_error (reader);
336 /* Returns the data source from DS and removes it from DS. Returns a null
337 pointer if DS has no data source. */
339 dataset_steal_source (struct dataset *ds)
341 struct casereader *reader = ds->source;
348 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
350 assert (!proc_in_temporary_transformations (ds));
351 assert (!proc_has_transformations (ds));
352 assert (n < dict_get_n_vars (ds->dict));
354 caseinit_mark_for_init (ds->caseinit, ds->dict);
355 ds->source = caseinit_translate_casereader_to_init_vars (
356 ds->caseinit, dict_get_proto (ds->dict), ds->source);
357 caseinit_clear (ds->caseinit);
358 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
360 struct case_map_stage *stage = case_map_stage_create (ds->dict);
361 dict_delete_vars (ds->dict, vars, n);
362 ds->source = case_map_create_input_translator (
363 case_map_stage_get_case_map (stage), ds->source);
364 case_map_stage_destroy (stage);
365 caseinit_clear (ds->caseinit);
366 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
370 dataset_reorder_vars (struct dataset *ds, struct variable **vars, size_t n)
372 assert (!proc_in_temporary_transformations (ds));
373 assert (!proc_has_transformations (ds));
374 assert (n <= dict_get_n_vars (ds->dict));
376 caseinit_mark_for_init (ds->caseinit, ds->dict);
377 ds->source = caseinit_translate_casereader_to_init_vars (
378 ds->caseinit, dict_get_proto (ds->dict), ds->source);
379 caseinit_clear (ds->caseinit);
380 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
382 struct case_map_stage *stage = case_map_stage_create (ds->dict);
383 dict_reorder_vars (ds->dict, vars, n);
384 ds->source = case_map_create_input_translator (
385 case_map_stage_get_case_map (stage), ds->source);
386 case_map_stage_destroy (stage);
387 caseinit_clear (ds->caseinit);
388 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
391 /* Returns a number unique to DS. It can be used to distinguish one dataset
392 from any other within a given program run, even datasets that do not exist
395 dataset_seqno (const struct dataset *ds)
401 dataset_set_callbacks (struct dataset *ds,
402 const struct dataset_callbacks *callbacks,
405 ds->callbacks = callbacks;
406 ds->cb_data = cb_data;
410 dataset_get_display (const struct dataset *ds)
416 dataset_set_display (struct dataset *ds, enum dataset_display display)
418 ds->display = display;
421 /* Returns the last time the data was read. */
423 time_of_last_procedure (struct dataset *ds)
427 if (ds->last_proc_invocation == 0)
428 update_last_proc_invocation (ds);
429 return ds->last_proc_invocation;
432 /* Regular procedure. */
434 /* Executes any pending transformations, if necessary.
435 This is not identical to the EXECUTE command in that it won't
436 always read the source data. This can be important when the
437 source data is given inline within BEGIN DATA...END FILE. */
439 proc_execute (struct dataset *ds)
443 if ((!ds->temporary || !ds->temporary_trns_chain.n)
444 && !ds->permanent_trns_chain.n)
447 ds->discard_output = false;
448 dict_set_case_limit (ds->dict, 0);
449 dict_clear_vectors (ds->dict);
453 ok = casereader_destroy (proc_open (ds));
454 return proc_commit (ds) && ok;
457 static const struct casereader_class proc_casereader_class;
459 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
460 cases filtered out with FILTER BY will not be included in the casereader
461 (which is usually desirable). If FILTER is false, all cases will be
462 included regardless of FILTER BY settings.
464 proc_commit must be called when done. */
466 proc_open_filtering (struct dataset *ds, bool filter)
468 struct casereader *reader;
470 assert (ds->n_stack == 0);
471 assert (ds->source != NULL);
472 assert (ds->proc_state == PROC_COMMITTED);
474 update_last_proc_invocation (ds);
476 caseinit_mark_for_init (ds->caseinit, ds->dict);
477 ds->source = caseinit_translate_casereader_to_init_vars (
478 ds->caseinit, dict_get_proto (ds->dict), ds->source);
480 /* Finish up the collection of transformations. */
481 add_case_limit_trns (ds);
483 add_filter_trns (ds);
484 if (!proc_in_temporary_transformations (ds))
485 add_measurement_level_trns (ds, ds->dict);
487 /* Make permanent_dict refer to the dictionary right before
488 data reaches the sink. */
489 if (ds->permanent_dict == NULL)
490 ds->permanent_dict = ds->dict;
493 if (!ds->discard_output)
495 struct dictionary *pd = dict_clone (ds->permanent_dict);
496 struct case_map_stage *stage = case_map_stage_create (pd);
497 dict_delete_scratch_vars (pd);
498 ds->sink = case_map_create_output_translator (
499 case_map_stage_get_case_map (stage),
500 autopaging_writer_create (dict_get_proto (pd)));
501 case_map_stage_destroy (stage);
507 /* Allocate memory for lagged cases. */
508 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
510 ds->proc_state = PROC_OPEN;
511 ds->cases_written = 0;
514 /* FIXME: use taint in dataset in place of `ok'? */
515 /* FIXME: for trivial cases we can just return a clone of
518 /* Create casereader and insert a shim on top. The shim allows us to
519 arbitrarily extend the casereader's lifetime, by slurping the cases into
520 the shim's buffer in proc_commit(). That is especially useful when output
521 table_items are generated directly from the procedure casereader (e.g. by
522 the LIST procedure) when we are using an output driver that keeps a
523 reference to the output items passed to it (e.g. the GUI output driver in
525 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
527 &proc_casereader_class, ds);
528 ds->shim = casereader_shim_insert (reader);
/* Opens dataset DS for reading cases with proc_read.  This is
   proc_open_filtering() with FILTER set to true, so cases excluded by
   FILTER BY are not included in the returned casereader.

   proc_commit must be called when done. */
struct casereader *
proc_open (struct dataset *ds)
{
  const bool filter = true;
  return proc_open_filtering (ds, filter);
}
540 /* Returns true if a procedure is in progress, that is, if
541 proc_open has been called but proc_commit has not. */
543 proc_is_open (const struct dataset *ds)
545 return ds->proc_state != PROC_COMMITTED;
548 /* "read" function for procedure casereader. */
549 static struct ccase *
550 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
552 struct dataset *ds = ds_;
553 enum trns_result retval = TRNS_DROP_CASE;
556 assert (ds->proc_state == PROC_OPEN);
557 for (; ; case_unref (c))
559 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
560 if (retval == TRNS_ERROR)
565 /* Read a case from source. */
566 c = casereader_read (ds->source);
569 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
570 caseinit_restore_left_vars (ds->caseinit, c);
572 /* Execute permanent transformations. */
573 casenumber case_nr = ds->cases_written + 1;
574 retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
575 caseinit_save_left_vars (ds->caseinit, c);
576 if (retval != TRNS_CONTINUE)
579 /* Write case to collection of lagged cases. */
582 while (deque_count (&ds->lag) >= ds->n_lag)
583 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
584 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
587 /* Write case to replacement dataset. */
589 if (ds->sink != NULL)
592 *case_num_rw (c, ds->order_var) = case_nr;
593 casewriter_write (ds->sink, case_ref (c));
596 /* Execute temporary transformations. */
597 if (ds->temporary_trns_chain.n)
599 retval = trns_chain_execute (&ds->temporary_trns_chain,
600 ds->cases_written, &c);
601 if (retval != TRNS_CONTINUE)
609 /* "destroy" function for procedure casereader. */
611 proc_casereader_destroy (struct casereader *reader, void *ds_)
613 struct dataset *ds = ds_;
616 /* We are always the subreader for a casereader_buffer, so if we're being
617 destroyed then it's because the casereader_buffer has read all the cases
618 that it ever will. */
621 /* Make sure transformations happen for every input case, in
622 case they have side effects, and ensure that the replacement
623 active dataset gets all the cases it should. */
624 while ((c = casereader_read (reader)) != NULL)
627 ds->proc_state = PROC_CLOSED;
628 ds->ok = casereader_destroy (ds->source) && ds->ok;
630 dataset_set_source (ds, NULL);
633 /* Must return false if the source casereader, a transformation,
634 or the sink casewriter signaled an error. (If a temporary
635 transformation signals an error, then the return value is
636 false, but the replacement active dataset may still be
639 proc_commit (struct dataset *ds)
641 if (ds->shim != NULL)
642 casereader_shim_slurp (ds->shim);
644 assert (ds->proc_state == PROC_CLOSED);
645 ds->proc_state = PROC_COMMITTED;
647 dataset_changed__ (ds);
649 /* Free memory for lagged cases. */
650 while (!deque_is_empty (&ds->lag))
651 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
652 free (ds->lag_cases);
654 /* Dictionary from before TEMPORARY becomes permanent. */
655 proc_cancel_temporary_transformations (ds);
656 bool ok = proc_cancel_all_transformations (ds) && ds->ok;
658 if (!ds->discard_output)
660 dict_delete_scratch_vars (ds->dict);
662 /* Old data sink becomes new data source. */
663 if (ds->sink != NULL)
664 ds->source = casewriter_make_reader (ds->sink);
669 ds->discard_output = false;
673 caseinit_clear (ds->caseinit);
674 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
676 dict_clear_vectors (ds->dict);
677 ds->permanent_dict = NULL;
678 ds->order_var = NULL;
682 /* Casereader class for procedure execution. */
683 static const struct casereader_class proc_casereader_class =
685 proc_casereader_read,
686 proc_casereader_destroy,
691 /* Updates last_proc_invocation. */
693 update_last_proc_invocation (struct dataset *ds)
695 ds->last_proc_invocation = time (NULL);
698 /* Returns a pointer to the lagged case from N_BEFORE cases before the
699 current one, or NULL if there haven't been that many cases yet. */
701 lagged_case (const struct dataset *ds, int n_before)
703 assert (n_before >= 1);
704 assert (n_before <= ds->n_lag);
706 if (n_before <= deque_count (&ds->lag))
707 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
712 /* Adds TRNS to the current set of transformations. */
714 add_transformation (struct dataset *ds,
715 const struct trns_class *class, void *aux)
717 struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
718 : ds->temporary ? &ds->temporary_trns_chain
719 : &ds->permanent_trns_chain);
720 struct transformation t = { .class = class, .aux = aux };
721 trns_chain_append (chain, &t);
722 dataset_transformations_changed__ (ds, true);
725 /* Returns true if the next call to add_transformation() will add
726 a temporary transformation, false if it will add a permanent
729 proc_in_temporary_transformations (const struct dataset *ds)
731 return ds->temporary;
734 /* Marks the start of temporary transformations.
735 Further calls to add_transformation() will add temporary
738 proc_start_temporary_transformations (struct dataset *ds)
740 assert (!ds->n_stack);
741 if (!proc_in_temporary_transformations (ds))
743 add_case_limit_trns (ds);
745 ds->permanent_dict = dict_clone (ds->dict);
746 add_measurement_level_trns (ds, ds->permanent_dict);
748 ds->temporary = true;
749 dataset_transformations_changed__ (ds, true);
753 /* Converts all the temporary transformations, if any, to permanent
754 transformations. Further transformations will be permanent.
756 The FILTER command is implemented as a temporary transformation, so a
757 procedure that uses this function should usually use proc_open_filtering()
758 with FILTER false, instead of plain proc_open().
760 Returns true if anything changed, false otherwise. */
762 proc_make_temporary_transformations_permanent (struct dataset *ds)
764 if (proc_in_temporary_transformations (ds))
766 cancel_measurement_level_trns (&ds->permanent_trns_chain);
767 trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
769 ds->temporary = false;
771 dict_unref (ds->permanent_dict);
772 ds->permanent_dict = NULL;
780 /* Cancels all temporary transformations, if any. Further
781 transformations will be permanent.
782 Returns true if anything changed, false otherwise. */
784 proc_cancel_temporary_transformations (struct dataset *ds)
786 if (proc_in_temporary_transformations (ds))
788 trns_chain_clear (&ds->temporary_trns_chain);
790 dict_unref (ds->dict);
791 ds->dict = ds->permanent_dict;
792 ds->permanent_dict = NULL;
794 dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
801 /* Cancels all transformations, if any.
802 Returns true if successful, false on I/O error. */
804 proc_cancel_all_transformations (struct dataset *ds)
807 assert (ds->proc_state == PROC_COMMITTED);
808 ok = trns_chain_clear (&ds->permanent_trns_chain);
809 ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
810 ds->temporary = false;
811 for (size_t i = 0; i < ds->n_stack; i++)
812 ok = trns_chain_uninit (&ds->stack[i]) && ok;
814 dataset_transformations_changed__ (ds, false);
820 proc_push_transformations (struct dataset *ds)
822 if (ds->n_stack >= ds->allocated_stack)
823 ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
825 trns_chain_init (&ds->stack[ds->n_stack++]);
829 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
831 assert (ds->n_stack > 0);
832 *chain = ds->stack[--ds->n_stack];
836 proc_has_transformations (const struct dataset *ds)
838 return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
841 static enum trns_result
842 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
844 struct variable *var = var_;
846 *cc = case_unshare (*cc);
847 *case_num_rw (*cc, var) = case_num;
849 return TRNS_CONTINUE;
/* Add a variable $ORDER which we can sort by to get back the original order. */
854 add_permanent_ordering_transformation (struct dataset *ds)
856 struct dictionary *d = ds->permanent_dict ? ds->permanent_dict : ds->dict;
857 struct variable *order_var = dict_create_var_assert (d, "$ORDER", 0);
858 ds->order_var = order_var;
860 if (ds->permanent_dict)
862 order_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
863 static const struct trns_class trns_class = {
865 .execute = store_case_num
867 const struct transformation t = { .class = &trns_class, .aux = order_var };
868 trns_chain_prepend (&ds->temporary_trns_chain, &t);
874 /* Causes output from the next procedure to be discarded, instead
875 of being preserved for use as input for the next procedure. */
877 proc_discard_output (struct dataset *ds)
879 ds->discard_output = true;
883 /* Checks whether DS has a corrupted active dataset. If so,
884 discards it and returns false. If not, returns true without
887 dataset_end_of_command (struct dataset *ds)
889 if (ds->source != NULL)
891 if (casereader_error (ds->source))
898 const struct taint *taint = casereader_get_taint (ds->source);
899 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
900 assert (!taint_has_tainted_successor (taint));
906 /* Limits the maximum number of cases processed to
908 static enum trns_result
909 case_limit_trns_proc (void *cases_remaining_,
910 struct ccase **c UNUSED, casenumber case_nr UNUSED)
912 size_t *cases_remaining = cases_remaining_;
913 if (*cases_remaining > 0)
915 (*cases_remaining)--;
916 return TRNS_CONTINUE;
919 return TRNS_DROP_CASE;
922 /* Frees the data associated with a case limit transformation. */
924 case_limit_trns_free (void *cases_remaining_)
926 size_t *cases_remaining = cases_remaining_;
927 free (cases_remaining);
931 /* Adds a transformation that limits the number of cases that may
932 pass through, if DS->DICT has a case limit. */
934 add_case_limit_trns (struct dataset *ds)
936 casenumber case_limit = dict_get_case_limit (ds->dict);
939 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
940 *cases_remaining = case_limit;
942 static const struct trns_class trns_class = {
943 .name = "case limit",
944 .execute = case_limit_trns_proc,
945 .destroy = case_limit_trns_free,
947 add_transformation (ds, &trns_class, cases_remaining);
949 dict_set_case_limit (ds->dict, 0);
954 /* FILTER transformation. */
955 static enum trns_result
956 filter_trns_proc (void *filter_var_,
957 struct ccase **c, casenumber case_nr UNUSED)
960 struct variable *filter_var = filter_var_;
961 double f = case_num (*c, filter_var);
962 return (f != 0.0 && !var_is_num_missing (filter_var, f)
963 ? TRNS_CONTINUE : TRNS_DROP_CASE);
966 /* Adds a temporary transformation to filter data according to
967 the variable specified on FILTER, if any. */
969 add_filter_trns (struct dataset *ds)
971 struct variable *filter_var = dict_get_filter (ds->dict);
972 if (filter_var != NULL)
974 proc_start_temporary_transformations (ds);
976 static const struct trns_class trns_class = {
978 .execute = filter_trns_proc,
980 add_transformation (ds, &trns_class, filter_var);
985 dataset_need_lag (struct dataset *ds, int n_before)
987 ds->n_lag = MAX (ds->n_lag, n_before);
990 /* Measurement guesser, for guessing a measurement level from formats and
995 struct hmap_node hmap_node;
1001 struct variable *var;
1002 struct hmap *values;
1006 mg_var_uninit (struct mg_var *mgv)
1008 struct mg_value *mgvalue, *next;
1009 HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
1012 hmap_delete (mgv->values, &mgvalue->hmap_node);
1015 hmap_destroy (mgv->values);
1020 mg_var_interpret (const struct mg_var *mgv)
1022 size_t n = hmap_count (mgv->values);
1025 /* All missing (or no data). */
1026 return MEASURE_NOMINAL;
1029 const struct mg_value *mgvalue;
1030 HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1032 if (mgvalue->value < 10)
1033 return MEASURE_NOMINAL;
1034 return MEASURE_SCALE;
1038 mg_var_add_value (struct mg_var *mgv, double value)
1040 if (var_is_num_missing (mgv->var, value))
1041 return MEASURE_UNKNOWN;
1042 else if (value < 0 || value != floor (value))
1043 return MEASURE_SCALE;
1045 size_t hash = hash_double (value, 0);
1046 struct mg_value *mgvalue;
1047 HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1049 if (mgvalue->value == value)
1050 return MEASURE_UNKNOWN;
1052 mgvalue = xmalloc (sizeof *mgvalue);
1053 mgvalue->value = value;
1054 hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1055 if (hmap_count (mgv->values) >= settings_get_scalemin ())
1056 return MEASURE_SCALE;
1058 return MEASURE_UNKNOWN;
1061 struct measure_guesser
1063 struct mg_var *vars;
1067 static struct measure_guesser *
1068 measure_guesser_create__ (struct dictionary *dict)
1070 struct mg_var *mgvs = NULL;
1072 size_t allocated_mgvs = 0;
1074 for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1076 struct variable *var = dict_get_var (dict, i);
1077 if (var_get_measure (var) != MEASURE_UNKNOWN)
1080 struct fmt_spec f = var_get_print_format (var);
1081 enum measure m = var_default_measure_for_format (f.type);
1082 if (m != MEASURE_UNKNOWN)
1084 var_set_measure (var, m);
1088 if (n_mgvs >= allocated_mgvs)
1089 mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1091 struct mg_var *mgv = &mgvs[n_mgvs++];
1092 *mgv = (struct mg_var) {
1094 .values = xmalloc (sizeof *mgv->values),
1096 hmap_init (mgv->values);
1101 struct measure_guesser *mg = xmalloc (sizeof *mg);
1102 *mg = (struct measure_guesser) {
/* Looks through DS's dictionary for variables whose measurement level is
   unknown.  Where a level can be guessed from the variable's type and
   format, sets it as a default.  Returns NULL if that settles every
   variable.  Otherwise, creates and returns a structure that the caller
   should pass to measure_guesser_add_case() or measure_guesser_run() so that
   the remaining levels can be guessed from the data. */
struct measure_guesser *
measure_guesser_create (struct dataset *ds)
{
  struct dictionary *dict = dataset_dict (ds);
  return measure_guesser_create__ (dict);
}
1122 /* Adds data from case C to MG. */
1124 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1126 for (size_t i = 0; i < mg->n_vars; )
1128 struct mg_var *mgv = &mg->vars[i];
1129 double value = case_num (c, mgv->var);
1130 enum measure m = mg_var_add_value (mgv, value);
1131 if (m != MEASURE_UNKNOWN)
1133 var_set_measure (mgv->var, m);
1135 mg_var_uninit (mgv);
1136 *mgv = mg->vars[--mg->n_vars];
1145 measure_guesser_destroy (struct measure_guesser *mg)
1150 for (size_t i = 0; i < mg->n_vars; i++)
1152 struct mg_var *mgv = &mg->vars[i];
1153 var_set_measure (mgv->var, mg_var_interpret (mgv));
1154 mg_var_uninit (mgv);
1160 /* Adds final measurement levels based on MG, after all the cases have been
1163 measure_guesser_commit (struct measure_guesser *mg)
1165 for (size_t i = 0; i < mg->n_vars; i++)
1167 struct mg_var *mgv = &mg->vars[i];
1168 var_set_measure (mgv->var, mg_var_interpret (mgv));
1172 /* Passes the cases in READER through MG and uses the data in the cases to set
1173 measurement levels for the variables where they were still unknown. */
1175 measure_guesser_run (struct measure_guesser *mg,
1176 const struct casereader *reader)
1178 struct casereader *r = casereader_clone (reader);
1179 while (mg->n_vars > 0)
1181 struct ccase *c = casereader_read (r);
1184 measure_guesser_add_case (mg, c);
1187 casereader_destroy (r);
1189 measure_guesser_commit (mg);
1192 /* A transformation for guessing measurement levels. */
1194 static enum trns_result
1195 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1197 struct measure_guesser *mg = mg_;
1198 measure_guesser_add_case (mg, *c);
1199 return TRNS_CONTINUE;
1203 mg_trns_free (void *mg_)
1205 struct measure_guesser *mg = mg_;
1206 measure_guesser_commit (mg);
1207 measure_guesser_destroy (mg);
1211 static const struct trns_class mg_trns_class = {
1212 .name = "add measurement level",
1213 .execute = mg_trns_proc,
1214 .destroy = mg_trns_free,
1218 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1220 struct measure_guesser *mg = measure_guesser_create__ (dict);
1222 add_transformation (ds, &mg_trns_class, mg);
1226 cancel_measurement_level_trns (struct trns_chain *chain)
1231 struct transformation *trns = &chain->xforms[chain->n - 1];
1232 if (trns->class != &mg_trns_class)
1235 struct measure_guesser *mg = trns->aux;
1236 measure_guesser_destroy (mg);
1241 dataset_changed__ (struct dataset *ds)
1243 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1244 ds->callbacks->changed (ds->cb_data);
1248 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1250 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1251 ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1254 /* Private interface for use by session code. */
1257 dataset_set_session__ (struct dataset *ds, struct session *session)
1259 ds->session = session;