dataset: Before deleting variables, make sure new values are added.
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct casewriter *sink;
72   struct trns_chain temporary_trns_chain;
73   bool temporary;
74   struct dictionary *dict;
75
76   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77   struct trns_chain *stack;
78   size_t n_stack;
79   size_t allocated_stack;
80
81   /* If true, cases are discarded instead of being written to
82      sink. */
83   bool discard_output;
84
85   /* The case map used to compact a case, if necessary;
86      otherwise a null pointer. */
87   struct case_map *compactor;
88
89   /* Time at which proc was last invoked. */
90   time_t last_proc_invocation;
91
92   /* Cases just before ("lagging") the current one. */
93   int n_lag;                    /* Number of cases to lag. */
94   struct deque lag;             /* Deque of lagged cases. */
95   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
96
97   /* Procedure data. */
98   enum
99     {
100       PROC_COMMITTED,           /* No procedure in progress. */
101       PROC_OPEN,                /* proc_open called, casereader still open. */
102       PROC_CLOSED               /* casereader from proc_open destroyed,
103                                    but proc_commit not yet called. */
104     }
105   proc_state;
106   casenumber cases_written;     /* Cases output so far. */
107   bool ok;                      /* Error status. */
108   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
109
110   const struct dataset_callbacks *callbacks;
111   void *cb_data;
112
113   /* Uniquely distinguishes datasets. */
114   unsigned int seqno;
115 };
116
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
119                                                bool non_empty);
120
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
125
126 static void update_last_proc_invocation (struct dataset *ds);
127
128 static void
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
130 {
131   struct dataset *ds = ds_;
132   dataset_changed__ (ds);
133 }
134 \f
135 static void
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
137 {
138   static unsigned int seqno;
139
140   dict_set_change_callback (ds->dict, dict_callback, ds);
141   proc_cancel_all_transformations (ds);
142   dataset_set_session (ds, session);
143   ds->seqno = ++seqno;
144 }
145
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
147    SESSION already contains a dataset named NAME, it is deleted and replaced.
148    The dataset initially has an empty dictionary and no data source. */
149 struct dataset *
150 dataset_create (struct session *session, const char *name)
151 {
152   struct dataset *ds = XMALLOC (struct dataset);
153   *ds = (struct dataset) {
154     .name = xstrdup (name),
155     .display = DATASET_FRONT,
156     .dict = dict_create (get_default_encoding ()),
157     .caseinit = caseinit_create (),
158   };
159   dataset_create_finish__ (ds, session);
160
161   return ds;
162 }
163
164 /* Creates and returns a new dataset that has the same data and dictionary as
165    OLD named NAME, adds it to the same session as OLD, and returns the new
166    dataset.  If SESSION already contains a dataset named NAME, it is deleted
167    and replaced.
168
169    OLD must not have any active transformations or temporary state and must
170    not be in the middle of a procedure.
171
172    Callbacks are not cloned. */
173 struct dataset *
174 dataset_clone (struct dataset *old, const char *name)
175 {
176   struct dataset *new;
177
178   assert (old->proc_state == PROC_COMMITTED);
179   assert (!old->permanent_trns_chain.n);
180   assert (old->permanent_dict == NULL);
181   assert (old->sink == NULL);
182   assert (!old->temporary);
183   assert (!old->temporary_trns_chain.n);
184   assert (!old->n_stack);
185
186   new = xzalloc (sizeof *new);
187   new->name = xstrdup (name);
188   new->display = DATASET_FRONT;
189   new->source = casereader_clone (old->source);
190   new->dict = dict_clone (old->dict);
191   new->caseinit = caseinit_clone (old->caseinit);
192   new->last_proc_invocation = old->last_proc_invocation;
193   new->ok = old->ok;
194
195   dataset_create_finish__ (new, old->session);
196
197   return new;
198 }
199
200 /* Destroys DS. */
201 void
202 dataset_destroy (struct dataset *ds)
203 {
204   if (ds != NULL)
205     {
206       dataset_set_session (ds, NULL);
207       dataset_clear (ds);
208       dict_unref (ds->dict);
209       dict_unref (ds->permanent_dict);
210       caseinit_destroy (ds->caseinit);
211       trns_chain_uninit (&ds->permanent_trns_chain);
212       for (size_t i = 0; i < ds->n_stack; i++)
213         trns_chain_uninit (&ds->stack[i]);
214       free (ds->stack);
215       dataset_transformations_changed__ (ds, false);
216       free (ds->name);
217       free (ds);
218     }
219 }
220
221 /* Discards the active dataset's dictionary, data, and transformations. */
222 void
223 dataset_clear (struct dataset *ds)
224 {
225   assert (ds->proc_state == PROC_COMMITTED);
226
227   dict_clear (ds->dict);
228   fh_set_default_handle (NULL);
229
230   ds->n_lag = 0;
231
232   casereader_destroy (ds->source);
233   ds->source = NULL;
234
235   proc_cancel_all_transformations (ds);
236 }
237
238 const char *
239 dataset_name (const struct dataset *ds)
240 {
241   return ds->name;
242 }
243
244 void
245 dataset_set_name (struct dataset *ds, const char *name)
246 {
247   struct session *session = ds->session;
248   bool active = false;
249
250   if (session != NULL)
251     {
252       active = session_active_dataset (session) == ds;
253       if (active)
254         session_set_active_dataset (session, NULL);
255       dataset_set_session (ds, NULL);
256     }
257
258   free (ds->name);
259   ds->name = xstrdup (name);
260
261   if (session != NULL)
262     {
263       dataset_set_session (ds, session);
264       if (active)
265         session_set_active_dataset (session, ds);
266     }
267 }
268
269 struct session *
270 dataset_session (const struct dataset *ds)
271 {
272   return ds->session;
273 }
274
275 void
276 dataset_set_session (struct dataset *ds, struct session *session)
277 {
278   if (session != ds->session)
279     {
280       if (ds->session != NULL)
281         session_remove_dataset (ds->session, ds);
282       if (session != NULL)
283         session_add_dataset (session, ds);
284     }
285 }
286
287 /* Returns the dictionary within DS.  This is always nonnull, although it
288    might not contain any variables. */
289 struct dictionary *
290 dataset_dict (const struct dataset *ds)
291 {
292   return ds->dict;
293 }
294
295 /* Replaces DS's dictionary by DICT, discarding any source and
296    transformations. */
297 void
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
299 {
300   assert (ds->proc_state == PROC_COMMITTED);
301   assert (ds->dict != dict);
302
303   dataset_clear (ds);
304
305   dict_unref (ds->dict);
306   ds->dict = dict;
307   dict_set_change_callback (ds->dict, dict_callback, ds);
308 }
309
310 /* Returns the casereader that will be read when a procedure is executed on
311    DS.  This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
314 {
315   return ds->source;
316 }
317
318 /* Returns true if DS has a data source, false otherwise. */
319 bool
320 dataset_has_source (const struct dataset *ds)
321 {
322   return dataset_source (ds) != NULL;
323 }
324
325 /* Replaces the active dataset's data by READER.  READER's cases must have an
326    appropriate format for DS's dictionary. */
327 bool
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
329 {
330   casereader_destroy (ds->source);
331   ds->source = reader;
332
333   caseinit_clear (ds->caseinit);
334   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
335
336   return reader == NULL || !casereader_error (reader);
337 }
338
339 /* Returns the data source from DS and removes it from DS.  Returns a null
340    pointer if DS has no data source. */
341 struct casereader *
342 dataset_steal_source (struct dataset *ds)
343 {
344   struct casereader *reader = ds->source;
345   ds->source = NULL;
346
347   return reader;
348 }
349
350 void
351 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
352 {
353   assert (!proc_in_temporary_transformations (ds));
354   assert (!proc_has_transformations (ds));
355   assert (n < dict_get_n_vars (ds->dict));
356
357   caseinit_mark_for_init (ds->caseinit, ds->dict);
358   ds->source = caseinit_translate_casereader_to_init_vars (
359     ds->caseinit, dict_get_proto (ds->dict), ds->source);
360   caseinit_clear (ds->caseinit);
361   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
362
363   dict_delete_vars (ds->dict, vars, n);
364   ds->source = case_map_create_input_translator (
365     case_map_to_compact_dict (ds->dict, 0), ds->source);
366   dict_compact_values (ds->dict);
367   caseinit_clear (ds->caseinit);
368   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
369 }
370
371 /* Returns a number unique to DS.  It can be used to distinguish one dataset
372    from any other within a given program run, even datasets that do not exist
373    at the same time. */
374 unsigned int
375 dataset_seqno (const struct dataset *ds)
376 {
377   return ds->seqno;
378 }
379
380 void
381 dataset_set_callbacks (struct dataset *ds,
382                        const struct dataset_callbacks *callbacks,
383                        void *cb_data)
384 {
385   ds->callbacks = callbacks;
386   ds->cb_data = cb_data;
387 }
388
389 enum dataset_display
390 dataset_get_display (const struct dataset *ds)
391 {
392   return ds->display;
393 }
394
395 void
396 dataset_set_display (struct dataset *ds, enum dataset_display display)
397 {
398   ds->display = display;
399 }
400 \f
401 /* Returns the last time the data was read. */
402 time_t
403 time_of_last_procedure (struct dataset *ds)
404 {
405   if (!ds)
406     return time (NULL);
407   if (ds->last_proc_invocation == 0)
408     update_last_proc_invocation (ds);
409   return ds->last_proc_invocation;
410 }
411 \f
412 /* Regular procedure. */
413
414 /* Executes any pending transformations, if necessary.
415    This is not identical to the EXECUTE command in that it won't
416    always read the source data.  This can be important when the
417    source data is given inline within BEGIN DATA...END FILE. */
418 bool
419 proc_execute (struct dataset *ds)
420 {
421   bool ok;
422
423   if ((!ds->temporary || !ds->temporary_trns_chain.n)
424       && !ds->permanent_trns_chain.n)
425     {
426       ds->n_lag = 0;
427       ds->discard_output = false;
428       dict_set_case_limit (ds->dict, 0);
429       dict_clear_vectors (ds->dict);
430       return true;
431     }
432
433   ok = casereader_destroy (proc_open (ds));
434   return proc_commit (ds) && ok;
435 }
436
437 static const struct casereader_class proc_casereader_class;
438
439 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
440    cases filtered out with FILTER BY will not be included in the casereader
441    (which is usually desirable).  If FILTER is false, all cases will be
442    included regardless of FILTER BY settings.
443
444    proc_commit must be called when done. */
445 struct casereader *
446 proc_open_filtering (struct dataset *ds, bool filter)
447 {
448   struct casereader *reader;
449
450   assert (ds->n_stack == 0);
451   assert (ds->source != NULL);
452   assert (ds->proc_state == PROC_COMMITTED);
453
454   update_last_proc_invocation (ds);
455
456   caseinit_mark_for_init (ds->caseinit, ds->dict);
457   ds->source = caseinit_translate_casereader_to_init_vars (
458     ds->caseinit, dict_get_proto (ds->dict), ds->source);
459
460   /* Finish up the collection of transformations. */
461   add_case_limit_trns (ds);
462   if (filter)
463     add_filter_trns (ds);
464   if (!proc_in_temporary_transformations (ds))
465     add_measurement_level_trns (ds, ds->dict);
466
467   /* Make permanent_dict refer to the dictionary right before
468      data reaches the sink. */
469   if (ds->permanent_dict == NULL)
470     ds->permanent_dict = ds->dict;
471
472   /* Prepare sink. */
473   if (!ds->discard_output)
474     {
475       struct dictionary *pd = ds->permanent_dict;
476       size_t compacted_n_values = dict_count_values (pd, DC_SCRATCH);
477       if (compacted_n_values < dict_get_next_value_idx (pd))
478         {
479           struct caseproto *compacted_proto;
480           compacted_proto = dict_get_compacted_proto (pd, DC_SCRATCH);
481           ds->compactor = case_map_to_compact_dict (pd, DC_SCRATCH);
482           ds->sink = autopaging_writer_create (compacted_proto);
483           caseproto_unref (compacted_proto);
484         }
485       else
486         {
487           ds->compactor = NULL;
488           ds->sink = autopaging_writer_create (dict_get_proto (pd));
489         }
490     }
491   else
492     {
493       ds->compactor = NULL;
494       ds->sink = NULL;
495     }
496
497   /* Allocate memory for lagged cases. */
498   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
499
500   ds->proc_state = PROC_OPEN;
501   ds->cases_written = 0;
502   ds->ok = true;
503
504   /* FIXME: use taint in dataset in place of `ok'? */
505   /* FIXME: for trivial cases we can just return a clone of
506      ds->source? */
507
508   /* Create casereader and insert a shim on top.  The shim allows us to
509      arbitrarily extend the casereader's lifetime, by slurping the cases into
510      the shim's buffer in proc_commit().  That is especially useful when output
511      table_items are generated directly from the procedure casereader (e.g. by
512      the LIST procedure) when we are using an output driver that keeps a
513      reference to the output items passed to it (e.g. the GUI output driver in
514      PSPPIRE). */
515   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
516                                          CASENUMBER_MAX,
517                                          &proc_casereader_class, ds);
518   ds->shim = casereader_shim_insert (reader);
519   return reader;
520 }
521
522 /* Opens dataset DS for reading cases with proc_read.
523    proc_commit must be called when done. */
524 struct casereader *
525 proc_open (struct dataset *ds)
526 {
527   return proc_open_filtering (ds, true);
528 }
529
530 /* Returns true if a procedure is in progress, that is, if
531    proc_open has been called but proc_commit has not. */
532 bool
533 proc_is_open (const struct dataset *ds)
534 {
535   return ds->proc_state != PROC_COMMITTED;
536 }
537
538 /* "read" function for procedure casereader. */
539 static struct ccase *
540 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
541 {
542   struct dataset *ds = ds_;
543   enum trns_result retval = TRNS_DROP_CASE;
544   struct ccase *c;
545
546   assert (ds->proc_state == PROC_OPEN);
547   for (; ; case_unref (c))
548     {
549       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
550       if (retval == TRNS_ERROR)
551         ds->ok = false;
552       if (!ds->ok)
553         return NULL;
554
555       /* Read a case from source. */
556       c = casereader_read (ds->source);
557       if (c == NULL)
558         return NULL;
559       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
560       caseinit_restore_left_vars (ds->caseinit, c);
561
562       /* Execute permanent transformations.  */
563       casenumber case_nr = ds->cases_written + 1;
564       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
565       caseinit_save_left_vars (ds->caseinit, c);
566       if (retval != TRNS_CONTINUE)
567         continue;
568
569       /* Write case to collection of lagged cases. */
570       if (ds->n_lag > 0)
571         {
572           while (deque_count (&ds->lag) >= ds->n_lag)
573             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
574           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
575         }
576
577       /* Write case to replacement dataset. */
578       ds->cases_written++;
579       if (ds->sink != NULL)
580         casewriter_write (ds->sink,
581                           case_map_execute (ds->compactor, case_ref (c)));
582
583       /* Execute temporary transformations. */
584       if (ds->temporary_trns_chain.n)
585         {
586           retval = trns_chain_execute (&ds->temporary_trns_chain,
587                                        ds->cases_written, &c);
588           if (retval != TRNS_CONTINUE)
589             continue;
590         }
591
592       return c;
593     }
594 }
595
596 /* "destroy" function for procedure casereader. */
597 static void
598 proc_casereader_destroy (struct casereader *reader, void *ds_)
599 {
600   struct dataset *ds = ds_;
601   struct ccase *c;
602
603   /* We are always the subreader for a casereader_buffer, so if we're being
604      destroyed then it's because the casereader_buffer has read all the cases
605      that it ever will. */
606   ds->shim = NULL;
607
608   /* Make sure transformations happen for every input case, in
609      case they have side effects, and ensure that the replacement
610      active dataset gets all the cases it should. */
611   while ((c = casereader_read (reader)) != NULL)
612     case_unref (c);
613
614   ds->proc_state = PROC_CLOSED;
615   ds->ok = casereader_destroy (ds->source) && ds->ok;
616   ds->source = NULL;
617   dataset_set_source (ds, NULL);
618 }
619
620 /* Must return false if the source casereader, a transformation,
621    or the sink casewriter signaled an error.  (If a temporary
622    transformation signals an error, then the return value is
623    false, but the replacement active dataset may still be
624    untainted.) */
625 bool
626 proc_commit (struct dataset *ds)
627 {
628   if (ds->shim != NULL)
629     casereader_shim_slurp (ds->shim);
630
631   assert (ds->proc_state == PROC_CLOSED);
632   ds->proc_state = PROC_COMMITTED;
633
634   dataset_changed__ (ds);
635
636   /* Free memory for lagged cases. */
637   while (!deque_is_empty (&ds->lag))
638     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
639   free (ds->lag_cases);
640
641   /* Dictionary from before TEMPORARY becomes permanent. */
642   proc_cancel_temporary_transformations (ds);
643   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
644
645   if (!ds->discard_output)
646     {
647       /* Finish compacting. */
648       if (ds->compactor != NULL)
649         {
650           case_map_destroy (ds->compactor);
651           ds->compactor = NULL;
652
653           dict_delete_scratch_vars (ds->dict);
654           dict_compact_values (ds->dict);
655         }
656
657       /* Old data sink becomes new data source. */
658       if (ds->sink != NULL)
659         ds->source = casewriter_make_reader (ds->sink);
660     }
661   else
662     {
663       ds->source = NULL;
664       ds->discard_output = false;
665     }
666   ds->sink = NULL;
667
668   caseinit_clear (ds->caseinit);
669   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
670
671   dict_clear_vectors (ds->dict);
672   ds->permanent_dict = NULL;
673   return ok;
674 }
675
676 /* Casereader class for procedure execution. */
677 static const struct casereader_class proc_casereader_class =
678   {
679     proc_casereader_read,
680     proc_casereader_destroy,
681     NULL,
682     NULL,
683   };
684
685 /* Updates last_proc_invocation. */
686 static void
687 update_last_proc_invocation (struct dataset *ds)
688 {
689   ds->last_proc_invocation = time (NULL);
690 }
691 \f
692 /* Returns a pointer to the lagged case from N_BEFORE cases before the
693    current one, or NULL if there haven't been that many cases yet. */
694 const struct ccase *
695 lagged_case (const struct dataset *ds, int n_before)
696 {
697   assert (n_before >= 1);
698   assert (n_before <= ds->n_lag);
699
700   if (n_before <= deque_count (&ds->lag))
701     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
702   else
703     return NULL;
704 }
705 \f
706 /* Adds TRNS to the current set of transformations. */
707 void
708 add_transformation (struct dataset *ds,
709                     const struct trns_class *class, void *aux)
710 {
711   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
712                               : ds->temporary ? &ds->temporary_trns_chain
713                               : &ds->permanent_trns_chain);
714   struct transformation t = { .class = class, .aux = aux };
715   trns_chain_append (chain, &t);
716   dataset_transformations_changed__ (ds, true);
717 }
718
719 /* Returns true if the next call to add_transformation() will add
720    a temporary transformation, false if it will add a permanent
721    transformation. */
722 bool
723 proc_in_temporary_transformations (const struct dataset *ds)
724 {
725   return ds->temporary;
726 }
727
728 /* Marks the start of temporary transformations.
729    Further calls to add_transformation() will add temporary
730    transformations. */
731 void
732 proc_start_temporary_transformations (struct dataset *ds)
733 {
734   assert (!ds->n_stack);
735   if (!proc_in_temporary_transformations (ds))
736     {
737       add_case_limit_trns (ds);
738
739       ds->permanent_dict = dict_clone (ds->dict);
740       add_measurement_level_trns (ds, ds->permanent_dict);
741
742       ds->temporary = true;
743       dataset_transformations_changed__ (ds, true);
744     }
745 }
746
747 /* Converts all the temporary transformations, if any, to permanent
748    transformations.  Further transformations will be permanent.
749
750    The FILTER command is implemented as a temporary transformation, so a
751    procedure that uses this function should usually use proc_open_filtering()
752    with FILTER false, instead of plain proc_open().
753
754    Returns true if anything changed, false otherwise. */
755 bool
756 proc_make_temporary_transformations_permanent (struct dataset *ds)
757 {
758   if (proc_in_temporary_transformations (ds))
759     {
760       cancel_measurement_level_trns (&ds->permanent_trns_chain);
761       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
762
763       ds->temporary = false;
764
765       dict_unref (ds->permanent_dict);
766       ds->permanent_dict = NULL;
767
768       return true;
769     }
770   else
771     return false;
772 }
773
774 /* Cancels all temporary transformations, if any.  Further
775    transformations will be permanent.
776    Returns true if anything changed, false otherwise. */
777 bool
778 proc_cancel_temporary_transformations (struct dataset *ds)
779 {
780   if (proc_in_temporary_transformations (ds))
781     {
782       trns_chain_clear (&ds->temporary_trns_chain);
783
784       dict_unref (ds->dict);
785       ds->dict = ds->permanent_dict;
786       ds->permanent_dict = NULL;
787
788       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
789       return true;
790     }
791   else
792     return false;
793 }
794
795 /* Cancels all transformations, if any.
796    Returns true if successful, false on I/O error. */
797 bool
798 proc_cancel_all_transformations (struct dataset *ds)
799 {
800   bool ok;
801   assert (ds->proc_state == PROC_COMMITTED);
802   ok = trns_chain_clear (&ds->permanent_trns_chain);
803   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
804   ds->temporary = false;
805   for (size_t i = 0; i < ds->n_stack; i++)
806     ok = trns_chain_uninit (&ds->stack[i]) && ok;
807   ds->n_stack = 0;
808   dataset_transformations_changed__ (ds, false);
809
810   return ok;
811 }
812
813 void
814 proc_push_transformations (struct dataset *ds)
815 {
816   if (ds->n_stack >= ds->allocated_stack)
817     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
818                             sizeof *ds->stack);
819   trns_chain_init (&ds->stack[ds->n_stack++]);
820 }
821
822 void
823 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
824 {
825   assert (ds->n_stack > 0);
826   *chain = ds->stack[--ds->n_stack];
827 }
828
829 bool
830 proc_has_transformations (const struct dataset *ds)
831 {
832   return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
833 }
834
835 static enum trns_result
836 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
837 {
838   struct variable *var = var_;
839
840   *cc = case_unshare (*cc);
841   *case_num_rw (*cc, var) = case_num;
842
843   return TRNS_CONTINUE;
844 }
845
846 /* Add a variable which we can sort by to get back the original order. */
847 struct variable *
848 add_permanent_ordering_transformation (struct dataset *ds)
849 {
850   struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
851   struct variable *order_var
852     = (proc_in_temporary_transformations (ds)
853        ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
854        : temp_var);
855
856   static const struct trns_class trns_class = {
857     .name = "ordering",
858     .execute = store_case_num
859   };
860   const struct transformation t = { .class = &trns_class, .aux = order_var };
861   trns_chain_append (&ds->permanent_trns_chain, &t);
862
863   return temp_var;
864 }
865 \f
866 /* Causes output from the next procedure to be discarded, instead
867    of being preserved for use as input for the next procedure. */
868 void
869 proc_discard_output (struct dataset *ds)
870 {
871   ds->discard_output = true;
872 }
873
874
875 /* Checks whether DS has a corrupted active dataset.  If so,
876    discards it and returns false.  If not, returns true without
877    doing anything. */
878 bool
879 dataset_end_of_command (struct dataset *ds)
880 {
881   if (ds->source != NULL)
882     {
883       if (casereader_error (ds->source))
884         {
885           dataset_clear (ds);
886           return false;
887         }
888       else
889         {
890           const struct taint *taint = casereader_get_taint (ds->source);
891           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
892           assert (!taint_has_tainted_successor (taint));
893         }
894     }
895   return true;
896 }
897 \f
898 /* Limits the maximum number of cases processed to
899    *CASES_REMAINING. */
900 static enum trns_result
901 case_limit_trns_proc (void *cases_remaining_,
902                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
903 {
904   size_t *cases_remaining = cases_remaining_;
905   if (*cases_remaining > 0)
906     {
907       (*cases_remaining)--;
908       return TRNS_CONTINUE;
909     }
910   else
911     return TRNS_DROP_CASE;
912 }
913
914 /* Frees the data associated with a case limit transformation. */
915 static bool
916 case_limit_trns_free (void *cases_remaining_)
917 {
918   size_t *cases_remaining = cases_remaining_;
919   free (cases_remaining);
920   return true;
921 }
922
923 /* Adds a transformation that limits the number of cases that may
924    pass through, if DS->DICT has a case limit. */
925 static void
926 add_case_limit_trns (struct dataset *ds)
927 {
928   casenumber case_limit = dict_get_case_limit (ds->dict);
929   if (case_limit != 0)
930     {
931       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
932       *cases_remaining = case_limit;
933
934       static const struct trns_class trns_class = {
935         .name = "case limit",
936         .execute = case_limit_trns_proc,
937         .destroy = case_limit_trns_free,
938       };
939       add_transformation (ds, &trns_class, cases_remaining);
940
941       dict_set_case_limit (ds->dict, 0);
942     }
943 }
944
945 \f
946 /* FILTER transformation. */
947 static enum trns_result
948 filter_trns_proc (void *filter_var_,
949                   struct ccase **c, casenumber case_nr UNUSED)
950
951 {
952   struct variable *filter_var = filter_var_;
953   double f = case_num (*c, filter_var);
954   return (f != 0.0 && !var_is_num_missing (filter_var, f)
955           ? TRNS_CONTINUE : TRNS_DROP_CASE);
956 }
957
958 /* Adds a temporary transformation to filter data according to
959    the variable specified on FILTER, if any. */
960 static void
961 add_filter_trns (struct dataset *ds)
962 {
963   struct variable *filter_var = dict_get_filter (ds->dict);
964   if (filter_var != NULL)
965     {
966       proc_start_temporary_transformations (ds);
967
968       static const struct trns_class trns_class = {
969         .name = "FILTER",
970         .execute = filter_trns_proc,
971       };
972       add_transformation (ds, &trns_class, filter_var);
973     }
974 }
975
976 void
977 dataset_need_lag (struct dataset *ds, int n_before)
978 {
979   ds->n_lag = MAX (ds->n_lag, n_before);
980 }
981 \f
982 /* Measurement guesser, for guessing a measurement level from formats and
983    data. */
984
985 struct mg_value
986   {
987     struct hmap_node hmap_node;
988     double value;
989   };
990
991 struct mg_var
992   {
993     struct variable *var;
994     struct hmap *values;
995   };
996
997 static void
998 mg_var_uninit (struct mg_var *mgv)
999 {
1000   struct mg_value *mgvalue, *next;
1001   HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
1002                       mgv->values)
1003     {
1004       hmap_delete (mgv->values, &mgvalue->hmap_node);
1005       free (mgvalue);
1006     }
1007   hmap_destroy (mgv->values);
1008   free (mgv->values);
1009 }
1010
1011 static enum measure
1012 mg_var_interpret (const struct mg_var *mgv)
1013 {
1014   size_t n = hmap_count (mgv->values);
1015   if (!n)
1016     {
1017       /* All missing (or no data). */
1018       return MEASURE_NOMINAL;
1019     }
1020
1021   const struct mg_value *mgvalue;
1022   HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1023                  mgv->values)
1024     if (mgvalue->value < 10)
1025       return MEASURE_NOMINAL;
1026   return MEASURE_SCALE;
1027 }
1028
1029 static enum measure
1030 mg_var_add_value (struct mg_var *mgv, double value)
1031 {
1032   if (var_is_num_missing (mgv->var, value))
1033     return MEASURE_UNKNOWN;
1034   else if (value < 0 || value != floor (value))
1035     return MEASURE_SCALE;
1036
1037   size_t hash = hash_double (value, 0);
1038   struct mg_value *mgvalue;
1039   HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1040                            hash, mgv->values)
1041     if (mgvalue->value == value)
1042       return MEASURE_UNKNOWN;
1043
1044   mgvalue = xmalloc (sizeof *mgvalue);
1045   mgvalue->value = value;
1046   hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1047   if (hmap_count (mgv->values) >= settings_get_scalemin ())
1048     return MEASURE_SCALE;
1049
1050   return MEASURE_UNKNOWN;
1051 }
1052
1053 struct measure_guesser
1054   {
1055     struct mg_var *vars;
1056     size_t n_vars;
1057   };
1058
1059 static struct measure_guesser *
1060 measure_guesser_create__ (struct dictionary *dict)
1061 {
1062   struct mg_var *mgvs = NULL;
1063   size_t n_mgvs = 0;
1064   size_t allocated_mgvs = 0;
1065
1066   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1067     {
1068       struct variable *var = dict_get_var (dict, i);
1069       if (var_get_measure (var) != MEASURE_UNKNOWN)
1070         continue;
1071
1072       struct fmt_spec f = var_get_print_format (var);
1073       enum measure m = var_default_measure_for_format (f.type);
1074       if (m != MEASURE_UNKNOWN)
1075         {
1076           var_set_measure (var, m);
1077           continue;
1078         }
1079
1080       if (n_mgvs >= allocated_mgvs)
1081         mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1082
1083       struct mg_var *mgv = &mgvs[n_mgvs++];
1084       *mgv = (struct mg_var) {
1085         .var = var,
1086         .values = xmalloc (sizeof *mgv->values),
1087       };
1088       hmap_init (mgv->values);
1089     }
1090   if (!n_mgvs)
1091     return NULL;
1092
1093   struct measure_guesser *mg = xmalloc (sizeof *mg);
1094   *mg = (struct measure_guesser) {
1095     .vars = mgvs,
1096     .n_vars = n_mgvs,
1097   };
1098   return mg;
1099 }
1100
1101 /* Scans through DS's dictionary for variables that have an unknown measurement
1102    level.  For those, if the measurement level can be guessed based on the
1103    variable's type and format, sets a default.  If that's enough, returns NULL.
1104    If any remain whose levels are unknown and can't be guessed that way,
1105    creates and returns a structure that the caller should pass to
1106    measure_guesser_add_case() or measure_guesser_run() for guessing a
1107    measurement level based on the data.  */
1108 struct measure_guesser *
1109 measure_guesser_create (struct dataset *ds)
1110 {
1111   return measure_guesser_create__ (dataset_dict (ds));
1112 }
1113
1114 /* Adds data from case C to MG. */
1115 static void
1116 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1117 {
1118   for (size_t i = 0; i < mg->n_vars; )
1119     {
1120       struct mg_var *mgv = &mg->vars[i];
1121       double value = case_num (c, mgv->var);
1122       enum measure m = mg_var_add_value (mgv, value);
1123       if (m != MEASURE_UNKNOWN)
1124         {
1125           var_set_measure (mgv->var, m);
1126
1127           mg_var_uninit (mgv);
1128           *mgv = mg->vars[--mg->n_vars];
1129         }
1130       else
1131         i++;
1132     }
1133 }
1134
1135 /* Destroys MG. */
1136 void
1137 measure_guesser_destroy (struct measure_guesser *mg)
1138 {
1139   if (!mg)
1140     return;
1141
1142   for (size_t i = 0; i < mg->n_vars; i++)
1143     {
1144       struct mg_var *mgv = &mg->vars[i];
1145       var_set_measure (mgv->var, mg_var_interpret (mgv));
1146       mg_var_uninit (mgv);
1147     }
1148   free (mg->vars);
1149   free (mg);
1150 }
1151
1152 /* Adds final measurement levels based on MG, after all the cases have been
1153    added. */
1154 static void
1155 measure_guesser_commit (struct measure_guesser *mg)
1156 {
1157   for (size_t i = 0; i < mg->n_vars; i++)
1158     {
1159       struct mg_var *mgv = &mg->vars[i];
1160       var_set_measure (mgv->var, mg_var_interpret (mgv));
1161     }
1162 }
1163
1164 /* Passes the cases in READER through MG and uses the data in the cases to set
1165    measurement levels for the variables where they were still unknown. */
1166 void
1167 measure_guesser_run (struct measure_guesser *mg,
1168                      const struct casereader *reader)
1169 {
1170   struct casereader *r = casereader_clone (reader);
1171   while (mg->n_vars > 0)
1172     {
1173       struct ccase *c = casereader_read (r);
1174       if (!c)
1175         break;
1176       measure_guesser_add_case (mg, c);
1177       case_unref (c);
1178     }
1179   casereader_destroy (r);
1180
1181   measure_guesser_commit (mg);
1182 }
1183 \f
1184 /* A transformation for guessing measurement levels. */
1185
1186 static enum trns_result
1187 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1188 {
1189   struct measure_guesser *mg = mg_;
1190   measure_guesser_add_case (mg, *c);
1191   return TRNS_CONTINUE;
1192 }
1193
1194 static bool
1195 mg_trns_free (void *mg_)
1196 {
1197   struct measure_guesser *mg = mg_;
1198   measure_guesser_commit (mg);
1199   measure_guesser_destroy (mg);
1200   return true;
1201 }
1202
1203 static const struct trns_class mg_trns_class = {
1204   .name = "add measurement level",
1205   .execute = mg_trns_proc,
1206   .destroy = mg_trns_free,
1207 };
1208
1209 static void
1210 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1211 {
1212   struct measure_guesser *mg = measure_guesser_create__ (dict);
1213   if (mg)
1214     add_transformation (ds, &mg_trns_class, mg);
1215 }
1216
1217 static void
1218 cancel_measurement_level_trns (struct trns_chain *chain)
1219 {
1220   if (!chain->n)
1221     return;
1222
1223   struct transformation *trns = &chain->xforms[chain->n - 1];
1224   if (trns->class != &mg_trns_class)
1225     return;
1226
1227   struct measure_guesser *mg = trns->aux;
1228   measure_guesser_destroy (mg);
1229   chain->n--;
1230 }
1231 \f
1232 static void
1233 dataset_changed__ (struct dataset *ds)
1234 {
1235   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1236     ds->callbacks->changed (ds->cb_data);
1237 }
1238
1239 static void
1240 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1241 {
1242   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1243     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1244 }
1245 \f
1246 /* Private interface for use by session code. */
1247
1248 void
1249 dataset_set_session__ (struct dataset *ds, struct session *session)
1250 {
1251   ds->session = session;
1252 }