1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
50 /* A dataset is usually part of a session. Within a session its name must be
51 unique. The name must either be a valid PSPP identifier or the empty
52 string. (It must be unique within the session even if it is the empty
53 string; that is, there may only be a single dataset within a session with
54 the empty string as its name.) */
55 struct session *session;
57 enum dataset_display display;
59 /* Cases are read from source,
60 their transformation variables are initialized,
61 pass through permanent_trns_chain (which transforms them into
62 the format described by permanent_dict),
64 pass through temporary_trns_chain (which transforms them into
65 the format described by dict),
66 and are finally passed to the procedure. */
67 struct casereader *source;
68 struct caseinit *caseinit;
69 struct trns_chain permanent_trns_chain;
70 struct dictionary *permanent_dict;
71 struct casewriter *sink;
72 struct trns_chain temporary_trns_chain;
74 struct dictionary *dict;
76 /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77 struct trns_chain *stack;
79 size_t allocated_stack;
81 /* If true, cases are discarded instead of being written to the sink. */
85 /* The case map used to compact a case, if necessary;
86 otherwise a null pointer. */
87 struct case_map *compactor;
89 /* Time at which proc was last invoked. */
90 time_t last_proc_invocation;
92 /* Cases just before ("lagging") the current one. */
93 int n_lag; /* Number of cases to lag. */
94 struct deque lag; /* Deque of lagged cases. */
95 struct ccase **lag_cases; /* Lagged cases managed by deque. */
100 PROC_COMMITTED, /* No procedure in progress. */
101 PROC_OPEN, /* proc_open called, casereader still open. */
102 PROC_CLOSED /* casereader from proc_open destroyed,
103 but proc_commit not yet called. */
106 casenumber cases_written; /* Cases output so far. */
107 bool ok; /* Error status. */
108 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
110 const struct dataset_callbacks *callbacks;
113 /* Uniquely distinguishes datasets. */
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
126 static void update_last_proc_invocation (struct dataset *ds);
/* Change callback installed on a dataset's dictionary.  DS_ is the dataset
   that was registered as the callback's auxiliary data; the dictionary
   argument itself is unused.  Forwards the event to dataset_changed__(). */
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
131 struct dataset *ds = ds_;
132 dataset_changed__ (ds);
/* Final construction steps shared by dataset_create() and dataset_clone():
   registers dict_callback() on DS's dictionary with DS as auxiliary data,
   cancels all transformations, and attaches DS to SESSION. */
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
/* Static, so its value persists across calls.  NOTE(review): presumably the
   source of each dataset's sequence number; its use is not visible here. */
138 static unsigned int seqno;
140 dict_set_change_callback (ds->dict, dict_callback, ds);
141 proc_cancel_all_transformations (ds);
142 dataset_set_session (ds, session);
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it. If
147 SESSION already contains a dataset named NAME, it is deleted and replaced.
148 The dataset initially has an empty dictionary and no data source. */
150 dataset_create (struct session *session, const char *name)
152 struct dataset *ds = XMALLOC (struct dataset);
/* Members that are not named in this initializer start out zero or null. */
153 *ds = (struct dataset) {
/* The dataset keeps its own copy of NAME. */
154 .name = xstrdup (name),
155 .display = DATASET_FRONT,
156 .dict = dict_create (get_default_encoding ()),
157 .caseinit = caseinit_create (),
/* Installs the dictionary callback and adds the dataset to SESSION. */
159 dataset_create_finish__ (ds, session);
164 /* Creates and returns a new dataset that has the same data and dictionary as
165 OLD named NAME, adds it to the same session as OLD, and returns the new
166 dataset. If SESSION already contains a dataset named NAME, it is deleted and replaced.
169 OLD must not have any active transformations or temporary state and must
170 not be in the middle of a procedure.
172 Callbacks are not cloned. */
174 dataset_clone (struct dataset *old, const char *name)
/* Check the documented preconditions on OLD: no procedure in progress, no
   permanent or temporary transformations, no separate permanent dictionary,
   no sink, and no pushed transformation chains. */
178 assert (old->proc_state == PROC_COMMITTED);
179 assert (!old->permanent_trns_chain.n);
180 assert (old->permanent_dict == NULL);
181 assert (old->sink == NULL);
182 assert (!old->temporary);
183 assert (!old->temporary_trns_chain.n);
184 assert (!old->n_stack);
/* Start from zeroed memory, so anything not assigned below (including the
   callbacks) is left null in the clone. */
186 new = xzalloc (sizeof *new);
187 new->name = xstrdup (name);
188 new->display = DATASET_FRONT;
/* Source, dictionary, and case initializer are cloned rather than shared. */
189 new->source = casereader_clone (old->source);
190 new->dict = dict_clone (old->dict);
191 new->caseinit = caseinit_clone (old->caseinit);
192 new->last_proc_invocation = old->last_proc_invocation;
/* The clone is attached to the same session as OLD. */
195 dataset_create_finish__ (new, old->session);
/* Destroys DS.  The visible steps detach DS from its session, release its
   dictionary and permanent dictionary, destroy its case initializer,
   uninitialize the permanent transformation chain and every chain on the
   stack, and report to the callbacks that no transformations remain. */
202 dataset_destroy (struct dataset *ds)
206 dataset_set_session (ds, NULL);
208 dict_unref (ds->dict);
209 dict_unref (ds->permanent_dict);
210 caseinit_destroy (ds->caseinit);
211 trns_chain_uninit (&ds->permanent_trns_chain);
212 for (size_t i = 0; i < ds->n_stack; i++)
213 trns_chain_uninit (&ds->stack[i]);
215 dataset_transformations_changed__ (ds, false);
221 /* Discards the active dataset's dictionary, data, and transformations. */
223 dataset_clear (struct dataset *ds)
/* Clearing is only allowed when no procedure is in progress. */
225 assert (ds->proc_state == PROC_COMMITTED);
227 dict_clear (ds->dict);
/* Passes a null handle as the new default file handle. */
228 fh_set_default_handle (NULL);
/* Dispose of the data source. */
232 casereader_destroy (ds->source);
235 proc_cancel_all_transformations (ds);
239 dataset_name (const struct dataset *ds)
/* Renames DS to NAME, storing a copy of NAME.

   The visible steps are: remember whether DS is its session's active dataset,
   clear the session's active dataset, detach DS from the session, store the
   new name, re-attach DS, and make DS the active dataset again.
   NOTE(review): the conditions guarding these steps (for example, DS having
   no session) are not visible here -- confirm before relying on them. */
245 dataset_set_name (struct dataset *ds, const char *name)
247 struct session *session = ds->session;
252 active = session_active_dataset (session) == ds;
254 session_set_active_dataset (session, NULL);
255 dataset_set_session (ds, NULL);
259 ds->name = xstrdup (name);
263 dataset_set_session (ds, session);
265 session_set_active_dataset (session, ds);
270 dataset_session (const struct dataset *ds)
/* Moves DS into SESSION.  Nothing happens when DS already belongs to
   SESSION.  Otherwise DS is removed from its current session, if it has one,
   and added to SESSION.
   NOTE(review): dataset_destroy() and dataset_set_name() pass a null SESSION
   to detach DS; confirm that the add below is guarded against null. */
276 dataset_set_session (struct dataset *ds, struct session *session)
278 if (session != ds->session)
280 if (ds->session != NULL)
281 session_remove_dataset (ds->session, ds);
283 session_add_dataset (session, ds);
287 /* Returns the dictionary within DS. This is always nonnull, although it
288 might not contain any variables. */
290 dataset_dict (const struct dataset *ds)
295 /* Replaces DS's dictionary by DICT, discarding any source and transformations. */
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
/* Only allowed when no procedure is in progress. */
300 assert (ds->proc_state == PROC_COMMITTED);
/* DICT must be a different object from the current dictionary, because the
   current one is released below. */
301 assert (ds->dict != dict);
305 dict_unref (ds->dict);
/* Register this dataset for change notifications from its dictionary.
   NOTE(review): presumably ds->dict already refers to DICT at this point;
   the assignment is not visible here. */
307 dict_set_change_callback (ds->dict, dict_callback, ds);
310 /* Returns the casereader that will be read when a procedure is executed on
311 DS. This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
318 /* Returns true if DS has a data source, false otherwise. */
320 dataset_has_source (const struct dataset *ds)
322 return dataset_source (ds) != NULL;
325 /* Replaces the active dataset's data by READER. READER's cases must have an
326 appropriate format for DS's dictionary. */
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
/* Dispose of the previous source. */
330 casereader_destroy (ds->source);
/* Reset the case initializer and mark the variables of DS's dictionary as
   pre-initialized. */
333 caseinit_clear (ds->caseinit);
334 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
/* True when READER is null or reports no error. */
336 return reader == NULL || !casereader_error (reader);
339 /* Returns the data source from DS and removes it from DS. Returns a null
340 pointer if DS has no data source. */
342 dataset_steal_source (struct dataset *ds)
344 struct casereader *reader = ds->source;
351 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
353 dict_delete_vars (ds->dict, vars, n);
357 struct case_map *map = case_map_to_compact_dict (ds->d, 0);
358 ds->source = case_map_create_input_translator (map, ds->source);
360 dict_compact_values (ds->dict);
363 /* Returns a number unique to DS. It can be used to distinguish one dataset
364 from any other within a given program run, even datasets that do not exist at the same time. */
367 dataset_seqno (const struct dataset *ds)
/* Installs CALLBACKS as DS's callback table.  CB_DATA is stored alongside it
   and is handed to each callback when it is invoked. */
373 dataset_set_callbacks (struct dataset *ds,
374 const struct dataset_callbacks *callbacks,
377 ds->callbacks = callbacks;
378 ds->cb_data = cb_data;
382 dataset_get_display (const struct dataset *ds)
388 dataset_set_display (struct dataset *ds, enum dataset_display display)
390 ds->display = display;
393 /* Returns the last time the data was read. */
395 time_of_last_procedure (struct dataset *ds)
/* A zero timestamp is treated as "never set": stamp the current time so
   that a usable value is returned. */
399 if (ds->last_proc_invocation == 0)
400 update_last_proc_invocation (ds);
401 return ds->last_proc_invocation;
404 /* Regular procedure. */
406 /* Executes any pending transformations, if necessary.
407 This is not identical to the EXECUTE command in that it won't
408 always read the source data. This can be important when the
409 source data is given inline within BEGIN DATA...END DATA. */
411 proc_execute (struct dataset *ds)
/* The condition holds when there are no permanent transformations and no
   temporary transformations in effect, that is, when there is nothing to
   execute.  In that case only the per-procedure state below is reset. */
415 if ((!ds->temporary || !ds->temporary_trns_chain.n)
416 && !ds->permanent_trns_chain.n)
419 ds->discard_output = false;
420 dict_set_case_limit (ds->dict, 0);
421 dict_clear_vectors (ds->dict);
/* Otherwise open a procedure casereader and destroy it right away; the
   reader's destroy function reads every remaining case, which runs the
   transformations.  The result combines that status with proc_commit(). */
425 ok = casereader_destroy (proc_open (ds));
426 return proc_commit (ds) && ok;
429 static const struct casereader_class proc_casereader_class;
431 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
432 cases filtered out with FILTER BY will not be included in the casereader
433 (which is usually desirable). If FILTER is false, all cases will be
434 included regardless of FILTER BY settings.
436 proc_commit must be called when done. */
438 proc_open_filtering (struct dataset *ds, bool filter)
440 struct casereader *reader;
/* Preconditions: no pushed transformation chains, a data source to read
   from, and no procedure already in progress. */
442 assert (ds->n_stack == 0);
443 assert (ds->source != NULL);
444 assert (ds->proc_state == PROC_COMMITTED);
446 update_last_proc_invocation (ds);
448 caseinit_mark_for_init (ds->caseinit, ds->dict);
450 /* Finish up the collection of transformations. */
451 add_case_limit_trns (ds);
/* NOTE(review): per the function's header comment this step depends on
   FILTER; the test of FILTER is not visible next to this call -- confirm. */
453 add_filter_trns (ds);
/* When temporary transformations are active, the measurement-level
   transformation was already added by
   proc_start_temporary_transformations(), so it is added here only in the
   other case. */
454 if (!proc_in_temporary_transformations (ds))
455 add_measurement_level_trns (ds, ds->dict);
457 /* Make permanent_dict refer to the dictionary right before
458 data reaches the sink. */
459 if (ds->permanent_dict == NULL)
460 ds->permanent_dict = ds->dict;
/* When output is kept, create the sink.  A compacting case map is set up
   only if the compacted value count is smaller than the permanent
   dictionary's next value index. */
463 if (!ds->discard_output)
465 struct dictionary *pd = ds->permanent_dict;
466 size_t compacted_n_values = dict_count_values (pd, DC_SCRATCH);
467 if (compacted_n_values < dict_get_next_value_idx (pd))
469 struct caseproto *compacted_proto;
470 compacted_proto = dict_get_compacted_proto (pd, DC_SCRATCH);
471 ds->compactor = case_map_to_compact_dict (pd, DC_SCRATCH);
472 ds->sink = autopaging_writer_create (compacted_proto);
473 caseproto_unref (compacted_proto);
477 ds->compactor = NULL;
478 ds->sink = autopaging_writer_create (dict_get_proto (pd));
483 ds->compactor = NULL;
487 /* Allocate memory for lagged cases. */
488 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
490 ds->proc_state = PROC_OPEN;
491 ds->cases_written = 0;
494 /* FIXME: use taint in dataset in place of `ok'? */
495 /* FIXME: for trivial cases we can just return a clone of
498 /* Create casereader and insert a shim on top. The shim allows us to
499 arbitrarily extend the casereader's lifetime, by slurping the cases into
500 the shim's buffer in proc_commit(). That is especially useful when output
501 table_items are generated directly from the procedure casereader (e.g. by
502 the LIST procedure) when we are using an output driver that keeps a
503 reference to the output items passed to it (e.g. the GUI output driver in
505 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
507 &proc_casereader_class, ds);
508 ds->shim = casereader_shim_insert (reader);
512 /* Opens dataset DS for reading cases with proc_read.
513 proc_commit must be called when done. */
515 proc_open (struct dataset *ds)
517 return proc_open_filtering (ds, true);
520 /* Returns true if a procedure is in progress, that is, if
521 proc_open has been called but proc_commit has not. */
523 proc_is_open (const struct dataset *ds)
525 return ds->proc_state != PROC_COMMITTED;
528 /* "read" function for procedure casereader. */
529 static struct ccase *
530 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
532 struct dataset *ds = ds_;
/* Starts as TRNS_DROP_CASE so that the assertion at the top of the loop
   also holds on the first iteration. */
533 enum trns_result retval = TRNS_DROP_CASE;
536 assert (ds->proc_state == PROC_OPEN);
/* Each pass handles one source case.  A case that does not make it through
   is unreferenced by the loop's step expression before the next pass. */
537 for (; ; case_unref (c))
539 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
540 if (retval == TRNS_ERROR)
545 /* Read a case from source. */
546 c = casereader_read (ds->source);
/* Get a private copy sized for DS's dictionary, then initialize its
   variables. */
549 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
550 caseinit_init_vars (ds->caseinit, c);
552 /* Execute permanent transformations. */
/* The case number passed is one more than the count written so far. */
553 casenumber case_nr = ds->cases_written + 1;
554 retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
555 caseinit_update_left_vars (ds->caseinit, c);
556 if (retval != TRNS_CONTINUE)
559 /* Write case to collection of lagged cases. */
/* Evict the oldest cases until there is room, then keep a reference to C at
   the front of the deque. */
562 while (deque_count (&ds->lag) >= ds->n_lag)
563 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
564 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
567 /* Write case to replacement dataset. */
/* The sink is given its own reference, passed through the compactor, so C
   stays usable for the temporary transformations below. */
569 if (ds->sink != NULL)
570 casewriter_write (ds->sink,
571 case_map_execute (ds->compactor, case_ref (c)));
573 /* Execute temporary transformations. */
574 if (ds->temporary_trns_chain.n)
576 retval = trns_chain_execute (&ds->temporary_trns_chain,
577 ds->cases_written, &c);
578 if (retval != TRNS_CONTINUE)
586 /* "destroy" function for procedure casereader. */
588 proc_casereader_destroy (struct casereader *reader, void *ds_)
590 struct dataset *ds = ds_;
593 /* We are always the subreader for a casereader_buffer, so if we're being
594 destroyed then it's because the casereader_buffer has read all the cases
595 that it ever will. */
598 /* Make sure transformations happen for every input case, in
599 case they have side effects, and ensure that the replacement
600 active dataset gets all the cases it should. */
601 while ((c = casereader_read (reader)) != NULL)
/* Reading is finished: mark the procedure closed, fold the status of
   destroying the source into DS->ok, and leave DS without a source. */
604 ds->proc_state = PROC_CLOSED;
605 ds->ok = casereader_destroy (ds->source) && ds->ok;
607 dataset_set_source (ds, NULL);
610 /* Must return false if the source casereader, a transformation,
611 or the sink casewriter signaled an error. (If a temporary
612 transformation signals an error, then the return value is
613 false, but the replacement active dataset may still be
616 proc_commit (struct dataset *ds)
/* If the shim still exists, have it slurp the remaining cases into its own
   buffer (see the shim comment in proc_open_filtering()). */
618 if (ds->shim != NULL)
619 casereader_shim_slurp (ds->shim);
/* The procedure casereader must have been destroyed by this point. */
621 assert (ds->proc_state == PROC_CLOSED);
622 ds->proc_state = PROC_COMMITTED;
624 dataset_changed__ (ds);
626 /* Free memory for lagged cases. */
627 while (!deque_is_empty (&ds->lag))
628 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
629 free (ds->lag_cases);
631 /* Dictionary from before TEMPORARY becomes permanent. */
632 proc_cancel_temporary_transformations (ds);
/* The result combines the status of cancelling the remaining
   transformations with any error recorded in DS->ok. */
633 bool ok = proc_cancel_all_transformations (ds) && ds->ok;
635 if (!ds->discard_output)
637 /* Finish compacting. */
638 if (ds->compactor != NULL)
640 case_map_destroy (ds->compactor);
641 ds->compactor = NULL;
643 dict_delete_scratch_vars (ds->dict);
644 dict_compact_values (ds->dict);
647 /* Old data sink becomes new data source. */
648 if (ds->sink != NULL)
649 ds->source = casewriter_make_reader (ds->sink);
/* Discarding output applies to one procedure only, so the flag is reset. */
654 ds->discard_output = false;
658 caseinit_clear (ds->caseinit);
659 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
661 dict_clear_vectors (ds->dict);
662 ds->permanent_dict = NULL;
666 /* Casereader class for procedure execution. */
667 static const struct casereader_class proc_casereader_class =
669 proc_casereader_read,
670 proc_casereader_destroy,
675 /* Updates last_proc_invocation. */
677 update_last_proc_invocation (struct dataset *ds)
679 ds->last_proc_invocation = time (NULL);
682 /* Returns a pointer to the lagged case from N_BEFORE cases before the
683 current one, or NULL if there haven't been that many cases yet. */
685 lagged_case (const struct dataset *ds, int n_before)
/* N_BEFORE must be in the range 1...DS->n_lag. */
687 assert (n_before >= 1);
688 assert (n_before <= ds->n_lag);
/* proc_casereader_read() pushes each case at the front of the deque, so the
   case from N_BEFORE cases ago is at offset N_BEFORE - 1 from the front,
   provided that many cases have been stored. */
690 if (n_before <= deque_count (&ds->lag))
691 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
696 /* Adds TRNS to the current set of transformations. */
698 add_transformation (struct dataset *ds,
699 const struct trns_class *class, void *aux)
/* Choose the chain to append to, in priority order: the chain on top of the
   stack if any chain has been pushed, otherwise the temporary chain if
   temporary transformations are in effect, otherwise the permanent chain. */
701 struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
702 : ds->temporary ? &ds->temporary_trns_chain
703 : &ds->permanent_trns_chain);
/* The transformation pairs CLASS with its auxiliary data AUX. */
704 struct transformation t = { .class = class, .aux = aux };
705 trns_chain_append (chain, &t);
/* Tell the callbacks that transformations are now pending. */
706 dataset_transformations_changed__ (ds, true);
709 /* Returns true if the next call to add_transformation() will add
710 a temporary transformation, false if it will add a permanent transformation. */
713 proc_in_temporary_transformations (const struct dataset *ds)
715 return ds->temporary;
718 /* Marks the start of temporary transformations.
719 Further calls to add_transformation() will add temporary transformations. */
722 proc_start_temporary_transformations (struct dataset *ds)
/* Not allowed while any transformation chain is pushed on the stack. */
724 assert (!ds->n_stack);
/* Does nothing if temporary transformations are already in effect. */
725 if (!proc_in_temporary_transformations (ds))
/* This runs while DS->temporary is still false, so the case limit
   transformation goes onto the permanent chain. */
727 add_case_limit_trns (ds);
/* Keep a clone of the current dictionary as the permanent dictionary, and
   add a measurement-level transformation for that clone. */
729 ds->permanent_dict = dict_clone (ds->dict);
730 add_measurement_level_trns (ds, ds->permanent_dict);
732 ds->temporary = true;
733 dataset_transformations_changed__ (ds, true);
737 /* Converts all the temporary transformations, if any, to permanent
738 transformations. Further transformations will be permanent.
740 The FILTER command is implemented as a temporary transformation, so a
741 procedure that uses this function should usually use proc_open_filtering()
742 with FILTER false, instead of plain proc_open().
744 Returns true if anything changed, false otherwise. */
746 proc_make_temporary_transformations_permanent (struct dataset *ds)
748 if (proc_in_temporary_transformations (ds))
/* Cancel the measurement-level transformation if it is the last entry of
   the permanent chain, then move the temporary transformations onto the end
   of the permanent chain. */
750 cancel_measurement_level_trns (&ds->permanent_trns_chain);
751 trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
753 ds->temporary = false;
/* The clone made when temporary transformations started is no longer
   needed. */
755 dict_unref (ds->permanent_dict);
756 ds->permanent_dict = NULL;
764 /* Cancels all temporary transformations, if any. Further
765 transformations will be permanent.
766 Returns true if anything changed, false otherwise. */
768 proc_cancel_temporary_transformations (struct dataset *ds)
770 if (proc_in_temporary_transformations (ds))
772 trns_chain_clear (&ds->temporary_trns_chain);
/* Drop the dictionary that the temporary transformations modified and put
   the permanent dictionary back in its place. */
774 dict_unref (ds->dict);
775 ds->dict = ds->permanent_dict;
776 ds->permanent_dict = NULL;
/* Report whether any permanent transformations are still pending. */
778 dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
785 /* Cancels all transformations, if any.
786 Returns true if successful, false on I/O error. */
788 proc_cancel_all_transformations (struct dataset *ds)
/* Only allowed when no procedure is in progress. */
791 assert (ds->proc_state == PROC_COMMITTED);
/* Every chain is cleared even if an earlier one reports failure; OK ends up
   true only if all of them succeeded. */
792 ok = trns_chain_clear (&ds->permanent_trns_chain);
793 ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
794 ds->temporary = false;
795 for (size_t i = 0; i < ds->n_stack; i++)
796 ok = trns_chain_uninit (&ds->stack[i]) && ok;
/* Tell the callbacks that no transformations are pending. */
798 dataset_transformations_changed__ (ds, false);
/* Pushes a newly initialized transformation chain onto DS's stack, growing
   the stack array first if it is full.  Until the chain is popped,
   add_transformation() appends to it. */
804 proc_push_transformations (struct dataset *ds)
806 if (ds->n_stack >= ds->allocated_stack)
807 ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
809 trns_chain_init (&ds->stack[ds->n_stack++]);
/* Pops the chain on top of DS's stack and copies it into *CHAIN.  The stack
   must not be empty. */
813 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
815 assert (ds->n_stack > 0);
816 *chain = ds->stack[--ds->n_stack];
/* Transformation that stores the case number CASE_NUM into numeric variable
   VAR_ of case *CC.  Always returns TRNS_CONTINUE. */
819 static enum trns_result
820 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
822 struct variable *var = var_;
/* The case is unshared before it is written to. */
824 *cc = case_unshare (*cc);
825 *case_num_rw (*cc, var) = case_num;
827 return TRNS_CONTINUE;
830 /* Add a variable which we can sort by to get back the original order. */
832 add_permanent_ordering_transformation (struct dataset *ds)
/* Create the variable "$ORDER" in the current dictionary. */
834 struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
/* When temporary transformations are in effect, the variable is also cloned
   into the permanent dictionary, and the clone is what the transformation
   writes to. */
835 struct variable *order_var
836 = (proc_in_temporary_transformations (ds)
837 ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
840 static const struct trns_class trns_class = {
842 .execute = store_case_num
/* Appended to the permanent chain directly, not through
   add_transformation(), so it is permanent even when temporary
   transformations are in effect. */
844 const struct transformation t = { .class = &trns_class, .aux = order_var };
845 trns_chain_append (&ds->permanent_trns_chain, &t);
850 /* Causes output from the next procedure to be discarded, instead
851 of being preserved for use as input for the next procedure. */
853 proc_discard_output (struct dataset *ds)
855 ds->discard_output = true;
859 /* Checks whether DS has a corrupted active dataset. If so,
860 discards it and returns false. If not, returns true without
863 dataset_end_of_command (struct dataset *ds)
/* Without a source there is nothing to check. */
865 if (ds->source != NULL)
867 if (casereader_error (ds->source))
/* The source is not in error: reset the taint of its successors and check
   that none of them remains tainted.  The cast only removes const from the
   pointer returned by casereader_get_taint(). */
874 const struct taint *taint = casereader_get_taint (ds->source);
875 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
876 assert (!taint_has_tainted_successor (taint));
882 /* Limits the maximum number of cases processed to
884 static enum trns_result
885 case_limit_trns_proc (void *cases_remaining_,
886 struct ccase **c UNUSED, casenumber case_nr UNUSED)
888 size_t *cases_remaining = cases_remaining_;
889 if (*cases_remaining > 0)
891 (*cases_remaining)--;
892 return TRNS_CONTINUE;
895 return TRNS_DROP_CASE;
898 /* Frees the data associated with a case limit transformation. */
900 case_limit_trns_free (void *cases_remaining_)
/* The pointer is only passed to free() here and never dereferenced.
   NOTE(review): add_case_limit_trns() allocates this counter as a
   casenumber, not a size_t. */
902 size_t *cases_remaining = cases_remaining_;
903 free (cases_remaining);
907 /* Adds a transformation that limits the number of cases that may
908 pass through, if DS->DICT has a case limit. */
910 add_case_limit_trns (struct dataset *ds)
912 casenumber case_limit = dict_get_case_limit (ds->dict);
/* The counter lives on the heap; case_limit_trns_free() releases it when
   the transformation is destroyed. */
915 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
916 *cases_remaining = case_limit;
918 static const struct trns_class trns_class = {
919 .name = "case limit",
920 .execute = case_limit_trns_proc,
921 .destroy = case_limit_trns_free,
923 add_transformation (ds, &trns_class, cases_remaining);
/* The limit has been handed to the transformation, so it is cleared in the
   dictionary. */
925 dict_set_case_limit (ds->dict, 0);
930 /* FILTER transformation. */
/* Keeps case *C when its value for FILTER_VAR_ is nonzero and is not a
   missing value for that variable; drops the case otherwise. */
931 static enum trns_result
932 filter_trns_proc (void *filter_var_,
933 struct ccase **c, casenumber case_nr UNUSED)
936 struct variable *filter_var = filter_var_;
937 double f = case_num (*c, filter_var);
938 return (f != 0.0 && !var_is_num_missing (filter_var, f)
939 ? TRNS_CONTINUE : TRNS_DROP_CASE);
942 /* Adds a temporary transformation to filter data according to
943 the variable specified on FILTER, if any. */
945 add_filter_trns (struct dataset *ds)
947 struct variable *filter_var = dict_get_filter (ds->dict);
/* Nothing is added when the dictionary has no filter variable. */
948 if (filter_var != NULL)
/* Switch to temporary transformations first, so that the filter added below
   lands on the temporary chain. */
950 proc_start_temporary_transformations (ds);
952 static const struct trns_class trns_class = {
954 .execute = filter_trns_proc,
956 add_transformation (ds, &trns_class, filter_var);
/* Requests that at least N_BEFORE lagged cases be kept for DS: raises DS's
   lag count to N_BEFORE if it is currently smaller, and never lowers it. */
961 dataset_need_lag (struct dataset *ds, int n_before)
963 ds->n_lag = MAX (ds->n_lag, n_before);
966 /* Measurement guesser, for guessing a measurement level from formats and data. */
971 struct hmap_node hmap_node;
977 struct variable *var;
/* Releases the set of values collected for MGV: each value node is deleted
   from the hash map, then the map itself is destroyed. */
982 mg_var_uninit (struct mg_var *mgv)
/* The _SAFE iteration form is used because nodes are deleted while
   iterating. */
984 struct mg_value *mgvalue, *next;
985 HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
988 hmap_delete (mgv->values, &mgvalue->hmap_node);
991 hmap_destroy (mgv->values);
/* Chooses a measurement level for MGV from the distinct values collected so
   far: nominal when no values were collected or when any collected value is
   less than 10, scale otherwise. */
996 mg_var_interpret (const struct mg_var *mgv)
998 size_t n = hmap_count (mgv->values);
1001 /* All missing (or no data). */
1002 return MEASURE_NOMINAL;
1005 const struct mg_value *mgvalue;
1006 HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1008 if (mgvalue->value < 10)
1009 return MEASURE_NOMINAL;
1010 return MEASURE_SCALE;
/* Feeds VALUE into the guesser state for MGV.  Returns MEASURE_SCALE as soon
   as the level can be decided, or MEASURE_UNKNOWN when more data is
   needed. */
1014 mg_var_add_value (struct mg_var *mgv, double value)
/* Missing values are not recorded. */
1016 if (var_is_num_missing (mgv->var, value))
1017 return MEASURE_UNKNOWN;
/* A negative or non-integer value decides for scale immediately. */
1018 else if (value < 0 || value != floor (value))
1019 return MEASURE_SCALE;
/* Otherwise record the value, unless it was already seen. */
1021 size_t hash = hash_double (value, 0);
1022 struct mg_value *mgvalue;
1023 HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1025 if (mgvalue->value == value)
1026 return MEASURE_UNKNOWN;
1028 mgvalue = xmalloc (sizeof *mgvalue);
1029 mgvalue->value = value;
1030 hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
/* Once the number of distinct values reaches the scalemin setting, the
   variable is treated as scale. */
1031 if (hmap_count (mgv->values) >= settings_get_scalemin ())
1032 return MEASURE_SCALE;
1034 return MEASURE_UNKNOWN;
1037 struct measure_guesser
1039 struct mg_var *vars;
/* Builds a measure guesser for the variables in DICT.  A variable whose
   print format yields a default measurement level has that level set
   directly; the others each get an mg_var entry with an empty hash map for
   collecting values. */
1043 static struct measure_guesser *
1044 measure_guesser_create__ (struct dictionary *dict)
1046 struct mg_var *mgvs = NULL;
1048 size_t allocated_mgvs = 0;
1050 for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1052 struct variable *var = dict_get_var (dict, i);
/* Tests whether VAR's measurement level is already known. */
1053 if (var_get_measure (var) != MEASURE_UNKNOWN)
/* Try the default level implied by the variable's print format. */
1056 const struct fmt_spec *f = var_get_print_format (var);
1057 enum measure m = var_default_measure_for_format (f->type);
1058 if (m != MEASURE_UNKNOWN)
1060 var_set_measure (var, m);
/* The level must be guessed from data: add an entry, growing the array when
   it is full. */
1064 if (n_mgvs >= allocated_mgvs)
1065 mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1067 struct mg_var *mgv = &mgvs[n_mgvs++];
1068 *mgv = (struct mg_var) {
1070 .values = xmalloc (sizeof *mgv->values),
1072 hmap_init (mgv->values);
1077 struct measure_guesser *mg = xmalloc (sizeof *mg);
1078 *mg = (struct measure_guesser) {
1085 /* Scans through DS's dictionary for variables that have an unknown measurement
1086 level. For those, if the measurement level can be guessed based on the
1087 variable's type and format, sets a default. If that's enough, returns NULL.
1088 If any remain whose levels are unknown and can't be guessed that way,
1089 creates and returns a structure that the caller should pass to
1090 measure_guesser_add_case() or measure_guesser_run() for guessing a
1091 measurement level based on the data. */
1092 struct measure_guesser *
1093 measure_guesser_create (struct dataset *ds)
1095 return measure_guesser_create__ (dataset_dict (ds));
1098 /* Adds data from case C to MG. */
1100 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
/* The loop header has no increment.  When a variable is resolved, the last
   entry is moved into slot I, so slot I has to be examined again.
   NOTE(review): presumably I advances only when the entry stays unresolved;
   that branch is not visible here. */
1102 for (size_t i = 0; i < mg->n_vars; )
1104 struct mg_var *mgv = &mg->vars[i];
1105 double value = case_num (c, mgv->var);
1106 enum measure m = mg_var_add_value (mgv, value);
1107 if (m != MEASURE_UNKNOWN)
/* The level is decided: record it, free the entry's value set, and fill the
   hole with the last entry, shrinking the array by one. */
1109 var_set_measure (mgv->var, m);
1111 mg_var_uninit (mgv);
1112 *mgv = mg->vars[--mg->n_vars];
/* Destroys MG.  Each variable that is still undecided first gets the level
   that mg_var_interpret() derives from the values collected so far, and then
   its value set is released. */
1121 measure_guesser_destroy (struct measure_guesser *mg)
1126 for (size_t i = 0; i < mg->n_vars; i++)
1128 struct mg_var *mgv = &mg->vars[i];
1129 var_set_measure (mgv->var, mg_var_interpret (mgv));
1130 mg_var_uninit (mgv);
1136 /* Adds final measurement levels based on MG, after all the cases have been added. */
1139 measure_guesser_commit (struct measure_guesser *mg)
/* Only the entries still in MG are handled here; variables decided earlier
   were already removed by measure_guesser_add_case().  The entries' value
   sets are not released in this function. */
1141 for (size_t i = 0; i < mg->n_vars; i++)
1143 struct mg_var *mgv = &mg->vars[i];
1144 var_set_measure (mgv->var, mg_var_interpret (mgv));
1148 /* Passes the cases in READER through MG and uses the data in the cases to set
1149 measurement levels for the variables where they were still unknown. */
1151 measure_guesser_run (struct measure_guesser *mg,
1152 const struct casereader *reader)
/* Work on a clone, so that READER itself is not consumed. */
1154 struct casereader *r = casereader_clone (reader);
/* Stop reading early once every variable has been decided. */
1155 while (mg->n_vars > 0)
1157 struct ccase *c = casereader_read (r);
1160 measure_guesser_add_case (mg, c);
1163 casereader_destroy (r);
/* Assign levels to whatever variables are still undecided. */
1165 measure_guesser_commit (mg);
1168 /* A transformation for guessing measurement levels. */
/* Transformation "execute" function: feeds case *C to the measure guesser
   MG_.  The case is not modified and always continues. */
1170 static enum trns_result
1171 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1173 struct measure_guesser *mg = mg_;
1174 measure_guesser_add_case (mg, *c);
1175 return TRNS_CONTINUE;
/* Transformation "destroy" function: assigns final measurement levels from
   the measure guesser MG_, then destroys it. */
1179 mg_trns_free (void *mg_)
1181 struct measure_guesser *mg = mg_;
1182 measure_guesser_commit (mg);
1183 measure_guesser_destroy (mg);
1187 static const struct trns_class mg_trns_class = {
1188 .name = "add measurement level",
1189 .execute = mg_trns_proc,
1190 .destroy = mg_trns_free,
/* Creates a measure guesser for DICT and adds it to DS as a transformation
   of class mg_trns_class.
   NOTE(review): measure_guesser_create() is documented to return NULL when
   nothing needs guessing; confirm the null case is handled before the
   transformation is added. */
1194 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1196 struct measure_guesser *mg = measure_guesser_create__ (dict);
1198 add_transformation (ds, &mg_trns_class, mg);
/* Examines only the last transformation in CHAIN.  If it belongs to
   mg_trns_class, its measure guesser is destroyed. */
1202 cancel_measurement_level_trns (struct trns_chain *chain)
1207 struct transformation *trns = &chain->xforms[chain->n - 1];
/* Tests whether the last entry is something other than a measurement-level
   transformation. */
1208 if (trns->class != &mg_trns_class)
1211 struct measure_guesser *mg = trns->aux;
1212 measure_guesser_destroy (mg);
/* Invokes DS's "changed" callback with DS's callback data, if a callback
   table is installed and it provides that callback. */
1217 dataset_changed__ (struct dataset *ds)
1219 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1220 ds->callbacks->changed (ds->cb_data);
/* Invokes DS's "transformations_changed" callback, if a callback table is
   installed and it provides that callback.  NON_EMPTY is passed through
   together with DS's callback data. */
1224 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1226 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1227 ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1230 /* Private interface for use by session code. */
/* Stores SESSION in DS without touching either session's list of datasets;
   nothing else is updated here. */
1233 dataset_set_session__ (struct dataset *ds, struct session *session)
1235 ds->session = session;