1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
50 /* A dataset is usually part of a session. Within a session its name must be
51 unique. The name must either be a valid PSPP identifier or the empty
52 string. (It must be unique within the session even if it is the empty
53 string; that is, there may only be a single dataset within a session with
54 the empty string as its name.) */
55 struct session *session;
57 enum dataset_display display;
59 /* Cases are read from source,
60 their transformation variables are initialized,
61 pass through permanent_trns_chain (which transforms them into
62 the format described by permanent_dict),
64 pass through temporary_trns_chain (which transforms them into
65 the format described by dict),
66 and are finally passed to the procedure. */
67 struct casereader *source;
68 struct caseinit *caseinit;
69 struct trns_chain permanent_trns_chain;
70 struct dictionary *permanent_dict;
71 struct variable *order_var;
72 struct casewriter *sink;
73 struct trns_chain temporary_trns_chain;
75 struct dictionary *dict;
77 /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
78 struct trns_chain *stack;
80 size_t allocated_stack;
82 /* If true, cases are discarded instead of being written to
86 /* The case map used to compact a case, if necessary;
87 otherwise a null pointer. */
88 struct case_map *compactor;
90 /* Time at which proc was last invoked. */
91 time_t last_proc_invocation;
93 /* Cases just before ("lagging") the current one. */
94 int n_lag; /* Number of cases to lag. */
95 struct deque lag; /* Deque of lagged cases. */
96 struct ccase **lag_cases; /* Lagged cases managed by deque. */
101 PROC_COMMITTED, /* No procedure in progress. */
102 PROC_OPEN, /* proc_open called, casereader still open. */
103 PROC_CLOSED /* casereader from proc_open destroyed,
104 but proc_commit not yet called. */
107 casenumber cases_written; /* Cases output so far. */
108 bool ok; /* Error status. */
109 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
111 const struct dataset_callbacks *callbacks;
114 /* Uniquely distinguishes datasets. */
118 static void dataset_changed__ (struct dataset *);
119 static void dataset_transformations_changed__ (struct dataset *,
122 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
123 static void cancel_measurement_level_trns (struct trns_chain *);
124 static void add_case_limit_trns (struct dataset *ds);
125 static void add_filter_trns (struct dataset *ds);
127 static void update_last_proc_invocation (struct dataset *ds);
130 dict_callback (struct dictionary *d UNUSED, void *ds_)
132 struct dataset *ds = ds_;
133 dataset_changed__ (ds);
137 dataset_create_finish__ (struct dataset *ds, struct session *session)
139 static unsigned int seqno;
141 dict_set_change_callback (ds->dict, dict_callback, ds);
142 proc_cancel_all_transformations (ds);
143 dataset_set_session (ds, session);
147 /* Creates a new dataset named NAME, adds it to SESSION, and returns it. If
148 SESSION already contains a dataset named NAME, it is deleted and replaced.
149 The dataset initially has an empty dictionary and no data source. */
151 dataset_create (struct session *session, const char *name)
153 struct dataset *ds = XMALLOC (struct dataset);
154 *ds = (struct dataset) {
155 .name = xstrdup (name),
156 .display = DATASET_FRONT,
157 .dict = dict_create (get_default_encoding ()),
158 .caseinit = caseinit_create (),
160 dataset_create_finish__ (ds, session);
165 /* Creates a new dataset named NAME that has the same data and dictionary as
166 OLD, adds it to the same session as OLD, and returns the new
167 dataset. If that session already contains a dataset named NAME, it is deleted
170 OLD must not have any active transformations or temporary state and must
171 not be in the middle of a procedure.
173 Callbacks are not cloned. */
175 dataset_clone (struct dataset *old, const char *name)
179 assert (old->proc_state == PROC_COMMITTED);
180 assert (!old->permanent_trns_chain.n);
181 assert (old->permanent_dict == NULL);
182 assert (old->sink == NULL);
183 assert (!old->temporary);
184 assert (!old->temporary_trns_chain.n);
185 assert (!old->n_stack);
187 new = xzalloc (sizeof *new);
188 new->name = xstrdup (name);
189 new->display = DATASET_FRONT;
190 new->source = casereader_clone (old->source);
191 new->dict = dict_clone (old->dict);
192 new->caseinit = caseinit_clone (old->caseinit);
193 new->last_proc_invocation = old->last_proc_invocation;
196 dataset_create_finish__ (new, old->session);
203 dataset_destroy (struct dataset *ds)
207 dataset_set_session (ds, NULL);
209 dict_unref (ds->dict);
210 dict_unref (ds->permanent_dict);
211 caseinit_destroy (ds->caseinit);
212 trns_chain_uninit (&ds->permanent_trns_chain);
213 for (size_t i = 0; i < ds->n_stack; i++)
214 trns_chain_uninit (&ds->stack[i]);
216 dataset_transformations_changed__ (ds, false);
222 /* Discards the active dataset's dictionary, data, and transformations. */
224 dataset_clear (struct dataset *ds)
226 assert (ds->proc_state == PROC_COMMITTED);
228 dict_clear (ds->dict);
229 fh_set_default_handle (NULL);
233 casereader_destroy (ds->source);
236 proc_cancel_all_transformations (ds);
240 dataset_name (const struct dataset *ds)
246 dataset_set_name (struct dataset *ds, const char *name)
248 struct session *session = ds->session;
253 active = session_active_dataset (session) == ds;
255 session_set_active_dataset (session, NULL);
256 dataset_set_session (ds, NULL);
260 ds->name = xstrdup (name);
264 dataset_set_session (ds, session);
266 session_set_active_dataset (session, ds);
271 dataset_session (const struct dataset *ds)
277 dataset_set_session (struct dataset *ds, struct session *session)
279 if (session != ds->session)
281 if (ds->session != NULL)
282 session_remove_dataset (ds->session, ds);
284 session_add_dataset (session, ds);
288 /* Returns the dictionary within DS. This is always nonnull, although it
289 might not contain any variables. */
291 dataset_dict (const struct dataset *ds)
296 /* Replaces DS's dictionary by DICT, discarding any source and
299 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
301 assert (ds->proc_state == PROC_COMMITTED);
302 assert (ds->dict != dict);
306 dict_unref (ds->dict);
308 dict_set_change_callback (ds->dict, dict_callback, ds);
311 /* Returns the casereader that will be read when a procedure is executed on
312 DS. This can be NULL if none has been set up yet. */
313 const struct casereader *
314 dataset_source (const struct dataset *ds)
/* Reports whether DS has a data source: true if dataset_source() yields a
   casereader for DS, false if it yields a null pointer. */
bool
dataset_has_source (const struct dataset *ds)
{
  const struct casereader *source = dataset_source (ds);
  return source != NULL;
}
326 /* Replaces the active dataset's data by READER. READER's cases must have an
327 appropriate format for DS's dictionary. */
329 dataset_set_source (struct dataset *ds, struct casereader *reader)
331 casereader_destroy (ds->source);
334 caseinit_clear (ds->caseinit);
335 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
337 return reader == NULL || !casereader_error (reader);
340 /* Returns the data source from DS and removes it from DS. Returns a null
341 pointer if DS has no data source. */
343 dataset_steal_source (struct dataset *ds)
345 struct casereader *reader = ds->source;
352 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
354 assert (!proc_in_temporary_transformations (ds));
355 assert (!proc_has_transformations (ds));
356 assert (n < dict_get_n_vars (ds->dict));
358 caseinit_mark_for_init (ds->caseinit, ds->dict);
359 ds->source = caseinit_translate_casereader_to_init_vars (
360 ds->caseinit, dict_get_proto (ds->dict), ds->source);
361 caseinit_clear (ds->caseinit);
362 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
364 struct case_map_stage *stage = case_map_stage_create (ds->dict);
365 dict_delete_vars (ds->dict, vars, n);
366 ds->source = case_map_create_input_translator (
367 case_map_stage_get_case_map (stage), ds->source);
368 case_map_stage_destroy (stage);
369 caseinit_clear (ds->caseinit);
370 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
374 dataset_reorder_vars (struct dataset *ds, struct variable **vars, size_t n)
376 assert (!proc_in_temporary_transformations (ds));
377 assert (!proc_has_transformations (ds));
378 assert (n <= dict_get_n_vars (ds->dict));
380 caseinit_mark_for_init (ds->caseinit, ds->dict);
381 ds->source = caseinit_translate_casereader_to_init_vars (
382 ds->caseinit, dict_get_proto (ds->dict), ds->source);
383 caseinit_clear (ds->caseinit);
384 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
386 struct case_map_stage *stage = case_map_stage_create (ds->dict);
387 dict_reorder_vars (ds->dict, vars, n);
388 ds->source = case_map_create_input_translator (
389 case_map_stage_get_case_map (stage), ds->source);
390 case_map_stage_destroy (stage);
391 caseinit_clear (ds->caseinit);
392 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
395 /* Returns a number unique to DS. It can be used to distinguish one dataset
396 from any other within a given program run, even datasets that do not exist
399 dataset_seqno (const struct dataset *ds)
405 dataset_set_callbacks (struct dataset *ds,
406 const struct dataset_callbacks *callbacks,
409 ds->callbacks = callbacks;
410 ds->cb_data = cb_data;
414 dataset_get_display (const struct dataset *ds)
420 dataset_set_display (struct dataset *ds, enum dataset_display display)
422 ds->display = display;
425 /* Returns the last time the data was read. */
427 time_of_last_procedure (struct dataset *ds)
431 if (ds->last_proc_invocation == 0)
432 update_last_proc_invocation (ds);
433 return ds->last_proc_invocation;
436 /* Regular procedure. */
438 /* Executes any pending transformations, if necessary.
439 This is not identical to the EXECUTE command in that it won't
440 always read the source data. This can be important when the
441 source data is given inline within BEGIN DATA...END DATA. */
443 proc_execute (struct dataset *ds)
447 if ((!ds->temporary || !ds->temporary_trns_chain.n)
448 && !ds->permanent_trns_chain.n)
451 ds->discard_output = false;
452 dict_set_case_limit (ds->dict, 0);
453 dict_clear_vectors (ds->dict);
457 ok = casereader_destroy (proc_open (ds));
458 return proc_commit (ds) && ok;
461 static const struct casereader_class proc_casereader_class;
463 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
464 cases filtered out with FILTER BY will not be included in the casereader
465 (which is usually desirable). If FILTER is false, all cases will be
466 included regardless of FILTER BY settings.
468 proc_commit must be called when done. */
470 proc_open_filtering (struct dataset *ds, bool filter)
472 struct casereader *reader;
474 assert (ds->n_stack == 0);
475 assert (ds->source != NULL);
476 assert (ds->proc_state == PROC_COMMITTED);
478 update_last_proc_invocation (ds);
480 caseinit_mark_for_init (ds->caseinit, ds->dict);
481 ds->source = caseinit_translate_casereader_to_init_vars (
482 ds->caseinit, dict_get_proto (ds->dict), ds->source);
484 /* Finish up the collection of transformations. */
485 add_case_limit_trns (ds);
487 add_filter_trns (ds);
488 if (!proc_in_temporary_transformations (ds))
489 add_measurement_level_trns (ds, ds->dict);
491 /* Make permanent_dict refer to the dictionary right before
492 data reaches the sink. */
493 if (ds->permanent_dict == NULL)
494 ds->permanent_dict = ds->dict;
497 if (!ds->discard_output)
499 struct dictionary *pd = dict_clone (ds->permanent_dict);
500 struct case_map_stage *stage = case_map_stage_create (pd);
501 dict_delete_scratch_vars (pd);
502 ds->compactor = case_map_stage_get_case_map (stage);
503 case_map_stage_destroy (stage);
504 ds->sink = autopaging_writer_create (dict_get_proto (pd));
509 ds->compactor = NULL;
513 /* Allocate memory for lagged cases. */
514 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
516 ds->proc_state = PROC_OPEN;
517 ds->cases_written = 0;
520 /* FIXME: use taint in dataset in place of `ok'? */
521 /* FIXME: for trivial cases we can just return a clone of
524 /* Create casereader and insert a shim on top. The shim allows us to
525 arbitrarily extend the casereader's lifetime, by slurping the cases into
526 the shim's buffer in proc_commit(). That is especially useful when output
527 table_items are generated directly from the procedure casereader (e.g. by
528 the LIST procedure) when we are using an output driver that keeps a
529 reference to the output items passed to it (e.g. the GUI output driver in
531 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
533 &proc_casereader_class, ds);
534 ds->shim = casereader_shim_insert (reader);
/* Opens dataset DS for reading cases with proc_read.  Equivalent to
   proc_open_filtering() with FILTER set to true.

   proc_commit must be called when done. */
struct casereader *
proc_open (struct dataset *ds)
{
  const bool filter = true;
  return proc_open_filtering (ds, filter);
}
546 /* Returns true if a procedure is in progress, that is, if
547 proc_open has been called but proc_commit has not. */
549 proc_is_open (const struct dataset *ds)
551 return ds->proc_state != PROC_COMMITTED;
554 /* "read" function for procedure casereader. */
555 static struct ccase *
556 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
558 struct dataset *ds = ds_;
559 enum trns_result retval = TRNS_DROP_CASE;
562 assert (ds->proc_state == PROC_OPEN);
563 for (; ; case_unref (c))
565 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
566 if (retval == TRNS_ERROR)
571 /* Read a case from source. */
572 c = casereader_read (ds->source);
575 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
576 caseinit_restore_left_vars (ds->caseinit, c);
578 /* Execute permanent transformations. */
579 casenumber case_nr = ds->cases_written + 1;
580 retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
581 caseinit_save_left_vars (ds->caseinit, c);
582 if (retval != TRNS_CONTINUE)
585 /* Write case to collection of lagged cases. */
588 while (deque_count (&ds->lag) >= ds->n_lag)
589 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
590 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
593 /* Write case to replacement dataset. */
595 if (ds->sink != NULL)
598 *case_num_rw (c, ds->order_var) = case_nr;
599 casewriter_write (ds->sink,
600 case_map_execute (ds->compactor, case_ref (c)));
603 /* Execute temporary transformations. */
604 if (ds->temporary_trns_chain.n)
606 retval = trns_chain_execute (&ds->temporary_trns_chain,
607 ds->cases_written, &c);
608 if (retval != TRNS_CONTINUE)
616 /* "destroy" function for procedure casereader. */
618 proc_casereader_destroy (struct casereader *reader, void *ds_)
620 struct dataset *ds = ds_;
623 /* We are always the subreader for a casereader_buffer, so if we're being
624 destroyed then it's because the casereader_buffer has read all the cases
625 that it ever will. */
628 /* Make sure transformations happen for every input case, in
629 case they have side effects, and ensure that the replacement
630 active dataset gets all the cases it should. */
631 while ((c = casereader_read (reader)) != NULL)
634 ds->proc_state = PROC_CLOSED;
635 ds->ok = casereader_destroy (ds->source) && ds->ok;
637 dataset_set_source (ds, NULL);
640 /* Must return false if the source casereader, a transformation,
641 or the sink casewriter signaled an error. (If a temporary
642 transformation signals an error, then the return value is
643 false, but the replacement active dataset may still be
646 proc_commit (struct dataset *ds)
648 if (ds->shim != NULL)
649 casereader_shim_slurp (ds->shim);
651 assert (ds->proc_state == PROC_CLOSED);
652 ds->proc_state = PROC_COMMITTED;
654 dataset_changed__ (ds);
656 /* Free memory for lagged cases. */
657 while (!deque_is_empty (&ds->lag))
658 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
659 free (ds->lag_cases);
661 /* Dictionary from before TEMPORARY becomes permanent. */
662 proc_cancel_temporary_transformations (ds);
663 bool ok = proc_cancel_all_transformations (ds) && ds->ok;
665 if (!ds->discard_output)
667 /* Finish compacting. */
668 if (ds->compactor != NULL)
670 case_map_destroy (ds->compactor);
671 ds->compactor = NULL;
673 dict_delete_scratch_vars (ds->dict);
676 /* Old data sink becomes new data source. */
677 if (ds->sink != NULL)
678 ds->source = casewriter_make_reader (ds->sink);
683 ds->discard_output = false;
687 caseinit_clear (ds->caseinit);
688 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
690 dict_clear_vectors (ds->dict);
691 ds->permanent_dict = NULL;
692 ds->order_var = NULL;
696 /* Casereader class for procedure execution. */
697 static const struct casereader_class proc_casereader_class =
699 proc_casereader_read,
700 proc_casereader_destroy,
705 /* Updates last_proc_invocation. */
707 update_last_proc_invocation (struct dataset *ds)
709 ds->last_proc_invocation = time (NULL);
712 /* Returns a pointer to the lagged case from N_BEFORE cases before the
713 current one, or NULL if there haven't been that many cases yet. */
715 lagged_case (const struct dataset *ds, int n_before)
717 assert (n_before >= 1);
718 assert (n_before <= ds->n_lag);
720 if (n_before <= deque_count (&ds->lag))
721 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
726 /* Adds TRNS to the current set of transformations. */
728 add_transformation (struct dataset *ds,
729 const struct trns_class *class, void *aux)
731 struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
732 : ds->temporary ? &ds->temporary_trns_chain
733 : &ds->permanent_trns_chain);
734 struct transformation t = { .class = class, .aux = aux };
735 trns_chain_append (chain, &t);
736 dataset_transformations_changed__ (ds, true);
739 /* Returns true if the next call to add_transformation() will add
740 a temporary transformation, false if it will add a permanent
743 proc_in_temporary_transformations (const struct dataset *ds)
745 return ds->temporary;
748 /* Marks the start of temporary transformations.
749 Further calls to add_transformation() will add temporary
752 proc_start_temporary_transformations (struct dataset *ds)
754 assert (!ds->n_stack);
755 if (!proc_in_temporary_transformations (ds))
757 add_case_limit_trns (ds);
759 ds->permanent_dict = dict_clone (ds->dict);
760 add_measurement_level_trns (ds, ds->permanent_dict);
762 ds->temporary = true;
763 dataset_transformations_changed__ (ds, true);
767 /* Converts all the temporary transformations, if any, to permanent
768 transformations. Further transformations will be permanent.
770 The FILTER command is implemented as a temporary transformation, so a
771 procedure that uses this function should usually use proc_open_filtering()
772 with FILTER false, instead of plain proc_open().
774 Returns true if anything changed, false otherwise. */
776 proc_make_temporary_transformations_permanent (struct dataset *ds)
778 if (proc_in_temporary_transformations (ds))
780 cancel_measurement_level_trns (&ds->permanent_trns_chain);
781 trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
783 ds->temporary = false;
785 dict_unref (ds->permanent_dict);
786 ds->permanent_dict = NULL;
794 /* Cancels all temporary transformations, if any. Further
795 transformations will be permanent.
796 Returns true if anything changed, false otherwise. */
798 proc_cancel_temporary_transformations (struct dataset *ds)
800 if (proc_in_temporary_transformations (ds))
802 trns_chain_clear (&ds->temporary_trns_chain);
804 dict_unref (ds->dict);
805 ds->dict = ds->permanent_dict;
806 ds->permanent_dict = NULL;
808 dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
815 /* Cancels all transformations, if any.
816 Returns true if successful, false on I/O error. */
818 proc_cancel_all_transformations (struct dataset *ds)
821 assert (ds->proc_state == PROC_COMMITTED);
822 ok = trns_chain_clear (&ds->permanent_trns_chain);
823 ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
824 ds->temporary = false;
825 for (size_t i = 0; i < ds->n_stack; i++)
826 ok = trns_chain_uninit (&ds->stack[i]) && ok;
828 dataset_transformations_changed__ (ds, false);
834 proc_push_transformations (struct dataset *ds)
836 if (ds->n_stack >= ds->allocated_stack)
837 ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
839 trns_chain_init (&ds->stack[ds->n_stack++]);
843 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
845 assert (ds->n_stack > 0);
846 *chain = ds->stack[--ds->n_stack];
850 proc_has_transformations (const struct dataset *ds)
852 return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
855 static enum trns_result
856 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
858 struct variable *var = var_;
860 *cc = case_unshare (*cc);
861 *case_num_rw (*cc, var) = case_num;
863 return TRNS_CONTINUE;
866 /* Adds a variable $ORDER, which we can sort by to get back the original order. */
868 add_permanent_ordering_transformation (struct dataset *ds)
870 struct dictionary *d = ds->permanent_dict ? ds->permanent_dict : ds->dict;
871 struct variable *order_var = dict_create_var_assert (d, "$ORDER", 0);
872 ds->order_var = order_var;
874 if (ds->permanent_dict)
876 order_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
877 static const struct trns_class trns_class = {
879 .execute = store_case_num
881 const struct transformation t = { .class = &trns_class, .aux = order_var };
882 trns_chain_prepend (&ds->temporary_trns_chain, &t);
888 /* Causes output from the next procedure to be discarded, instead
889 of being preserved for use as input for the next procedure. */
891 proc_discard_output (struct dataset *ds)
893 ds->discard_output = true;
897 /* Checks whether DS has a corrupted active dataset. If so,
898 discards it and returns false. If not, returns true without
901 dataset_end_of_command (struct dataset *ds)
903 if (ds->source != NULL)
905 if (casereader_error (ds->source))
912 const struct taint *taint = casereader_get_taint (ds->source);
913 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
914 assert (!taint_has_tainted_successor (taint));
920 /* Limits the maximum number of cases processed to
922 static enum trns_result
923 case_limit_trns_proc (void *cases_remaining_,
924 struct ccase **c UNUSED, casenumber case_nr UNUSED)
926 size_t *cases_remaining = cases_remaining_;
927 if (*cases_remaining > 0)
929 (*cases_remaining)--;
930 return TRNS_CONTINUE;
933 return TRNS_DROP_CASE;
/* Releases the counter that CASES_REMAINING_ points to, which belongs to a
   case limit transformation.  Always reports success. */
static bool
case_limit_trns_free (void *cases_remaining_)
{
  free (cases_remaining_);
  return true;
}
945 /* Adds a transformation that limits the number of cases that may
946 pass through, if DS->DICT has a case limit. */
948 add_case_limit_trns (struct dataset *ds)
950 casenumber case_limit = dict_get_case_limit (ds->dict);
953 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
954 *cases_remaining = case_limit;
956 static const struct trns_class trns_class = {
957 .name = "case limit",
958 .execute = case_limit_trns_proc,
959 .destroy = case_limit_trns_free,
961 add_transformation (ds, &trns_class, cases_remaining);
963 dict_set_case_limit (ds->dict, 0);
968 /* FILTER transformation. */
969 static enum trns_result
970 filter_trns_proc (void *filter_var_,
971 struct ccase **c, casenumber case_nr UNUSED)
974 struct variable *filter_var = filter_var_;
975 double f = case_num (*c, filter_var);
976 return (f != 0.0 && !var_is_num_missing (filter_var, f)
977 ? TRNS_CONTINUE : TRNS_DROP_CASE);
980 /* Adds a temporary transformation to filter data according to
981 the variable specified on FILTER, if any. */
983 add_filter_trns (struct dataset *ds)
985 struct variable *filter_var = dict_get_filter (ds->dict);
986 if (filter_var != NULL)
988 proc_start_temporary_transformations (ds);
990 static const struct trns_class trns_class = {
992 .execute = filter_trns_proc,
994 add_transformation (ds, &trns_class, filter_var);
999 dataset_need_lag (struct dataset *ds, int n_before)
1001 ds->n_lag = MAX (ds->n_lag, n_before);
1004 /* Measurement guesser, for guessing a measurement level from formats and
1009 struct hmap_node hmap_node;
1015 struct variable *var;
1016 struct hmap *values;
1020 mg_var_uninit (struct mg_var *mgv)
1022 struct mg_value *mgvalue, *next;
1023 HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
1026 hmap_delete (mgv->values, &mgvalue->hmap_node);
1029 hmap_destroy (mgv->values);
1034 mg_var_interpret (const struct mg_var *mgv)
1036 size_t n = hmap_count (mgv->values);
1039 /* All missing (or no data). */
1040 return MEASURE_NOMINAL;
1043 const struct mg_value *mgvalue;
1044 HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1046 if (mgvalue->value < 10)
1047 return MEASURE_NOMINAL;
1048 return MEASURE_SCALE;
1052 mg_var_add_value (struct mg_var *mgv, double value)
1054 if (var_is_num_missing (mgv->var, value))
1055 return MEASURE_UNKNOWN;
1056 else if (value < 0 || value != floor (value))
1057 return MEASURE_SCALE;
1059 size_t hash = hash_double (value, 0);
1060 struct mg_value *mgvalue;
1061 HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1063 if (mgvalue->value == value)
1064 return MEASURE_UNKNOWN;
1066 mgvalue = xmalloc (sizeof *mgvalue);
1067 mgvalue->value = value;
1068 hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1069 if (hmap_count (mgv->values) >= settings_get_scalemin ())
1070 return MEASURE_SCALE;
1072 return MEASURE_UNKNOWN;
1075 struct measure_guesser
1077 struct mg_var *vars;
1081 static struct measure_guesser *
1082 measure_guesser_create__ (struct dictionary *dict)
1084 struct mg_var *mgvs = NULL;
1086 size_t allocated_mgvs = 0;
1088 for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1090 struct variable *var = dict_get_var (dict, i);
1091 if (var_get_measure (var) != MEASURE_UNKNOWN)
1094 struct fmt_spec f = var_get_print_format (var);
1095 enum measure m = var_default_measure_for_format (f.type);
1096 if (m != MEASURE_UNKNOWN)
1098 var_set_measure (var, m);
1102 if (n_mgvs >= allocated_mgvs)
1103 mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1105 struct mg_var *mgv = &mgvs[n_mgvs++];
1106 *mgv = (struct mg_var) {
1108 .values = xmalloc (sizeof *mgv->values),
1110 hmap_init (mgv->values);
1115 struct measure_guesser *mg = xmalloc (sizeof *mg);
1116 *mg = (struct measure_guesser) {
/* Scans DS's dictionary for variables whose measurement level is unknown.
   Where the level can be guessed from the variable's type and format, a
   default is set.  If that settles every variable, returns NULL.  Otherwise
   creates and returns a structure that the caller should pass to
   measure_guesser_add_case() or measure_guesser_run() so that the remaining
   levels can be guessed from the data. */
struct measure_guesser *
measure_guesser_create (struct dataset *ds)
{
  struct dictionary *dict = dataset_dict (ds);
  return measure_guesser_create__ (dict);
}
1136 /* Adds data from case C to MG. */
1138 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1140 for (size_t i = 0; i < mg->n_vars; )
1142 struct mg_var *mgv = &mg->vars[i];
1143 double value = case_num (c, mgv->var);
1144 enum measure m = mg_var_add_value (mgv, value);
1145 if (m != MEASURE_UNKNOWN)
1147 var_set_measure (mgv->var, m);
1149 mg_var_uninit (mgv);
1150 *mgv = mg->vars[--mg->n_vars];
1159 measure_guesser_destroy (struct measure_guesser *mg)
1164 for (size_t i = 0; i < mg->n_vars; i++)
1166 struct mg_var *mgv = &mg->vars[i];
1167 var_set_measure (mgv->var, mg_var_interpret (mgv));
1168 mg_var_uninit (mgv);
1174 /* Adds final measurement levels based on MG, after all the cases have been
1177 measure_guesser_commit (struct measure_guesser *mg)
1179 for (size_t i = 0; i < mg->n_vars; i++)
1181 struct mg_var *mgv = &mg->vars[i];
1182 var_set_measure (mgv->var, mg_var_interpret (mgv));
1186 /* Passes the cases in READER through MG and uses the data in the cases to set
1187 measurement levels for the variables where they were still unknown. */
1189 measure_guesser_run (struct measure_guesser *mg,
1190 const struct casereader *reader)
1192 struct casereader *r = casereader_clone (reader);
1193 while (mg->n_vars > 0)
1195 struct ccase *c = casereader_read (r);
1198 measure_guesser_add_case (mg, c);
1201 casereader_destroy (r);
1203 measure_guesser_commit (mg);
1206 /* A transformation for guessing measurement levels. */
1208 static enum trns_result
1209 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1211 struct measure_guesser *mg = mg_;
1212 measure_guesser_add_case (mg, *c);
1213 return TRNS_CONTINUE;
/* Destroy callback for the measurement-level transformation.  Commits the
   levels guessed by the measure guesser MG_ and then destroys it.  Always
   reports success. */
static bool
mg_trns_free (void *mg_)
{
  struct measure_guesser *guesser = mg_;

  measure_guesser_commit (guesser);
  measure_guesser_destroy (guesser);
  return true;
}
1225 static const struct trns_class mg_trns_class = {
1226 .name = "add measurement level",
1227 .execute = mg_trns_proc,
1228 .destroy = mg_trns_free,
1232 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1234 struct measure_guesser *mg = measure_guesser_create__ (dict);
1236 add_transformation (ds, &mg_trns_class, mg);
1240 cancel_measurement_level_trns (struct trns_chain *chain)
1245 struct transformation *trns = &chain->xforms[chain->n - 1];
1246 if (trns->class != &mg_trns_class)
1249 struct measure_guesser *mg = trns->aux;
1250 measure_guesser_destroy (mg);
1255 dataset_changed__ (struct dataset *ds)
1257 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1258 ds->callbacks->changed (ds->cb_data);
1262 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1264 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1265 ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1268 /* Private interface for use by session code. */
1271 dataset_set_session__ (struct dataset *ds, struct session *session)
1273 ds->session = session;