1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
50 /* A dataset is usually part of a session. Within a session its name must
51 be unique. The name must either be a valid PSPP identifier or the empty
52 string. (It must be unique within the session even if it is the empty
53 string; that is, there may only be a single dataset within a session with
54 the empty string as its name.) */
55 struct session *session;
57 enum dataset_display display;
59 /* Cases are read from source,
60 their transformation variables are initialized,
61 pass through permanent_trns_chain (which transforms them into
62 the format described by permanent_dict),
64 pass through temporary_trns_chain (which transforms them into
65 the format described by dict),
66 and are finally passed to the procedure. */
67 struct casereader *source;
68 struct caseinit *caseinit;
69 struct trns_chain permanent_trns_chain;
70 struct dictionary *permanent_dict;
71 struct casewriter *sink;
72 struct trns_chain temporary_trns_chain;
74 struct dictionary *dict;
76 /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77 struct trns_chain *stack;
79 size_t allocated_stack;
81 /* If true, cases are discarded instead of being written to the sink. */
85 /* The case map used to compact a case, if necessary;
86 otherwise a null pointer. */
87 struct case_map *compactor;
89 /* Time at which proc was last invoked. */
90 time_t last_proc_invocation;
92 /* Cases just before ("lagging") the current one. */
93 int n_lag; /* Number of cases to lag. */
94 struct deque lag; /* Deque of lagged cases. */
95 struct ccase **lag_cases; /* Lagged cases managed by deque. */
100 PROC_COMMITTED, /* No procedure in progress. */
101 PROC_OPEN, /* proc_open called, casereader still open. */
102 PROC_CLOSED /* casereader from proc_open destroyed,
103 but proc_commit not yet called. */
106 casenumber cases_written; /* Cases output so far. */
107 bool ok; /* Error status. */
108 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
110 const struct dataset_callbacks *callbacks;
113 /* Uniquely distinguishes datasets. */
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
126 static void update_last_proc_invocation (struct dataset *ds);
/* Change callback registered on a dataset's dictionary.  The dictionary
   argument is unused; DS_ is the auxiliary pointer supplied at registration,
   interpreted here as the owning dataset, which is reported as changed. */
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
131 struct dataset *ds = ds_;
132 dataset_changed__ (ds);
/* Shared final step of creating dataset DS: registers dict_callback on DS's
   dictionary with DS as its auxiliary data, cancels all of DS's
   transformations, and attaches DS to SESSION. */
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
/* NOTE(review): presumably the counter behind dataset_seqno(); the statement
   that uses it is not visible in this excerpt. */
138 static unsigned int seqno;
140 dict_set_change_callback (ds->dict, dict_callback, ds);
141 proc_cancel_all_transformations (ds);
142 dataset_set_session (ds, session);
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it. If
147 SESSION already contains a dataset named NAME, it is deleted and replaced.
148 The dataset initially has an empty dictionary and no data source. */
150 dataset_create (struct session *session, const char *name)
152 struct dataset *ds = XMALLOC (struct dataset);
/* Members not named in the initializer below are zero-initialized.  The
   dataset stores its own copy of NAME, and its dictionary is created with
   the default encoding. */
153 *ds = (struct dataset) {
154 .name = xstrdup (name),
155 .display = DATASET_FRONT,
156 .dict = dict_create (get_default_encoding ()),
157 .caseinit = caseinit_create (),
/* Install the dictionary callback and attach the dataset to SESSION. */
159 dataset_create_finish__ (ds, session);
164 /* Creates a new dataset named NAME that has the same data and dictionary as
165 OLD, adds it to the same session as OLD, and returns the new
166 dataset. If that session already contains a dataset named NAME, it is deleted
and replaced.
169 OLD must not have any active transformations or temporary state and must
170 not be in the middle of a procedure.
172 Callbacks are not cloned. */
174 dataset_clone (struct dataset *old, const char *name)
/* The preconditions documented above are enforced with assertions. */
178 assert (old->proc_state == PROC_COMMITTED);
179 assert (!old->permanent_trns_chain.n);
180 assert (old->permanent_dict == NULL);
181 assert (old->sink == NULL);
182 assert (!old->temporary);
183 assert (!old->temporary_trns_chain.n);
184 assert (!old->n_stack);
/* Start from zero-filled storage; only the members assigned below are taken
   from OLD.  The source, dictionary, and caseinit are cloned rather than
   shared, and the clone gets its own copy of NAME. */
186 new = xzalloc (sizeof *new);
187 new->name = xstrdup (name);
188 new->display = DATASET_FRONT;
189 new->source = casereader_clone (old->source);
190 new->dict = dict_clone (old->dict);
191 new->caseinit = caseinit_clone (old->caseinit);
192 new->last_proc_invocation = old->last_proc_invocation;
/* Install the dictionary callback and attach the clone to OLD's session. */
195 dataset_create_finish__ (new, old->session);
/* Tears down DS.  The visible steps detach DS from its session, release its
   references to its dictionary and permanent dictionary, destroy its
   caseinit, uninitialize the permanent chain and every chain on the stack,
   and finally report (with NON_EMPTY false) that no transformations remain.
   NOTE(review): the freeing of DS itself is not visible in this excerpt. */
202 dataset_destroy (struct dataset *ds)
206 dataset_set_session (ds, NULL);
208 dict_unref (ds->dict);
209 dict_unref (ds->permanent_dict);
210 caseinit_destroy (ds->caseinit);
211 trns_chain_uninit (&ds->permanent_trns_chain);
212 for (size_t i = 0; i < ds->n_stack; i++)
213 trns_chain_uninit (&ds->stack[i]);
215 dataset_transformations_changed__ (ds, false);
221 /* Discards the active dataset's dictionary, data, and transformations. */
223 dataset_clear (struct dataset *ds)
/* Must not be called while a procedure is in progress. */
225 assert (ds->proc_state == PROC_COMMITTED);
/* Empty the dictionary and pass NULL as the default file handle. */
227 dict_clear (ds->dict);
228 fh_set_default_handle (NULL);
/* Drop the data source, then cancel every transformation. */
232 casereader_destroy (ds->source);
235 proc_cancel_all_transformations (ds);
239 dataset_name (const struct dataset *ds)
/* Renames DS to NAME.  The visible steps record whether DS is its session's
   active dataset, clear the session's active dataset, detach DS from the
   session, store a copy of NAME, re-attach DS to the session, and make DS
   the active dataset again.
   NOTE(review): the conditions guarding these steps, and the release of the
   old name, are not visible in this excerpt. */
245 dataset_set_name (struct dataset *ds, const char *name)
247 struct session *session = ds->session;
252 active = session_active_dataset (session) == ds;
254 session_set_active_dataset (session, NULL);
255 dataset_set_session (ds, NULL);
259 ds->name = xstrdup (name);
263 dataset_set_session (ds, session);
265 session_set_active_dataset (session, ds);
270 dataset_session (const struct dataset *ds)
/* Moves DS into SESSION.  Nothing happens when DS already belongs to
   SESSION; otherwise DS is removed from its current session, if it has one,
   and added to SESSION.
   NOTE(review): callers in this file pass a null SESSION to detach DS; the
   check that would skip session_add_dataset() in that case is not visible
   in this excerpt. */
276 dataset_set_session (struct dataset *ds, struct session *session)
278 if (session != ds->session)
280 if (ds->session != NULL)
281 session_remove_dataset (ds->session, ds);
283 session_add_dataset (session, ds);
287 /* Returns the dictionary within DS. This is always nonnull, although it
288 might not contain any variables. */
290 dataset_dict (const struct dataset *ds)
295 /* Replaces DS's dictionary by DICT, discarding any source and transformations. */
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
/* No procedure may be in progress, and DICT must differ from the dictionary
   DS already has. */
300 assert (ds->proc_state == PROC_COMMITTED);
301 assert (ds->dict != dict);
/* Release DS's reference to its dictionary, then register dict_callback
   (with DS as auxiliary data) on DS's dictionary.
   NOTE(review): the assignment of DICT to ds->dict between these two
   statements is not visible in this excerpt. */
305 dict_unref (ds->dict);
307 dict_set_change_callback (ds->dict, dict_callback, ds);
310 /* Returns the casereader that will be read when a procedure is executed on
311 DS. This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
318 /* Returns true if DS has a data source, false otherwise. */
320 dataset_has_source (const struct dataset *ds)
322 return dataset_source (ds) != NULL;
325 /* Replaces the active dataset's data by READER. READER's cases must have an
326 appropriate format for DS's dictionary. */
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
/* Destroy whatever source DS had before. */
330 casereader_destroy (ds->source);
/* Clear the case-initialization state and re-mark it as preinited for DS's
   dictionary. */
333 caseinit_clear (ds->caseinit);
334 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
/* The result is true when READER is null or reports no error. */
336 return reader == NULL || !casereader_error (reader);
339 /* Returns the data source from DS and removes it from DS. Returns a null
340 pointer if DS has no data source. */
342 dataset_steal_source (struct dataset *ds)
344 struct casereader *reader = ds->source;
350 /* Returns a number unique to DS. It can be used to distinguish one dataset
351 from any other within a given program run, even datasets that do not exist at the same time. */
354 dataset_seqno (const struct dataset *ds)
/* Installs CALLBACKS on DS together with the auxiliary pointer CB_DATA.
   Elsewhere in this file CB_DATA is the argument handed to each callback
   when it is invoked.  The callbacks pointer is stored, not copied. */
360 dataset_set_callbacks (struct dataset *ds,
361 const struct dataset_callbacks *callbacks,
364 ds->callbacks = callbacks;
365 ds->cb_data = cb_data;
369 dataset_get_display (const struct dataset *ds)
375 dataset_set_display (struct dataset *ds, enum dataset_display display)
377 ds->display = display;
380 /* Returns the last time the data was read. */
382 time_of_last_procedure (struct dataset *ds)
/* A stored time of 0 is treated as "never recorded": in that case the
   current time is recorded first and then returned. */
386 if (ds->last_proc_invocation == 0)
387 update_last_proc_invocation (ds);
388 return ds->last_proc_invocation;
391 /* Regular procedure. */
393 /* Executes any pending transformations, if necessary.
394 This is not identical to the EXECUTE command in that it won't
395 always read the source data. This can be important when the
396 source data is given inline within BEGIN DATA...END FILE. */
398 proc_execute (struct dataset *ds)
/* Nothing is pending when there are no permanent transformations and
   either temporary mode is off or the temporary chain is empty. */
402 if ((!ds->temporary || !ds->temporary_trns_chain.n)
403 && !ds->permanent_trns_chain.n)
/* Reset per-procedure state: output is no longer discarded, the case limit
   is cleared, and vectors are removed from the dictionary.
   NOTE(review): presumably this is the no-work path and is followed by an
   early return that is not visible in this excerpt. */
406 ds->discard_output = false;
407 dict_set_case_limit (ds->dict, 0);
408 dict_clear_vectors (ds->dict);
/* Open the procedure casereader and destroy it at once, then commit.
   proc_commit() is called regardless of OK, and the result is true only if
   both steps succeeded. */
412 ok = casereader_destroy (proc_open (ds));
413 return proc_commit (ds) && ok;
416 static const struct casereader_class proc_casereader_class;
418 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
419 cases filtered out with FILTER BY will not be included in the casereader
420 (which is usually desirable). If FILTER is false, all cases will be
421 included regardless of FILTER BY settings.
423 proc_commit must be called when done. */
425 proc_open_filtering (struct dataset *ds, bool filter)
427 struct casereader *reader;
/* Preconditions: no pushed transformation chains remain on the stack, DS
   has a data source, and no other procedure is in progress. */
429 assert (ds->n_stack == 0);
430 assert (ds->source != NULL);
431 assert (ds->proc_state == PROC_COMMITTED);
433 update_last_proc_invocation (ds);
435 caseinit_mark_for_init (ds->caseinit, ds->dict);
437 /* Finish up the collection of transformations. */
438 add_case_limit_trns (ds);
/* NOTE(review): presumably applied only when FILTER is true; the guarding
   condition is not visible in this excerpt. */
440 add_filter_trns (ds);
/* When temporary transformations are in effect, the measurement-level
   transformation is not added here. */
441 if (!proc_in_temporary_transformations (ds))
442 add_measurement_level_trns (ds, ds->dict);
444 /* Make permanent_dict refer to the dictionary right before
445 data reaches the sink. */
446 if (ds->permanent_dict == NULL)
447 ds->permanent_dict = ds->dict;
/* Unless output is being discarded, create the sink.  A compactor and a
   compacted prototype are used only when the count obtained with the
   DC_SCRATCH mask is smaller than the dictionary's next value index;
   otherwise the sink uses the permanent dictionary's prototype directly
   and no compactor is needed. */
450 if (!ds->discard_output)
452 struct dictionary *pd = ds->permanent_dict;
453 size_t compacted_n_values = dict_count_values (pd, 1u << DC_SCRATCH);
454 if (compacted_n_values < dict_get_next_value_idx (pd))
456 struct caseproto *compacted_proto;
457 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
458 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
459 ds->sink = autopaging_writer_create (compacted_proto);
460 caseproto_unref (compacted_proto);
464 ds->compactor = NULL;
465 ds->sink = autopaging_writer_create (dict_get_proto (pd));
470 ds->compactor = NULL;
474 /* Allocate memory for lagged cases. */
475 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
/* From here on a procedure is in progress and no cases have been written. */
477 ds->proc_state = PROC_OPEN;
478 ds->cases_written = 0;
481 /* FIXME: use taint in dataset in place of `ok'? */
482 /* FIXME: for trivial cases we can just return a clone of
485 /* Create casereader and insert a shim on top. The shim allows us to
486 arbitrarily extend the casereader's lifetime, by slurping the cases into
487 the shim's buffer in proc_commit(). That is especially useful when output
488 table_items are generated directly from the procedure casereader (e.g. by
489 the LIST procedure) when we are using an output driver that keeps a
490 reference to the output items passed to it (e.g. the GUI output driver in
492 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
494 &proc_casereader_class, ds);
495 ds->shim = casereader_shim_insert (reader);
499 /* Opens dataset DS for reading cases with proc_read.
500 proc_commit must be called when done. */
502 proc_open (struct dataset *ds)
504 return proc_open_filtering (ds, true);
507 /* Returns true if a procedure is in progress, that is, if
508 proc_open has been called but proc_commit has not. */
510 proc_is_open (const struct dataset *ds)
512 return ds->proc_state != PROC_COMMITTED;
515 /* "read" function for procedure casereader. */
516 static struct ccase *
517 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
/* DS_ is the dataset that was passed as auxiliary data when the procedure
   casereader was created. */
519 struct dataset *ds = ds_;
520 enum trns_result retval = TRNS_DROP_CASE;
523 assert (ds->proc_state == PROC_OPEN);
/* Each pass that does not return releases the case it was working on
   through the loop's increment expression before trying the next one. */
524 for (; ; case_unref (c))
/* RETVAL still holds the outcome of the previous pass; only a dropped case
   or an error can bring control back to the top of the loop. */
526 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
527 if (retval == TRNS_ERROR)
532 /* Read a case from source. */
533 c = casereader_read (ds->source);
/* Give the case a private copy sized for DS's dictionary, then initialize
   its variables through caseinit. */
536 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
537 caseinit_init_vars (ds->caseinit, c);
539 /* Execute permanent transformations. */
/* The case number passed is one more than the number of cases written so
   far. */
540 casenumber case_nr = ds->cases_written + 1;
541 retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
542 caseinit_update_left_vars (ds->caseinit, c);
543 if (retval != TRNS_CONTINUE)
546 /* Write case to collection of lagged cases. */
/* Evict from the back until there is room, then push a new reference to
   the current case on the front. */
549 while (deque_count (&ds->lag) >= ds->n_lag)
550 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
551 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
554 /* Write case to replacement dataset. */
/* The sink receives its own reference to the case, passed through
   case_map_execute() with DS's compactor. */
556 if (ds->sink != NULL)
557 casewriter_write (ds->sink,
558 case_map_execute (ds->compactor, case_ref (c)));
560 /* Execute temporary transformations. */
/* NOTE(review): the case number used here is ds->cases_written; the
   statement that advances that counter is not visible in this excerpt. */
561 if (ds->temporary_trns_chain.n)
563 retval = trns_chain_execute (&ds->temporary_trns_chain,
564 ds->cases_written, &c);
565 if (retval != TRNS_CONTINUE)
573 /* "destroy" function for procedure casereader. */
575 proc_casereader_destroy (struct casereader *reader, void *ds_)
/* DS_ is the dataset that was passed as auxiliary data when the procedure
   casereader was created. */
577 struct dataset *ds = ds_;
580 /* We are always the subreader for a casereader_buffer, so if we're being
581 destroyed then it's because the casereader_buffer has read all the cases
582 that it ever will. */
585 /* Make sure transformations happen for every input case, in
586 case they have side effects, and ensure that the replacement
587 active dataset gets all the cases it should. */
588 while ((c = casereader_read (reader)) != NULL)
/* The procedure is now closed but not yet committed.  The result of
   destroying the source is folded into ds->ok, so an earlier failure is
   never overwritten by a later success. */
591 ds->proc_state = PROC_CLOSED;
592 ds->ok = casereader_destroy (ds->source) && ds->ok;
/* Leave DS without a data source. */
594 dataset_set_source (ds, NULL);
597 /* Must return false if the source casereader, a transformation,
598 or the sink casewriter signaled an error. (If a temporary
599 transformation signals an error, then the return value is
600 false, but the replacement active dataset may still be untainted.) */
603 proc_commit (struct dataset *ds)
/* If a shim was inserted by proc_open_filtering(), slurp through it first;
   the comment there describes this as pulling the cases into the shim's
   buffer to extend the casereader's lifetime. */
605 if (ds->shim != NULL)
606 casereader_shim_slurp (ds->shim);
/* By this point the procedure casereader must already have been destroyed
   (state PROC_CLOSED).  Mark the procedure committed and notify listeners. */
608 assert (ds->proc_state == PROC_CLOSED);
609 ds->proc_state = PROC_COMMITTED;
611 dataset_changed__ (ds);
613 /* Free memory for lagged cases. */
614 while (!deque_is_empty (&ds->lag))
615 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
616 free (ds->lag_cases);
618 /* Dictionary from before TEMPORARY becomes permanent. */
619 proc_cancel_temporary_transformations (ds);
/* OK is true only if cancelling the transformations succeeded and no error
   was recorded in ds->ok while the procedure ran. */
620 bool ok = proc_cancel_all_transformations (ds) && ds->ok;
622 if (!ds->discard_output)
624 /* Finish compacting. */
/* When a compactor was used, it is destroyed and the dictionary is brought
   in line by deleting scratch variables and compacting values. */
625 if (ds->compactor != NULL)
627 case_map_destroy (ds->compactor);
628 ds->compactor = NULL;
630 dict_delete_scratch_vars (ds->dict);
631 dict_compact_values (ds->dict);
634 /* Old data sink becomes new data source. */
635 if (ds->sink != NULL)
636 ds->source = casewriter_make_reader (ds->sink);
/* Discarding applies to a single procedure only, so the flag is reset. */
641 ds->discard_output = false;
/* Clear the case-initialization state and re-mark it as preinited for the
   dictionary as it now stands. */
645 caseinit_clear (ds->caseinit);
646 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
648 dict_clear_vectors (ds->dict);
649 ds->permanent_dict = NULL;
653 /* Casereader class for procedure execution. */
654 static const struct casereader_class proc_casereader_class =
656 proc_casereader_read,
657 proc_casereader_destroy,
662 /* Updates last_proc_invocation. */
664 update_last_proc_invocation (struct dataset *ds)
666 ds->last_proc_invocation = time (NULL);
669 /* Returns a pointer to the lagged case from N_BEFORE cases before the
670 current one, or NULL if there haven't been that many cases yet. */
672 lagged_case (const struct dataset *ds, int n_before)
/* N_BEFORE must be at least 1 and no larger than the number of lagged cases
   DS was asked to keep. */
674 assert (n_before >= 1);
675 assert (n_before <= ds->n_lag);
/* The deque's front is the most recent case, so the case from N_BEFORE ago
   is at offset N_BEFORE - 1 from the front. */
677 if (n_before <= deque_count (&ds->lag))
678 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
683 /* Adds TRNS to the current set of transformations. */
685 add_transformation (struct dataset *ds,
686 const struct trns_class *class, void *aux)
/* Choose the destination chain in priority order: the top of the pushed
   stack if it is nonempty, else the temporary chain when temporary mode is
   on, else the permanent chain. */
688 struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
689 : ds->temporary ? &ds->temporary_trns_chain
690 : &ds->permanent_trns_chain);
/* The transformation is described by CLASS and its auxiliary data AUX. */
691 struct transformation t = { .class = class, .aux = aux };
692 trns_chain_append (chain, &t);
/* Tell listeners that transformations are now pending. */
693 dataset_transformations_changed__ (ds, true);
696 /* Returns true if the next call to add_transformation() will add
697 a temporary transformation, false if it will add a permanent transformation. */
700 proc_in_temporary_transformations (const struct dataset *ds)
702 return ds->temporary;
705 /* Marks the start of temporary transformations.
706 Further calls to add_transformation() will add temporary transformations. */
709 proc_start_temporary_transformations (struct dataset *ds)
/* May not be called while pushed transformation chains remain on the
   stack. */
711 assert (!ds->n_stack);
/* Only the first call after entering permanent mode does any work. */
712 if (!proc_in_temporary_transformations (ds))
/* While still in permanent mode, turn any pending case limit into a
   transformation. */
714 add_case_limit_trns (ds);
/* Snapshot the dictionary as it stands now; the clone becomes
   permanent_dict, and a measurement-level transformation is added for
   it. */
716 ds->permanent_dict = dict_clone (ds->dict);
717 add_measurement_level_trns (ds, ds->permanent_dict);
/* Switch to temporary mode and notify listeners. */
719 ds->temporary = true;
720 dataset_transformations_changed__ (ds, true);
724 /* Converts all the temporary transformations, if any, to permanent
725 transformations. Further transformations will be permanent.
727 The FILTER command is implemented as a temporary transformation, so a
728 procedure that uses this function should usually use proc_open_filtering()
729 with FILTER false, instead of plain proc_open().
731 Returns true if anything changed, false otherwise. */
733 proc_make_temporary_transformations_permanent (struct dataset *ds)
735 if (proc_in_temporary_transformations (ds))
/* Cancel the measurement-level transformation on the permanent chain (see
   cancel_measurement_level_trns()), then splice the temporary chain onto
   the permanent one. */
737 cancel_measurement_level_trns (&ds->permanent_trns_chain);
738 trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
740 ds->temporary = false;
/* The pre-TEMPORARY dictionary snapshot is no longer needed. */
742 dict_unref (ds->permanent_dict);
743 ds->permanent_dict = NULL;
751 /* Cancels all temporary transformations, if any. Further
752 transformations will be permanent.
753 Returns true if anything changed, false otherwise. */
755 proc_cancel_temporary_transformations (struct dataset *ds)
757 if (proc_in_temporary_transformations (ds))
759 trns_chain_clear (&ds->temporary_trns_chain);
762 /* XXX remove measurement level transformation from permanent_trns_chain */
/* Release the current dictionary and put permanent_dict in its place as
   DS's dictionary. */
763 dict_unref (ds->dict);
764 ds->dict = ds->permanent_dict;
765 ds->permanent_dict = NULL;
/* Listeners are told whether any permanent transformations remain. */
767 dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
774 /* Cancels all transformations, if any.
775 Returns true if successful, false on I/O error. */
777 proc_cancel_all_transformations (struct dataset *ds)
/* Not permitted while a procedure is in progress. */
780 assert (ds->proc_state == PROC_COMMITTED);
/* Every chain is processed even after a failure; OK accumulates the
   combined result. */
781 ok = trns_chain_clear (&ds->permanent_trns_chain);
782 ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
783 ds->temporary = false;
784 for (size_t i = 0; i < ds->n_stack; i++)
785 ok = trns_chain_uninit (&ds->stack[i]) && ok;
/* Tell listeners that no transformations are pending. */
787 dataset_transformations_changed__ (ds, false);
/* Pushes a freshly initialized transformation chain onto DS's stack, growing
   the stack array first when it is full.  While the stack is nonempty,
   add_transformation() appends to its top chain. */
793 proc_push_transformations (struct dataset *ds)
795 if (ds->n_stack >= ds->allocated_stack)
796 ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
798 trns_chain_init (&ds->stack[ds->n_stack++]);
/* Pops the top transformation chain off DS's stack and copies it into
   *CHAIN.  The stack must not be empty.  The chain is copied by value and
   not uninitialized here. */
802 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
804 assert (ds->n_stack > 0);
805 *chain = ds->stack[--ds->n_stack];
/* Transformation "execute" function that stores CASE_NUM into variable VAR_
   of case *CC.  Always continues to the next transformation. */
808 static enum trns_result
809 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
811 struct variable *var = var_;
/* Obtain an unshared case before writing to it. */
813 *cc = case_unshare (*cc);
814 *case_num_rw (*cc, var) = case_num;
816 return TRNS_CONTINUE;
819 /* Add a variable which we can sort by to get back the original order. */
821 add_permanent_ordering_transformation (struct dataset *ds)
/* Create the numeric (width 0) variable "$ORDER" in DS's dictionary.  In
   temporary mode the variable is also cloned into permanent_dict and the
   clone is the one the transformation writes to.
   NOTE(review): the other branch of this conditional is not visible in this
   excerpt. */
823 struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
824 struct variable *order_var
825 = (proc_in_temporary_transformations (ds)
826 ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
829 static const struct trns_class trns_class = {
831 .execute = store_case_num
/* Appended directly to the permanent chain rather than through
   add_transformation(), so it is permanent even in temporary mode. */
833 const struct transformation t = { .class = &trns_class, .aux = order_var };
834 trns_chain_append (&ds->permanent_trns_chain, &t);
839 /* Causes output from the next procedure to be discarded, instead
840 of being preserved for use as input for the next procedure. */
842 proc_discard_output (struct dataset *ds)
844 ds->discard_output = true;
848 /* Checks whether DS has a corrupted active dataset. If so,
849 discards it and returns false. If not, returns true without
852 dataset_end_of_command (struct dataset *ds)
/* A dataset without a source has nothing to check. */
854 if (ds->source != NULL)
/* NOTE(review): the statements taken when the source reports an error are
   not visible in this excerpt. */
856 if (casereader_error (ds->source))
/* Otherwise reset successor taint on the source's taint object and assert
   that no tainted successor remains.  The cast is needed because the taint
   is obtained through a const pointer. */
863 const struct taint *taint = casereader_get_taint (ds->source);
864 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
865 assert (!taint_has_tainted_successor (taint));
871 /* Limits the maximum number of cases processed to *CASES_REMAINING_. */
/* Transformation "execute" function for the case limit.  While the counter
   that CASES_REMAINING_ points to is positive, it is decremented and the
   case continues; once it reaches zero every further case is dropped.  The
   case and case number are unused. */
873 static enum trns_result
874 case_limit_trns_proc (void *cases_remaining_,
875 struct ccase **c UNUSED, casenumber case_nr UNUSED)
/* NOTE(review): add_case_limit_trns() allocates this counter as a
   casenumber, but it is accessed through a size_t pointer here -- confirm
   that the two types are compatible. */
877 size_t *cases_remaining = cases_remaining_;
878 if (*cases_remaining > 0)
880 (*cases_remaining)--;
881 return TRNS_CONTINUE;
884 return TRNS_DROP_CASE;
887 /* Frees the data associated with a case limit transformation. */
889 case_limit_trns_free (void *cases_remaining_)
891 size_t *cases_remaining = cases_remaining_;
892 free (cases_remaining);
896 /* Adds a transformation that limits the number of cases that may
897 pass through, if DS->DICT has a case limit. */
899 add_case_limit_trns (struct dataset *ds)
901 casenumber case_limit = dict_get_case_limit (ds->dict);
/* The remaining-case counter is heap-allocated and handed to the
   transformation as its auxiliary data; the class's destroy function,
   case_limit_trns_free, receives it later.
   NOTE(review): the test that skips this when there is no case limit is not
   visible in this excerpt. */
904 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
905 *cases_remaining = case_limit;
907 static const struct trns_class trns_class = {
908 .name = "case limit",
909 .execute = case_limit_trns_proc,
910 .destroy = case_limit_trns_free,
912 add_transformation (ds, &trns_class, cases_remaining);
/* The limit now lives in the transformation, so clear it in the
   dictionary. */
914 dict_set_case_limit (ds->dict, 0);
919 /* FILTER transformation. */
/* Keeps the case (TRNS_CONTINUE) when the filter variable's value in *C is
   nonzero and not missing; drops it (TRNS_DROP_CASE) otherwise.
   FILTER_VAR_ is the filter variable.  The case number is unused. */
920 static enum trns_result
921 filter_trns_proc (void *filter_var_,
922 struct ccase **c, casenumber case_nr UNUSED)
925 struct variable *filter_var = filter_var_;
926 double f = case_num (*c, filter_var);
927 return (f != 0.0 && !var_is_num_missing (filter_var, f)
928 ? TRNS_CONTINUE : TRNS_DROP_CASE);
931 /* Adds a temporary transformation to filter data according to
932 the variable specified on FILTER, if any. */
934 add_filter_trns (struct dataset *ds)
936 struct variable *filter_var = dict_get_filter (ds->dict);
937 if (filter_var != NULL)
/* Enter temporary mode first so that the filter lands on the temporary
   chain. */
939 proc_start_temporary_transformations (ds);
941 static const struct trns_class trns_class = {
943 .execute = filter_trns_proc,
/* The filter variable is the transformation's auxiliary data. */
945 add_transformation (ds, &trns_class, filter_var);
/* Records that DS must keep at least N_BEFORE lagged cases.  The stored
   count only ever grows: it becomes the larger of its current value and
   N_BEFORE. */
950 dataset_need_lag (struct dataset *ds, int n_before)
952 ds->n_lag = MAX (ds->n_lag, n_before);
955 /* Transformation for adding measurement level. */
957 struct measurement_level_value
959 struct hmap_node hmap_node;
963 struct measurement_level_var
965 struct variable *var;
/* Releases the per-variable state in MLV: each recorded value is deleted
   from MLV's hash map (using the _SAFE iteration macro, which tolerates
   deletion during traversal), and the map is then destroyed.
   NOTE(review): the freeing of the nodes and of the map storage itself is
   not visible in this excerpt. */
970 add_measurement_level_var_uninit (struct measurement_level_var *mlv)
972 struct measurement_level_value *mlvalue, *next;
973 HMAP_FOR_EACH_SAFE (mlvalue, next, struct measurement_level_value, hmap_node,
976 hmap_delete (mlv->values, &mlvalue->hmap_node);
979 hmap_destroy (mlv->values);
/* Decides a measurement level from the values recorded for MLV: nominal
   when, per the comment below, all data was missing or absent; nominal if
   any recorded value is less than 10; scale otherwise. */
984 add_measurement_level_var_interpret (const struct measurement_level_var *mlv)
/* NOTE(review): N presumably guards the "no values" case; the test itself
   is not visible in this excerpt. */
986 size_t n = hmap_count (mlv->values);
989 /* All missing (or no data). */
990 return MEASURE_NOMINAL;
993 const struct measurement_level_value *mlvalue;
994 HMAP_FOR_EACH (mlvalue, struct measurement_level_value, hmap_node,
996 if (mlvalue->value < 10)
997 return MEASURE_NOMINAL;
998 return MEASURE_SCALE;
1001 struct measurement_level_trns
1003 struct measurement_level_var *vars;
/* Feeds one observed VALUE into the measurement-level decision for MLV.
   Returns MEASURE_SCALE as soon as the level is decided, or MEASURE_UNKNOWN
   while it is still undecided. */
1008 add_measurement_level_trns_proc__ (struct measurement_level_var *mlv, double value)
/* Missing values carry no information. */
1010 if (var_is_num_missing (mlv->var, value))
1011 return MEASURE_UNKNOWN;
/* A negative or non-integer value decides scale immediately. */
1012 else if (value < 0 || value != floor (value))
1013 return MEASURE_SCALE;
/* A value that has been recorded already adds nothing new. */
1015 size_t hash = hash_double (value, 0);
1016 struct measurement_level_value *mlvalue;
1017 HMAP_FOR_EACH_WITH_HASH (mlvalue, struct measurement_level_value, hmap_node,
1019 if (mlvalue->value == value)
1020 return MEASURE_UNKNOWN;
/* Record the new distinct value; once the number of distinct values reaches
   settings_get_scalemin(), the level is scale. */
1022 mlvalue = xmalloc (sizeof *mlvalue);
1023 mlvalue->value = value;
1024 hmap_insert (mlv->values, &mlvalue->hmap_node, hash);
1025 if (hmap_count (mlv->values) >= settings_get_scalemin ())
1026 return MEASURE_SCALE;
1028 return MEASURE_UNKNOWN;
/* Transformation "execute" function that examines case *C for every
   variable whose measurement level is still undecided.  MLT_ is the
   transformation's struct measurement_level_trns.  Always continues. */
1031 static enum trns_result
1032 add_measurement_level_trns_proc (void *mlt_, struct ccase **c,
1033 casenumber case_nr UNUSED)
1035 struct measurement_level_trns *mlt = mlt_;
/* The for statement has no increment expression because a decided variable
   is removed by overwriting its slot with the last element.
   NOTE(review): the index advance for the undecided case is not visible in
   this excerpt. */
1036 for (size_t i = 0; i < mlt->n_vars; )
1038 struct measurement_level_var *mlv = &mlt->vars[i];
1039 double value = case_num (*c, mlv->var);
1040 enum measure m = add_measurement_level_trns_proc__ (mlv, value);
1041 if (m != MEASURE_UNKNOWN)
/* Decided: apply the level, release the variable's state, and move the last
   pending variable into this slot. */
1043 var_set_measure (mlv->var, m);
1045 add_measurement_level_var_uninit (mlv);
1046 *mlv = mlt->vars[--mlt->n_vars];
1051 return TRNS_CONTINUE;
/* Finishes every variable still pending in MLT: its measurement level is
   set from the values recorded so far, and its per-variable state is
   released.
   NOTE(review): the freeing of MLT itself is not visible in this excerpt. */
1055 add_measurement_level_trns_free__ (struct measurement_level_trns *mlt)
1057 for (size_t i = 0; i < mlt->n_vars; i++)
1059 struct measurement_level_var *mlv = &mlt->vars[i];
1060 var_set_measure (mlv->var, add_measurement_level_var_interpret (mlv));
1061 add_measurement_level_var_uninit (mlv);
/* "destroy" function of the measurement-level transformation class.  Sets
   the measurement level of every still-pending variable from its recorded
   values, then releases MLT_ through add_measurement_level_trns_free__().
   NOTE(review): the helper as visible here also sets each variable's
   measure, so the loop below looks redundant -- confirm which of the two is
   intended. */
1068 add_measurement_level_trns_free (void *mlt_)
1070 struct measurement_level_trns *mlt = mlt_;
1071 for (size_t i = 0; i < mlt->n_vars; i++)
1073 struct measurement_level_var *mlv = &mlt->vars[i];
1074 var_set_measure (mlv->var, add_measurement_level_var_interpret (mlv));
1076 add_measurement_level_trns_free__ (mlt);
1080 static const struct trns_class add_measurement_level_trns_class = {
1081 .name = "add measurement level",
1082 .execute = add_measurement_level_trns_proc,
1083 .destroy = add_measurement_level_trns_free,
/* Adds to DS a transformation that decides, from the data, the measurement
   level of those variables in DICT whose level is unknown and cannot be
   derived from their print format. */
1087 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1089 struct variable **vars = NULL;
1091 size_t allocated_vars = 0;
/* First pass: collect the variables that need data-driven detection. */
1093 for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1095 struct variable *var = dict_get_var (dict, i);
/* NOTE(review): presumably variables that already have a level are skipped;
   the statement under this test is not visible in this excerpt. */
1096 if (var_get_measure (var) != MEASURE_UNKNOWN)
/* If the print format implies a level, apply it directly. */
1099 const struct fmt_spec *f = var_get_print_format (var);
1100 enum measure m = var_default_measure_for_format (f->type);
1101 if (m != MEASURE_UNKNOWN)
1103 var_set_measure (var, m);
/* Append the variable to the growable array of pending variables. */
1107 if (n_vars >= allocated_vars)
1108 vars = x2nrealloc (vars, &allocated_vars, sizeof *vars);
1109 vars[n_vars++] = var;
1115 /* We do this as a second step because otherwise we'd be moving hmaps around,
1116 which doesn't work. */
/* Each pending variable gets its own separately allocated hash map. */
1117 struct measurement_level_var *mlvs = xmalloc (n_vars * sizeof *mlvs);
1118 for (size_t i = 0; i < n_vars; i++)
1120 mlvs[i].var = vars[i];
1121 mlvs[i].values = xmalloc (sizeof *mlvs[i].values);
1122 hmap_init (mlvs[i].values);
/* Package the pending variables as the transformation's auxiliary data. */
1126 struct measurement_level_trns *mlt = xmalloc (sizeof *mlt);
1127 *mlt = (struct measurement_level_trns) {
1131 add_transformation (ds, &add_measurement_level_trns_class, mlt);
/* Cancels the measurement-level transformation in CHAIN.  Only the last
   transformation in CHAIN is examined: when its class is
   add_measurement_level_trns_class, its auxiliary data is released with
   add_measurement_level_trns_free__().
   NOTE(review): the handling of an empty chain, the action taken for a
   different class, and the removal of the entry from CHAIN are not visible
   in this excerpt. */
1135 cancel_measurement_level_trns (struct trns_chain *chain)
1140 struct transformation *trns = &chain->xforms[chain->n - 1];
1141 if (trns->class != &add_measurement_level_trns_class)
1144 struct measurement_level_trns *mlt = trns->aux;
1145 add_measurement_level_trns_free__ (mlt);
/* Invokes DS's "changed" callback with DS's callback data.  Does nothing
   when DS has no callbacks installed or the "changed" callback is null. */
1150 dataset_changed__ (struct dataset *ds)
1152 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1153 ds->callbacks->changed (ds->cb_data);
/* Invokes DS's "transformations_changed" callback with NON_EMPTY and DS's
   callback data.  Does nothing when DS has no callbacks installed or that
   callback is null.  Callers in this file pass true for NON_EMPTY after
   adding a transformation and false after cancelling all of them. */
1157 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1159 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1160 ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1163 /* Private interface for use by session code. */
/* Stores SESSION in DS without touching either session's dataset list;
   dataset_set_session() is the function that updates the lists. */
1166 dataset_set_session__ (struct dataset *ds, struct session *session)
1168 ds->session = session;