1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
/* A dataset is usually part of a session.  Within a session its name must
   be unique.  The name must either be a valid PSPP identifier or the empty
52 string. (It must be unique within the session even if it is the empty
53 string; that is, there may only be a single dataset within a session with
54 the empty string as its name.) */
55 struct session *session;
57 enum dataset_display display;
59 /* Cases are read from source,
60 their transformation variables are initialized,
61 pass through permanent_trns_chain (which transforms them into
62 the format described by permanent_dict),
64 pass through temporary_trns_chain (which transforms them into
65 the format described by dict),
66 and are finally passed to the procedure. */
67 struct casereader *source;
68 struct caseinit *caseinit;
69 struct trns_chain permanent_trns_chain;
70 struct dictionary *permanent_dict;
71 struct casewriter *sink;
72 struct trns_chain temporary_trns_chain;
74 struct dictionary *dict;
76 /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77 struct trns_chain *stack;
79 size_t allocated_stack;
/* If true, cases are discarded instead of being written to the sink. */
85 /* The case map used to compact a case, if necessary;
86 otherwise a null pointer. */
87 struct case_map *compactor;
89 /* Time at which proc was last invoked. */
90 time_t last_proc_invocation;
92 /* Cases just before ("lagging") the current one. */
93 int n_lag; /* Number of cases to lag. */
94 struct deque lag; /* Deque of lagged cases. */
95 struct ccase **lag_cases; /* Lagged cases managed by deque. */
100 PROC_COMMITTED, /* No procedure in progress. */
101 PROC_OPEN, /* proc_open called, casereader still open. */
102 PROC_CLOSED /* casereader from proc_open destroyed,
103 but proc_commit not yet called. */
106 casenumber cases_written; /* Cases output so far. */
107 bool ok; /* Error status. */
108 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
110 const struct dataset_callbacks *callbacks;
113 /* Uniquely distinguishes datasets. */
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
126 static void update_last_proc_invocation (struct dataset *ds);
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
131 struct dataset *ds = ds_;
132 dataset_changed__ (ds);
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
138 static unsigned int seqno;
140 dict_set_change_callback (ds->dict, dict_callback, ds);
141 proc_cancel_all_transformations (ds);
142 dataset_set_session (ds, session);
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it. If
147 SESSION already contains a dataset named NAME, it is deleted and replaced.
148 The dataset initially has an empty dictionary and no data source. */
150 dataset_create (struct session *session, const char *name)
152 struct dataset *ds = XMALLOC (struct dataset);
153 *ds = (struct dataset) {
154 .name = xstrdup (name),
155 .display = DATASET_FRONT,
156 .dict = dict_create (get_default_encoding ()),
157 .caseinit = caseinit_create (),
159 dataset_create_finish__ (ds, session);
/* Creates a new dataset named NAME that has the same data and dictionary as
   OLD, adds it to the same session as OLD, and returns the new dataset.  If
   OLD's session already contains a dataset named NAME, it is deleted and
   replaced.

169 OLD must not have any active transformations or temporary state and must
170 not be in the middle of a procedure.
172 Callbacks are not cloned. */
174 dataset_clone (struct dataset *old, const char *name)
178 assert (old->proc_state == PROC_COMMITTED);
179 assert (!old->permanent_trns_chain.n);
180 assert (old->permanent_dict == NULL);
181 assert (old->sink == NULL);
182 assert (!old->temporary);
183 assert (!old->temporary_trns_chain.n);
184 assert (!old->n_stack);
186 new = xzalloc (sizeof *new);
187 new->name = xstrdup (name);
188 new->display = DATASET_FRONT;
189 new->source = casereader_clone (old->source);
190 new->dict = dict_clone (old->dict);
191 new->caseinit = caseinit_clone (old->caseinit);
192 new->last_proc_invocation = old->last_proc_invocation;
195 dataset_create_finish__ (new, old->session);
202 dataset_destroy (struct dataset *ds)
206 dataset_set_session (ds, NULL);
208 dict_unref (ds->dict);
209 dict_unref (ds->permanent_dict);
210 caseinit_destroy (ds->caseinit);
211 trns_chain_uninit (&ds->permanent_trns_chain);
212 for (size_t i = 0; i < ds->n_stack; i++)
213 trns_chain_uninit (&ds->stack[i]);
215 dataset_transformations_changed__ (ds, false);
221 /* Discards the active dataset's dictionary, data, and transformations. */
223 dataset_clear (struct dataset *ds)
225 assert (ds->proc_state == PROC_COMMITTED);
227 dict_clear (ds->dict);
228 fh_set_default_handle (NULL);
232 casereader_destroy (ds->source);
235 proc_cancel_all_transformations (ds);
239 dataset_name (const struct dataset *ds)
245 dataset_set_name (struct dataset *ds, const char *name)
247 struct session *session = ds->session;
252 active = session_active_dataset (session) == ds;
254 session_set_active_dataset (session, NULL);
255 dataset_set_session (ds, NULL);
259 ds->name = xstrdup (name);
263 dataset_set_session (ds, session);
265 session_set_active_dataset (session, ds);
270 dataset_session (const struct dataset *ds)
276 dataset_set_session (struct dataset *ds, struct session *session)
278 if (session != ds->session)
280 if (ds->session != NULL)
281 session_remove_dataset (ds->session, ds);
283 session_add_dataset (session, ds);
287 /* Returns the dictionary within DS. This is always nonnull, although it
288 might not contain any variables. */
290 dataset_dict (const struct dataset *ds)
295 /* Replaces DS's dictionary by DICT, discarding any source and
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
300 assert (ds->proc_state == PROC_COMMITTED);
301 assert (ds->dict != dict);
305 dict_unref (ds->dict);
307 dict_set_change_callback (ds->dict, dict_callback, ds);
310 /* Returns the casereader that will be read when a procedure is executed on
311 DS. This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
318 /* Returns true if DS has a data source, false otherwise. */
320 dataset_has_source (const struct dataset *ds)
322 return dataset_source (ds) != NULL;
325 /* Replaces the active dataset's data by READER. READER's cases must have an
326 appropriate format for DS's dictionary. */
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
330 casereader_destroy (ds->source);
333 caseinit_clear (ds->caseinit);
334 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
336 return reader == NULL || !casereader_error (reader);
339 /* Returns the data source from DS and removes it from DS. Returns a null
340 pointer if DS has no data source. */
342 dataset_steal_source (struct dataset *ds)
344 struct casereader *reader = ds->source;
351 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
353 assert (!proc_in_temporary_transformations (ds));
354 assert (!proc_has_transformations (ds));
355 assert (n < dict_get_n_vars (ds->dict));
357 caseinit_mark_for_init (ds->caseinit, ds->dict);
358 ds->source = caseinit_translate_casereader_to_init_vars (
359 ds->caseinit, dict_get_proto (ds->dict), ds->source);
360 caseinit_clear (ds->caseinit);
361 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
363 dict_delete_vars (ds->dict, vars, n);
364 ds->source = case_map_create_input_translator (
365 case_map_to_compact_dict (ds->dict, 0), ds->source);
366 dict_compact_values (ds->dict);
367 caseinit_clear (ds->caseinit);
368 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
371 /* Returns a number unique to DS. It can be used to distinguish one dataset
372 from any other within a given program run, even datasets that do not exist
375 dataset_seqno (const struct dataset *ds)
381 dataset_set_callbacks (struct dataset *ds,
382 const struct dataset_callbacks *callbacks,
385 ds->callbacks = callbacks;
386 ds->cb_data = cb_data;
390 dataset_get_display (const struct dataset *ds)
396 dataset_set_display (struct dataset *ds, enum dataset_display display)
398 ds->display = display;
401 /* Returns the last time the data was read. */
403 time_of_last_procedure (struct dataset *ds)
407 if (ds->last_proc_invocation == 0)
408 update_last_proc_invocation (ds);
409 return ds->last_proc_invocation;
412 /* Regular procedure. */
414 /* Executes any pending transformations, if necessary.
415 This is not identical to the EXECUTE command in that it won't
416 always read the source data. This can be important when the
417 source data is given inline within BEGIN DATA...END FILE. */
419 proc_execute (struct dataset *ds)
423 if ((!ds->temporary || !ds->temporary_trns_chain.n)
424 && !ds->permanent_trns_chain.n)
427 ds->discard_output = false;
428 dict_set_case_limit (ds->dict, 0);
429 dict_clear_vectors (ds->dict);
433 ok = casereader_destroy (proc_open (ds));
434 return proc_commit (ds) && ok;
437 static const struct casereader_class proc_casereader_class;
439 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
440 cases filtered out with FILTER BY will not be included in the casereader
441 (which is usually desirable). If FILTER is false, all cases will be
442 included regardless of FILTER BY settings.
444 proc_commit must be called when done. */
446 proc_open_filtering (struct dataset *ds, bool filter)
448 struct casereader *reader;
450 assert (ds->n_stack == 0);
451 assert (ds->source != NULL);
452 assert (ds->proc_state == PROC_COMMITTED);
454 update_last_proc_invocation (ds);
456 caseinit_mark_for_init (ds->caseinit, ds->dict);
457 ds->source = caseinit_translate_casereader_to_init_vars (
458 ds->caseinit, dict_get_proto (ds->dict), ds->source);
460 /* Finish up the collection of transformations. */
461 add_case_limit_trns (ds);
463 add_filter_trns (ds);
464 if (!proc_in_temporary_transformations (ds))
465 add_measurement_level_trns (ds, ds->dict);
467 /* Make permanent_dict refer to the dictionary right before
468 data reaches the sink. */
469 if (ds->permanent_dict == NULL)
470 ds->permanent_dict = ds->dict;
473 if (!ds->discard_output)
475 struct dictionary *pd = ds->permanent_dict;
476 size_t compacted_n_values = dict_count_values (pd, DC_SCRATCH);
477 if (compacted_n_values < dict_get_next_value_idx (pd))
479 struct caseproto *compacted_proto;
480 compacted_proto = dict_get_compacted_proto (pd, DC_SCRATCH);
481 ds->compactor = case_map_to_compact_dict (pd, DC_SCRATCH);
482 ds->sink = autopaging_writer_create (compacted_proto);
483 caseproto_unref (compacted_proto);
487 ds->compactor = NULL;
488 ds->sink = autopaging_writer_create (dict_get_proto (pd));
493 ds->compactor = NULL;
497 /* Allocate memory for lagged cases. */
498 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
500 ds->proc_state = PROC_OPEN;
501 ds->cases_written = 0;
504 /* FIXME: use taint in dataset in place of `ok'? */
505 /* FIXME: for trivial cases we can just return a clone of
508 /* Create casereader and insert a shim on top. The shim allows us to
509 arbitrarily extend the casereader's lifetime, by slurping the cases into
510 the shim's buffer in proc_commit(). That is especially useful when output
511 table_items are generated directly from the procedure casereader (e.g. by
512 the LIST procedure) when we are using an output driver that keeps a
513 reference to the output items passed to it (e.g. the GUI output driver in
515 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
517 &proc_casereader_class, ds);
518 ds->shim = casereader_shim_insert (reader);
522 /* Opens dataset DS for reading cases with proc_read.
523 proc_commit must be called when done. */
525 proc_open (struct dataset *ds)
527 return proc_open_filtering (ds, true);
530 /* Returns true if a procedure is in progress, that is, if
531 proc_open has been called but proc_commit has not. */
533 proc_is_open (const struct dataset *ds)
535 return ds->proc_state != PROC_COMMITTED;
538 /* "read" function for procedure casereader. */
539 static struct ccase *
540 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
542 struct dataset *ds = ds_;
543 enum trns_result retval = TRNS_DROP_CASE;
546 assert (ds->proc_state == PROC_OPEN);
547 for (; ; case_unref (c))
549 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
550 if (retval == TRNS_ERROR)
555 /* Read a case from source. */
556 c = casereader_read (ds->source);
559 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
560 caseinit_restore_left_vars (ds->caseinit, c);
562 /* Execute permanent transformations. */
563 casenumber case_nr = ds->cases_written + 1;
564 retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
565 caseinit_save_left_vars (ds->caseinit, c);
566 if (retval != TRNS_CONTINUE)
569 /* Write case to collection of lagged cases. */
572 while (deque_count (&ds->lag) >= ds->n_lag)
573 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
574 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
577 /* Write case to replacement dataset. */
579 if (ds->sink != NULL)
580 casewriter_write (ds->sink,
581 case_map_execute (ds->compactor, case_ref (c)));
583 /* Execute temporary transformations. */
584 if (ds->temporary_trns_chain.n)
586 retval = trns_chain_execute (&ds->temporary_trns_chain,
587 ds->cases_written, &c);
588 if (retval != TRNS_CONTINUE)
596 /* "destroy" function for procedure casereader. */
598 proc_casereader_destroy (struct casereader *reader, void *ds_)
600 struct dataset *ds = ds_;
603 /* We are always the subreader for a casereader_buffer, so if we're being
604 destroyed then it's because the casereader_buffer has read all the cases
605 that it ever will. */
608 /* Make sure transformations happen for every input case, in
609 case they have side effects, and ensure that the replacement
610 active dataset gets all the cases it should. */
611 while ((c = casereader_read (reader)) != NULL)
614 ds->proc_state = PROC_CLOSED;
615 ds->ok = casereader_destroy (ds->source) && ds->ok;
617 dataset_set_source (ds, NULL);
620 /* Must return false if the source casereader, a transformation,
621 or the sink casewriter signaled an error. (If a temporary
622 transformation signals an error, then the return value is
623 false, but the replacement active dataset may still be
626 proc_commit (struct dataset *ds)
628 if (ds->shim != NULL)
629 casereader_shim_slurp (ds->shim);
631 assert (ds->proc_state == PROC_CLOSED);
632 ds->proc_state = PROC_COMMITTED;
634 dataset_changed__ (ds);
636 /* Free memory for lagged cases. */
637 while (!deque_is_empty (&ds->lag))
638 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
639 free (ds->lag_cases);
641 /* Dictionary from before TEMPORARY becomes permanent. */
642 proc_cancel_temporary_transformations (ds);
643 bool ok = proc_cancel_all_transformations (ds) && ds->ok;
645 if (!ds->discard_output)
647 /* Finish compacting. */
648 if (ds->compactor != NULL)
650 case_map_destroy (ds->compactor);
651 ds->compactor = NULL;
653 dict_delete_scratch_vars (ds->dict);
654 dict_compact_values (ds->dict);
657 /* Old data sink becomes new data source. */
658 if (ds->sink != NULL)
659 ds->source = casewriter_make_reader (ds->sink);
664 ds->discard_output = false;
668 caseinit_clear (ds->caseinit);
669 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
671 dict_clear_vectors (ds->dict);
672 ds->permanent_dict = NULL;
676 /* Casereader class for procedure execution. */
677 static const struct casereader_class proc_casereader_class =
679 proc_casereader_read,
680 proc_casereader_destroy,
685 /* Updates last_proc_invocation. */
687 update_last_proc_invocation (struct dataset *ds)
689 ds->last_proc_invocation = time (NULL);
692 /* Returns a pointer to the lagged case from N_BEFORE cases before the
693 current one, or NULL if there haven't been that many cases yet. */
695 lagged_case (const struct dataset *ds, int n_before)
697 assert (n_before >= 1);
698 assert (n_before <= ds->n_lag);
700 if (n_before <= deque_count (&ds->lag))
701 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
/* Adds a transformation with the given CLASS and auxiliary data AUX to the
   current set of transformations. */
708 add_transformation (struct dataset *ds,
709 const struct trns_class *class, void *aux)
711 struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
712 : ds->temporary ? &ds->temporary_trns_chain
713 : &ds->permanent_trns_chain);
714 struct transformation t = { .class = class, .aux = aux };
715 trns_chain_append (chain, &t);
716 dataset_transformations_changed__ (ds, true);
719 /* Returns true if the next call to add_transformation() will add
720 a temporary transformation, false if it will add a permanent
723 proc_in_temporary_transformations (const struct dataset *ds)
725 return ds->temporary;
728 /* Marks the start of temporary transformations.
729 Further calls to add_transformation() will add temporary
732 proc_start_temporary_transformations (struct dataset *ds)
734 assert (!ds->n_stack);
735 if (!proc_in_temporary_transformations (ds))
737 add_case_limit_trns (ds);
739 ds->permanent_dict = dict_clone (ds->dict);
740 add_measurement_level_trns (ds, ds->permanent_dict);
742 ds->temporary = true;
743 dataset_transformations_changed__ (ds, true);
747 /* Converts all the temporary transformations, if any, to permanent
748 transformations. Further transformations will be permanent.
750 The FILTER command is implemented as a temporary transformation, so a
751 procedure that uses this function should usually use proc_open_filtering()
752 with FILTER false, instead of plain proc_open().
754 Returns true if anything changed, false otherwise. */
756 proc_make_temporary_transformations_permanent (struct dataset *ds)
758 if (proc_in_temporary_transformations (ds))
760 cancel_measurement_level_trns (&ds->permanent_trns_chain);
761 trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
763 ds->temporary = false;
765 dict_unref (ds->permanent_dict);
766 ds->permanent_dict = NULL;
774 /* Cancels all temporary transformations, if any. Further
775 transformations will be permanent.
776 Returns true if anything changed, false otherwise. */
778 proc_cancel_temporary_transformations (struct dataset *ds)
780 if (proc_in_temporary_transformations (ds))
782 trns_chain_clear (&ds->temporary_trns_chain);
784 dict_unref (ds->dict);
785 ds->dict = ds->permanent_dict;
786 ds->permanent_dict = NULL;
788 dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
795 /* Cancels all transformations, if any.
796 Returns true if successful, false on I/O error. */
798 proc_cancel_all_transformations (struct dataset *ds)
801 assert (ds->proc_state == PROC_COMMITTED);
802 ok = trns_chain_clear (&ds->permanent_trns_chain);
803 ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
804 ds->temporary = false;
805 for (size_t i = 0; i < ds->n_stack; i++)
806 ok = trns_chain_uninit (&ds->stack[i]) && ok;
808 dataset_transformations_changed__ (ds, false);
814 proc_push_transformations (struct dataset *ds)
816 if (ds->n_stack >= ds->allocated_stack)
817 ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
819 trns_chain_init (&ds->stack[ds->n_stack++]);
823 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
825 assert (ds->n_stack > 0);
826 *chain = ds->stack[--ds->n_stack];
830 proc_has_transformations (const struct dataset *ds)
832 return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
835 static enum trns_result
836 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
838 struct variable *var = var_;
840 *cc = case_unshare (*cc);
841 *case_num_rw (*cc, var) = case_num;
843 return TRNS_CONTINUE;
846 /* Add a variable which we can sort by to get back the original order. */
848 add_permanent_ordering_transformation (struct dataset *ds)
850 struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
851 struct variable *order_var
852 = (proc_in_temporary_transformations (ds)
853 ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
856 static const struct trns_class trns_class = {
858 .execute = store_case_num
860 const struct transformation t = { .class = &trns_class, .aux = order_var };
861 trns_chain_append (&ds->permanent_trns_chain, &t);
866 /* Causes output from the next procedure to be discarded, instead
867 of being preserved for use as input for the next procedure. */
869 proc_discard_output (struct dataset *ds)
871 ds->discard_output = true;
875 /* Checks whether DS has a corrupted active dataset. If so,
876 discards it and returns false. If not, returns true without
879 dataset_end_of_command (struct dataset *ds)
881 if (ds->source != NULL)
883 if (casereader_error (ds->source))
890 const struct taint *taint = casereader_get_taint (ds->source);
891 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
892 assert (!taint_has_tainted_successor (taint));
898 /* Limits the maximum number of cases processed to
900 static enum trns_result
901 case_limit_trns_proc (void *cases_remaining_,
902 struct ccase **c UNUSED, casenumber case_nr UNUSED)
904 size_t *cases_remaining = cases_remaining_;
905 if (*cases_remaining > 0)
907 (*cases_remaining)--;
908 return TRNS_CONTINUE;
911 return TRNS_DROP_CASE;
914 /* Frees the data associated with a case limit transformation. */
916 case_limit_trns_free (void *cases_remaining_)
918 size_t *cases_remaining = cases_remaining_;
919 free (cases_remaining);
923 /* Adds a transformation that limits the number of cases that may
924 pass through, if DS->DICT has a case limit. */
926 add_case_limit_trns (struct dataset *ds)
928 casenumber case_limit = dict_get_case_limit (ds->dict);
931 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
932 *cases_remaining = case_limit;
934 static const struct trns_class trns_class = {
935 .name = "case limit",
936 .execute = case_limit_trns_proc,
937 .destroy = case_limit_trns_free,
939 add_transformation (ds, &trns_class, cases_remaining);
941 dict_set_case_limit (ds->dict, 0);
946 /* FILTER transformation. */
947 static enum trns_result
948 filter_trns_proc (void *filter_var_,
949 struct ccase **c, casenumber case_nr UNUSED)
952 struct variable *filter_var = filter_var_;
953 double f = case_num (*c, filter_var);
954 return (f != 0.0 && !var_is_num_missing (filter_var, f)
955 ? TRNS_CONTINUE : TRNS_DROP_CASE);
958 /* Adds a temporary transformation to filter data according to
959 the variable specified on FILTER, if any. */
961 add_filter_trns (struct dataset *ds)
963 struct variable *filter_var = dict_get_filter (ds->dict);
964 if (filter_var != NULL)
966 proc_start_temporary_transformations (ds);
968 static const struct trns_class trns_class = {
970 .execute = filter_trns_proc,
972 add_transformation (ds, &trns_class, filter_var);
977 dataset_need_lag (struct dataset *ds, int n_before)
979 ds->n_lag = MAX (ds->n_lag, n_before);
982 /* Measurement guesser, for guessing a measurement level from formats and
987 struct hmap_node hmap_node;
993 struct variable *var;
998 mg_var_uninit (struct mg_var *mgv)
1000 struct mg_value *mgvalue, *next;
1001 HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
1004 hmap_delete (mgv->values, &mgvalue->hmap_node);
1007 hmap_destroy (mgv->values);
1012 mg_var_interpret (const struct mg_var *mgv)
1014 size_t n = hmap_count (mgv->values);
1017 /* All missing (or no data). */
1018 return MEASURE_NOMINAL;
1021 const struct mg_value *mgvalue;
1022 HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1024 if (mgvalue->value < 10)
1025 return MEASURE_NOMINAL;
1026 return MEASURE_SCALE;
1030 mg_var_add_value (struct mg_var *mgv, double value)
1032 if (var_is_num_missing (mgv->var, value))
1033 return MEASURE_UNKNOWN;
1034 else if (value < 0 || value != floor (value))
1035 return MEASURE_SCALE;
1037 size_t hash = hash_double (value, 0);
1038 struct mg_value *mgvalue;
1039 HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1041 if (mgvalue->value == value)
1042 return MEASURE_UNKNOWN;
1044 mgvalue = xmalloc (sizeof *mgvalue);
1045 mgvalue->value = value;
1046 hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1047 if (hmap_count (mgv->values) >= settings_get_scalemin ())
1048 return MEASURE_SCALE;
1050 return MEASURE_UNKNOWN;
1053 struct measure_guesser
1055 struct mg_var *vars;
1059 static struct measure_guesser *
1060 measure_guesser_create__ (struct dictionary *dict)
1062 struct mg_var *mgvs = NULL;
1064 size_t allocated_mgvs = 0;
1066 for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1068 struct variable *var = dict_get_var (dict, i);
1069 if (var_get_measure (var) != MEASURE_UNKNOWN)
1072 struct fmt_spec f = var_get_print_format (var);
1073 enum measure m = var_default_measure_for_format (f.type);
1074 if (m != MEASURE_UNKNOWN)
1076 var_set_measure (var, m);
1080 if (n_mgvs >= allocated_mgvs)
1081 mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1083 struct mg_var *mgv = &mgvs[n_mgvs++];
1084 *mgv = (struct mg_var) {
1086 .values = xmalloc (sizeof *mgv->values),
1088 hmap_init (mgv->values);
1093 struct measure_guesser *mg = xmalloc (sizeof *mg);
1094 *mg = (struct measure_guesser) {
1101 /* Scans through DS's dictionary for variables that have an unknown measurement
1102 level. For those, if the measurement level can be guessed based on the
1103 variable's type and format, sets a default. If that's enough, returns NULL.
1104 If any remain whose levels are unknown and can't be guessed that way,
1105 creates and returns a structure that the caller should pass to
1106 measure_guesser_add_case() or measure_guesser_run() for guessing a
1107 measurement level based on the data. */
struct measure_guesser *
measure_guesser_create (struct dataset *ds)
{
  /* The guesser is built from DS's current dictionary. */
  struct dictionary *dict = dataset_dict (ds);
  return measure_guesser_create__ (dict);
}
1114 /* Adds data from case C to MG. */
1116 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1118 for (size_t i = 0; i < mg->n_vars; )
1120 struct mg_var *mgv = &mg->vars[i];
1121 double value = case_num (c, mgv->var);
1122 enum measure m = mg_var_add_value (mgv, value);
1123 if (m != MEASURE_UNKNOWN)
1125 var_set_measure (mgv->var, m);
1127 mg_var_uninit (mgv);
1128 *mgv = mg->vars[--mg->n_vars];
1137 measure_guesser_destroy (struct measure_guesser *mg)
1142 for (size_t i = 0; i < mg->n_vars; i++)
1144 struct mg_var *mgv = &mg->vars[i];
1145 var_set_measure (mgv->var, mg_var_interpret (mgv));
1146 mg_var_uninit (mgv);
1152 /* Adds final measurement levels based on MG, after all the cases have been
1155 measure_guesser_commit (struct measure_guesser *mg)
1157 for (size_t i = 0; i < mg->n_vars; i++)
1159 struct mg_var *mgv = &mg->vars[i];
1160 var_set_measure (mgv->var, mg_var_interpret (mgv));
1164 /* Passes the cases in READER through MG and uses the data in the cases to set
1165 measurement levels for the variables where they were still unknown. */
1167 measure_guesser_run (struct measure_guesser *mg,
1168 const struct casereader *reader)
1170 struct casereader *r = casereader_clone (reader);
1171 while (mg->n_vars > 0)
1173 struct ccase *c = casereader_read (r);
1176 measure_guesser_add_case (mg, c);
1179 casereader_destroy (r);
1181 measure_guesser_commit (mg);
1184 /* A transformation for guessing measurement levels. */
1186 static enum trns_result
1187 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1189 struct measure_guesser *mg = mg_;
1190 measure_guesser_add_case (mg, *c);
1191 return TRNS_CONTINUE;
1195 mg_trns_free (void *mg_)
1197 struct measure_guesser *mg = mg_;
1198 measure_guesser_commit (mg);
1199 measure_guesser_destroy (mg);
1203 static const struct trns_class mg_trns_class = {
1204 .name = "add measurement level",
1205 .execute = mg_trns_proc,
1206 .destroy = mg_trns_free,
1210 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1212 struct measure_guesser *mg = measure_guesser_create__ (dict);
1214 add_transformation (ds, &mg_trns_class, mg);
1218 cancel_measurement_level_trns (struct trns_chain *chain)
1223 struct transformation *trns = &chain->xforms[chain->n - 1];
1224 if (trns->class != &mg_trns_class)
1227 struct measure_guesser *mg = trns->aux;
1228 measure_guesser_destroy (mg);
1233 dataset_changed__ (struct dataset *ds)
1235 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1236 ds->callbacks->changed (ds->cb_data);
1240 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1242 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1243 ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1246 /* Private interface for use by session code. */
1249 dataset_set_session__ (struct dataset *ds, struct session *session)
1251 ds->session = session;