dictionary: Always compact immediately upon deletion of a variable.
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct casewriter *sink;
72   struct trns_chain temporary_trns_chain;
73   bool temporary;
74   struct dictionary *dict;
75
76   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77   struct trns_chain *stack;
78   size_t n_stack;
79   size_t allocated_stack;
80
81   /* If true, cases are discarded instead of being written to
82      sink. */
83   bool discard_output;
84
85   /* The case map used to compact a case, if necessary;
86      otherwise a null pointer. */
87   struct case_map *compactor;
88
89   /* Time at which proc was last invoked. */
90   time_t last_proc_invocation;
91
92   /* Cases just before ("lagging") the current one. */
93   int n_lag;                    /* Number of cases to lag. */
94   struct deque lag;             /* Deque of lagged cases. */
95   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
96
97   /* Procedure data. */
98   enum
99     {
100       PROC_COMMITTED,           /* No procedure in progress. */
101       PROC_OPEN,                /* proc_open called, casereader still open. */
102       PROC_CLOSED               /* casereader from proc_open destroyed,
103                                    but proc_commit not yet called. */
104     }
105   proc_state;
106   casenumber cases_written;     /* Cases output so far. */
107   bool ok;                      /* Error status. */
108   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
109
110   const struct dataset_callbacks *callbacks;
111   void *cb_data;
112
113   /* Uniquely distinguishes datasets. */
114   unsigned int seqno;
115 };
116
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
119                                                bool non_empty);
120
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
125
126 static void update_last_proc_invocation (struct dataset *ds);
127
128 static void
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
130 {
131   struct dataset *ds = ds_;
132   dataset_changed__ (ds);
133 }
134 \f
135 static void
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
137 {
138   static unsigned int seqno;
139
140   dict_set_change_callback (ds->dict, dict_callback, ds);
141   proc_cancel_all_transformations (ds);
142   dataset_set_session (ds, session);
143   ds->seqno = ++seqno;
144 }
145
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
147    SESSION already contains a dataset named NAME, it is deleted and replaced.
148    The dataset initially has an empty dictionary and no data source. */
149 struct dataset *
150 dataset_create (struct session *session, const char *name)
151 {
152   struct dataset *ds = XMALLOC (struct dataset);
153   *ds = (struct dataset) {
154     .name = xstrdup (name),
155     .display = DATASET_FRONT,
156     .dict = dict_create (get_default_encoding ()),
157     .caseinit = caseinit_create (),
158   };
159   dataset_create_finish__ (ds, session);
160
161   return ds;
162 }
163
164 /* Creates and returns a new dataset that has the same data and dictionary as
165    OLD named NAME, adds it to the same session as OLD, and returns the new
166    dataset.  If SESSION already contains a dataset named NAME, it is deleted
167    and replaced.
168
169    OLD must not have any active transformations or temporary state and must
170    not be in the middle of a procedure.
171
172    Callbacks are not cloned. */
173 struct dataset *
174 dataset_clone (struct dataset *old, const char *name)
175 {
176   struct dataset *new;
177
178   assert (old->proc_state == PROC_COMMITTED);
179   assert (!old->permanent_trns_chain.n);
180   assert (old->permanent_dict == NULL);
181   assert (old->sink == NULL);
182   assert (!old->temporary);
183   assert (!old->temporary_trns_chain.n);
184   assert (!old->n_stack);
185
186   new = xzalloc (sizeof *new);
187   new->name = xstrdup (name);
188   new->display = DATASET_FRONT;
189   new->source = casereader_clone (old->source);
190   new->dict = dict_clone (old->dict);
191   new->caseinit = caseinit_clone (old->caseinit);
192   new->last_proc_invocation = old->last_proc_invocation;
193   new->ok = old->ok;
194
195   dataset_create_finish__ (new, old->session);
196
197   return new;
198 }
199
200 /* Destroys DS. */
201 void
202 dataset_destroy (struct dataset *ds)
203 {
204   if (ds != NULL)
205     {
206       dataset_set_session (ds, NULL);
207       dataset_clear (ds);
208       dict_unref (ds->dict);
209       dict_unref (ds->permanent_dict);
210       caseinit_destroy (ds->caseinit);
211       trns_chain_uninit (&ds->permanent_trns_chain);
212       for (size_t i = 0; i < ds->n_stack; i++)
213         trns_chain_uninit (&ds->stack[i]);
214       free (ds->stack);
215       dataset_transformations_changed__ (ds, false);
216       free (ds->name);
217       free (ds);
218     }
219 }
220
221 /* Discards the active dataset's dictionary, data, and transformations. */
222 void
223 dataset_clear (struct dataset *ds)
224 {
225   assert (ds->proc_state == PROC_COMMITTED);
226
227   dict_clear (ds->dict);
228   fh_set_default_handle (NULL);
229
230   ds->n_lag = 0;
231
232   casereader_destroy (ds->source);
233   ds->source = NULL;
234
235   proc_cancel_all_transformations (ds);
236 }
237
238 const char *
239 dataset_name (const struct dataset *ds)
240 {
241   return ds->name;
242 }
243
244 void
245 dataset_set_name (struct dataset *ds, const char *name)
246 {
247   struct session *session = ds->session;
248   bool active = false;
249
250   if (session != NULL)
251     {
252       active = session_active_dataset (session) == ds;
253       if (active)
254         session_set_active_dataset (session, NULL);
255       dataset_set_session (ds, NULL);
256     }
257
258   free (ds->name);
259   ds->name = xstrdup (name);
260
261   if (session != NULL)
262     {
263       dataset_set_session (ds, session);
264       if (active)
265         session_set_active_dataset (session, ds);
266     }
267 }
268
269 struct session *
270 dataset_session (const struct dataset *ds)
271 {
272   return ds->session;
273 }
274
275 void
276 dataset_set_session (struct dataset *ds, struct session *session)
277 {
278   if (session != ds->session)
279     {
280       if (ds->session != NULL)
281         session_remove_dataset (ds->session, ds);
282       if (session != NULL)
283         session_add_dataset (session, ds);
284     }
285 }
286
287 /* Returns the dictionary within DS.  This is always nonnull, although it
288    might not contain any variables. */
289 struct dictionary *
290 dataset_dict (const struct dataset *ds)
291 {
292   return ds->dict;
293 }
294
295 /* Replaces DS's dictionary by DICT, discarding any source and
296    transformations. */
297 void
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
299 {
300   assert (ds->proc_state == PROC_COMMITTED);
301   assert (ds->dict != dict);
302
303   dataset_clear (ds);
304
305   dict_unref (ds->dict);
306   ds->dict = dict;
307   dict_set_change_callback (ds->dict, dict_callback, ds);
308 }
309
310 /* Returns the casereader that will be read when a procedure is executed on
311    DS.  This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
314 {
315   return ds->source;
316 }
317
318 /* Returns true if DS has a data source, false otherwise. */
319 bool
320 dataset_has_source (const struct dataset *ds)
321 {
322   return dataset_source (ds) != NULL;
323 }
324
325 /* Replaces the active dataset's data by READER.  READER's cases must have an
326    appropriate format for DS's dictionary. */
327 bool
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
329 {
330   casereader_destroy (ds->source);
331   ds->source = reader;
332
333   caseinit_clear (ds->caseinit);
334   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
335
336   return reader == NULL || !casereader_error (reader);
337 }
338
339 /* Returns the data source from DS and removes it from DS.  Returns a null
340    pointer if DS has no data source. */
341 struct casereader *
342 dataset_steal_source (struct dataset *ds)
343 {
344   struct casereader *reader = ds->source;
345   ds->source = NULL;
346
347   return reader;
348 }
349
350 void
351 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
352 {
353   assert (!proc_in_temporary_transformations (ds));
354   assert (!proc_has_transformations (ds));
355   assert (n < dict_get_n_vars (ds->dict));
356
357   caseinit_mark_for_init (ds->caseinit, ds->dict);
358   ds->source = caseinit_translate_casereader_to_init_vars (
359     ds->caseinit, dict_get_proto (ds->dict), ds->source);
360   caseinit_clear (ds->caseinit);
361   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
362
363   struct case_map_stage *stage = case_map_stage_create (ds->dict);
364   dict_delete_vars (ds->dict, vars, n);
365   ds->source = case_map_create_input_translator (
366     case_map_stage_get_case_map (stage), ds->source);
367   case_map_stage_destroy (stage);
368   caseinit_clear (ds->caseinit);
369   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
370 }
371
372 /* Returns a number unique to DS.  It can be used to distinguish one dataset
373    from any other within a given program run, even datasets that do not exist
374    at the same time. */
375 unsigned int
376 dataset_seqno (const struct dataset *ds)
377 {
378   return ds->seqno;
379 }
380
381 void
382 dataset_set_callbacks (struct dataset *ds,
383                        const struct dataset_callbacks *callbacks,
384                        void *cb_data)
385 {
386   ds->callbacks = callbacks;
387   ds->cb_data = cb_data;
388 }
389
390 enum dataset_display
391 dataset_get_display (const struct dataset *ds)
392 {
393   return ds->display;
394 }
395
396 void
397 dataset_set_display (struct dataset *ds, enum dataset_display display)
398 {
399   ds->display = display;
400 }
401 \f
402 /* Returns the last time the data was read. */
403 time_t
404 time_of_last_procedure (struct dataset *ds)
405 {
406   if (!ds)
407     return time (NULL);
408   if (ds->last_proc_invocation == 0)
409     update_last_proc_invocation (ds);
410   return ds->last_proc_invocation;
411 }
412 \f
413 /* Regular procedure. */
414
415 /* Executes any pending transformations, if necessary.
416    This is not identical to the EXECUTE command in that it won't
417    always read the source data.  This can be important when the
418    source data is given inline within BEGIN DATA...END FILE. */
419 bool
420 proc_execute (struct dataset *ds)
421 {
422   bool ok;
423
424   if ((!ds->temporary || !ds->temporary_trns_chain.n)
425       && !ds->permanent_trns_chain.n)
426     {
427       ds->n_lag = 0;
428       ds->discard_output = false;
429       dict_set_case_limit (ds->dict, 0);
430       dict_clear_vectors (ds->dict);
431       return true;
432     }
433
434   ok = casereader_destroy (proc_open (ds));
435   return proc_commit (ds) && ok;
436 }
437
438 static const struct casereader_class proc_casereader_class;
439
440 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
441    cases filtered out with FILTER BY will not be included in the casereader
442    (which is usually desirable).  If FILTER is false, all cases will be
443    included regardless of FILTER BY settings.
444
445    proc_commit must be called when done. */
446 struct casereader *
447 proc_open_filtering (struct dataset *ds, bool filter)
448 {
449   struct casereader *reader;
450
451   assert (ds->n_stack == 0);
452   assert (ds->source != NULL);
453   assert (ds->proc_state == PROC_COMMITTED);
454
455   update_last_proc_invocation (ds);
456
457   caseinit_mark_for_init (ds->caseinit, ds->dict);
458   ds->source = caseinit_translate_casereader_to_init_vars (
459     ds->caseinit, dict_get_proto (ds->dict), ds->source);
460
461   /* Finish up the collection of transformations. */
462   add_case_limit_trns (ds);
463   if (filter)
464     add_filter_trns (ds);
465   if (!proc_in_temporary_transformations (ds))
466     add_measurement_level_trns (ds, ds->dict);
467
468   /* Make permanent_dict refer to the dictionary right before
469      data reaches the sink. */
470   if (ds->permanent_dict == NULL)
471     ds->permanent_dict = ds->dict;
472
473   /* Prepare sink. */
474   if (!ds->discard_output)
475     {
476       struct dictionary *pd = dict_clone (ds->permanent_dict);
477       struct case_map_stage *stage = case_map_stage_create (pd);
478       dict_delete_scratch_vars (pd);
479       ds->compactor = case_map_stage_get_case_map (stage);
480       case_map_stage_destroy (stage);
481       ds->sink = autopaging_writer_create (dict_get_proto (pd));
482       dict_unref (pd);
483     }
484   else
485     {
486       ds->compactor = NULL;
487       ds->sink = NULL;
488     }
489
490   /* Allocate memory for lagged cases. */
491   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
492
493   ds->proc_state = PROC_OPEN;
494   ds->cases_written = 0;
495   ds->ok = true;
496
497   /* FIXME: use taint in dataset in place of `ok'? */
498   /* FIXME: for trivial cases we can just return a clone of
499      ds->source? */
500
501   /* Create casereader and insert a shim on top.  The shim allows us to
502      arbitrarily extend the casereader's lifetime, by slurping the cases into
503      the shim's buffer in proc_commit().  That is especially useful when output
504      table_items are generated directly from the procedure casereader (e.g. by
505      the LIST procedure) when we are using an output driver that keeps a
506      reference to the output items passed to it (e.g. the GUI output driver in
507      PSPPIRE). */
508   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
509                                          CASENUMBER_MAX,
510                                          &proc_casereader_class, ds);
511   ds->shim = casereader_shim_insert (reader);
512   return reader;
513 }
514
515 /* Opens dataset DS for reading cases with proc_read.
516    proc_commit must be called when done. */
517 struct casereader *
518 proc_open (struct dataset *ds)
519 {
520   return proc_open_filtering (ds, true);
521 }
522
523 /* Returns true if a procedure is in progress, that is, if
524    proc_open has been called but proc_commit has not. */
525 bool
526 proc_is_open (const struct dataset *ds)
527 {
528   return ds->proc_state != PROC_COMMITTED;
529 }
530
531 /* "read" function for procedure casereader. */
532 static struct ccase *
533 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
534 {
535   struct dataset *ds = ds_;
536   enum trns_result retval = TRNS_DROP_CASE;
537   struct ccase *c;
538
539   assert (ds->proc_state == PROC_OPEN);
540   for (; ; case_unref (c))
541     {
542       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
543       if (retval == TRNS_ERROR)
544         ds->ok = false;
545       if (!ds->ok)
546         return NULL;
547
548       /* Read a case from source. */
549       c = casereader_read (ds->source);
550       if (c == NULL)
551         return NULL;
552       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
553       caseinit_restore_left_vars (ds->caseinit, c);
554
555       /* Execute permanent transformations.  */
556       casenumber case_nr = ds->cases_written + 1;
557       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
558       caseinit_save_left_vars (ds->caseinit, c);
559       if (retval != TRNS_CONTINUE)
560         continue;
561
562       /* Write case to collection of lagged cases. */
563       if (ds->n_lag > 0)
564         {
565           while (deque_count (&ds->lag) >= ds->n_lag)
566             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
567           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
568         }
569
570       /* Write case to replacement dataset. */
571       ds->cases_written++;
572       if (ds->sink != NULL)
573         casewriter_write (ds->sink,
574                           case_map_execute (ds->compactor, case_ref (c)));
575
576       /* Execute temporary transformations. */
577       if (ds->temporary_trns_chain.n)
578         {
579           retval = trns_chain_execute (&ds->temporary_trns_chain,
580                                        ds->cases_written, &c);
581           if (retval != TRNS_CONTINUE)
582             continue;
583         }
584
585       return c;
586     }
587 }
588
589 /* "destroy" function for procedure casereader. */
590 static void
591 proc_casereader_destroy (struct casereader *reader, void *ds_)
592 {
593   struct dataset *ds = ds_;
594   struct ccase *c;
595
596   /* We are always the subreader for a casereader_buffer, so if we're being
597      destroyed then it's because the casereader_buffer has read all the cases
598      that it ever will. */
599   ds->shim = NULL;
600
601   /* Make sure transformations happen for every input case, in
602      case they have side effects, and ensure that the replacement
603      active dataset gets all the cases it should. */
604   while ((c = casereader_read (reader)) != NULL)
605     case_unref (c);
606
607   ds->proc_state = PROC_CLOSED;
608   ds->ok = casereader_destroy (ds->source) && ds->ok;
609   ds->source = NULL;
610   dataset_set_source (ds, NULL);
611 }
612
613 /* Must return false if the source casereader, a transformation,
614    or the sink casewriter signaled an error.  (If a temporary
615    transformation signals an error, then the return value is
616    false, but the replacement active dataset may still be
617    untainted.) */
618 bool
619 proc_commit (struct dataset *ds)
620 {
621   if (ds->shim != NULL)
622     casereader_shim_slurp (ds->shim);
623
624   assert (ds->proc_state == PROC_CLOSED);
625   ds->proc_state = PROC_COMMITTED;
626
627   dataset_changed__ (ds);
628
629   /* Free memory for lagged cases. */
630   while (!deque_is_empty (&ds->lag))
631     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
632   free (ds->lag_cases);
633
634   /* Dictionary from before TEMPORARY becomes permanent. */
635   proc_cancel_temporary_transformations (ds);
636   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
637
638   if (!ds->discard_output)
639     {
640       /* Finish compacting. */
641       if (ds->compactor != NULL)
642         {
643           case_map_destroy (ds->compactor);
644           ds->compactor = NULL;
645
646           dict_delete_scratch_vars (ds->dict);
647         }
648
649       /* Old data sink becomes new data source. */
650       if (ds->sink != NULL)
651         ds->source = casewriter_make_reader (ds->sink);
652     }
653   else
654     {
655       ds->source = NULL;
656       ds->discard_output = false;
657     }
658   ds->sink = NULL;
659
660   caseinit_clear (ds->caseinit);
661   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
662
663   dict_clear_vectors (ds->dict);
664   ds->permanent_dict = NULL;
665   return ok;
666 }
667
668 /* Casereader class for procedure execution. */
669 static const struct casereader_class proc_casereader_class =
670   {
671     proc_casereader_read,
672     proc_casereader_destroy,
673     NULL,
674     NULL,
675   };
676
677 /* Updates last_proc_invocation. */
678 static void
679 update_last_proc_invocation (struct dataset *ds)
680 {
681   ds->last_proc_invocation = time (NULL);
682 }
683 \f
684 /* Returns a pointer to the lagged case from N_BEFORE cases before the
685    current one, or NULL if there haven't been that many cases yet. */
686 const struct ccase *
687 lagged_case (const struct dataset *ds, int n_before)
688 {
689   assert (n_before >= 1);
690   assert (n_before <= ds->n_lag);
691
692   if (n_before <= deque_count (&ds->lag))
693     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
694   else
695     return NULL;
696 }
697 \f
698 /* Adds TRNS to the current set of transformations. */
699 void
700 add_transformation (struct dataset *ds,
701                     const struct trns_class *class, void *aux)
702 {
703   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
704                               : ds->temporary ? &ds->temporary_trns_chain
705                               : &ds->permanent_trns_chain);
706   struct transformation t = { .class = class, .aux = aux };
707   trns_chain_append (chain, &t);
708   dataset_transformations_changed__ (ds, true);
709 }
710
711 /* Returns true if the next call to add_transformation() will add
712    a temporary transformation, false if it will add a permanent
713    transformation. */
714 bool
715 proc_in_temporary_transformations (const struct dataset *ds)
716 {
717   return ds->temporary;
718 }
719
720 /* Marks the start of temporary transformations.
721    Further calls to add_transformation() will add temporary
722    transformations. */
723 void
724 proc_start_temporary_transformations (struct dataset *ds)
725 {
726   assert (!ds->n_stack);
727   if (!proc_in_temporary_transformations (ds))
728     {
729       add_case_limit_trns (ds);
730
731       ds->permanent_dict = dict_clone (ds->dict);
732       add_measurement_level_trns (ds, ds->permanent_dict);
733
734       ds->temporary = true;
735       dataset_transformations_changed__ (ds, true);
736     }
737 }
738
739 /* Converts all the temporary transformations, if any, to permanent
740    transformations.  Further transformations will be permanent.
741
742    The FILTER command is implemented as a temporary transformation, so a
743    procedure that uses this function should usually use proc_open_filtering()
744    with FILTER false, instead of plain proc_open().
745
746    Returns true if anything changed, false otherwise. */
747 bool
748 proc_make_temporary_transformations_permanent (struct dataset *ds)
749 {
750   if (proc_in_temporary_transformations (ds))
751     {
752       cancel_measurement_level_trns (&ds->permanent_trns_chain);
753       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
754
755       ds->temporary = false;
756
757       dict_unref (ds->permanent_dict);
758       ds->permanent_dict = NULL;
759
760       return true;
761     }
762   else
763     return false;
764 }
765
766 /* Cancels all temporary transformations, if any.  Further
767    transformations will be permanent.
768    Returns true if anything changed, false otherwise. */
769 bool
770 proc_cancel_temporary_transformations (struct dataset *ds)
771 {
772   if (proc_in_temporary_transformations (ds))
773     {
774       trns_chain_clear (&ds->temporary_trns_chain);
775
776       dict_unref (ds->dict);
777       ds->dict = ds->permanent_dict;
778       ds->permanent_dict = NULL;
779
780       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
781       return true;
782     }
783   else
784     return false;
785 }
786
787 /* Cancels all transformations, if any.
788    Returns true if successful, false on I/O error. */
789 bool
790 proc_cancel_all_transformations (struct dataset *ds)
791 {
792   bool ok;
793   assert (ds->proc_state == PROC_COMMITTED);
794   ok = trns_chain_clear (&ds->permanent_trns_chain);
795   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
796   ds->temporary = false;
797   for (size_t i = 0; i < ds->n_stack; i++)
798     ok = trns_chain_uninit (&ds->stack[i]) && ok;
799   ds->n_stack = 0;
800   dataset_transformations_changed__ (ds, false);
801
802   return ok;
803 }
804
805 void
806 proc_push_transformations (struct dataset *ds)
807 {
808   if (ds->n_stack >= ds->allocated_stack)
809     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
810                             sizeof *ds->stack);
811   trns_chain_init (&ds->stack[ds->n_stack++]);
812 }
813
814 void
815 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
816 {
817   assert (ds->n_stack > 0);
818   *chain = ds->stack[--ds->n_stack];
819 }
820
821 bool
822 proc_has_transformations (const struct dataset *ds)
823 {
824   return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
825 }
826
827 static enum trns_result
828 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
829 {
830   struct variable *var = var_;
831
832   *cc = case_unshare (*cc);
833   *case_num_rw (*cc, var) = case_num;
834
835   return TRNS_CONTINUE;
836 }
837
838 /* Add a variable which we can sort by to get back the original order. */
839 struct variable *
840 add_permanent_ordering_transformation (struct dataset *ds)
841 {
842   struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
843   struct variable *order_var
844     = (proc_in_temporary_transformations (ds)
845        ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
846        : temp_var);
847
848   static const struct trns_class trns_class = {
849     .name = "ordering",
850     .execute = store_case_num
851   };
852   const struct transformation t = { .class = &trns_class, .aux = order_var };
853   trns_chain_append (&ds->permanent_trns_chain, &t);
854
855   return temp_var;
856 }
857 \f
858 /* Causes output from the next procedure to be discarded, instead
859    of being preserved for use as input for the next procedure. */
860 void
861 proc_discard_output (struct dataset *ds)
862 {
863   ds->discard_output = true;
864 }
865
866
867 /* Checks whether DS has a corrupted active dataset.  If so,
868    discards it and returns false.  If not, returns true without
869    doing anything. */
870 bool
871 dataset_end_of_command (struct dataset *ds)
872 {
873   if (ds->source != NULL)
874     {
875       if (casereader_error (ds->source))
876         {
877           dataset_clear (ds);
878           return false;
879         }
880       else
881         {
882           const struct taint *taint = casereader_get_taint (ds->source);
883           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
884           assert (!taint_has_tainted_successor (taint));
885         }
886     }
887   return true;
888 }
889 \f
890 /* Limits the maximum number of cases processed to
891    *CASES_REMAINING. */
892 static enum trns_result
893 case_limit_trns_proc (void *cases_remaining_,
894                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
895 {
896   size_t *cases_remaining = cases_remaining_;
897   if (*cases_remaining > 0)
898     {
899       (*cases_remaining)--;
900       return TRNS_CONTINUE;
901     }
902   else
903     return TRNS_DROP_CASE;
904 }
905
906 /* Frees the data associated with a case limit transformation. */
907 static bool
908 case_limit_trns_free (void *cases_remaining_)
909 {
910   size_t *cases_remaining = cases_remaining_;
911   free (cases_remaining);
912   return true;
913 }
914
915 /* Adds a transformation that limits the number of cases that may
916    pass through, if DS->DICT has a case limit. */
917 static void
918 add_case_limit_trns (struct dataset *ds)
919 {
920   casenumber case_limit = dict_get_case_limit (ds->dict);
921   if (case_limit != 0)
922     {
923       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
924       *cases_remaining = case_limit;
925
926       static const struct trns_class trns_class = {
927         .name = "case limit",
928         .execute = case_limit_trns_proc,
929         .destroy = case_limit_trns_free,
930       };
931       add_transformation (ds, &trns_class, cases_remaining);
932
933       dict_set_case_limit (ds->dict, 0);
934     }
935 }
936
937 \f
938 /* FILTER transformation. */
939 static enum trns_result
940 filter_trns_proc (void *filter_var_,
941                   struct ccase **c, casenumber case_nr UNUSED)
942
943 {
944   struct variable *filter_var = filter_var_;
945   double f = case_num (*c, filter_var);
946   return (f != 0.0 && !var_is_num_missing (filter_var, f)
947           ? TRNS_CONTINUE : TRNS_DROP_CASE);
948 }
949
950 /* Adds a temporary transformation to filter data according to
951    the variable specified on FILTER, if any. */
952 static void
953 add_filter_trns (struct dataset *ds)
954 {
955   struct variable *filter_var = dict_get_filter (ds->dict);
956   if (filter_var != NULL)
957     {
958       proc_start_temporary_transformations (ds);
959
960       static const struct trns_class trns_class = {
961         .name = "FILTER",
962         .execute = filter_trns_proc,
963       };
964       add_transformation (ds, &trns_class, filter_var);
965     }
966 }
967
968 void
969 dataset_need_lag (struct dataset *ds, int n_before)
970 {
971   ds->n_lag = MAX (ds->n_lag, n_before);
972 }
973 \f
974 /* Measurement guesser, for guessing a measurement level from formats and
975    data. */
976
977 struct mg_value
978   {
979     struct hmap_node hmap_node;
980     double value;
981   };
982
983 struct mg_var
984   {
985     struct variable *var;
986     struct hmap *values;
987   };
988
989 static void
990 mg_var_uninit (struct mg_var *mgv)
991 {
992   struct mg_value *mgvalue, *next;
993   HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
994                       mgv->values)
995     {
996       hmap_delete (mgv->values, &mgvalue->hmap_node);
997       free (mgvalue);
998     }
999   hmap_destroy (mgv->values);
1000   free (mgv->values);
1001 }
1002
1003 static enum measure
1004 mg_var_interpret (const struct mg_var *mgv)
1005 {
1006   size_t n = hmap_count (mgv->values);
1007   if (!n)
1008     {
1009       /* All missing (or no data). */
1010       return MEASURE_NOMINAL;
1011     }
1012
1013   const struct mg_value *mgvalue;
1014   HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1015                  mgv->values)
1016     if (mgvalue->value < 10)
1017       return MEASURE_NOMINAL;
1018   return MEASURE_SCALE;
1019 }
1020
1021 static enum measure
1022 mg_var_add_value (struct mg_var *mgv, double value)
1023 {
1024   if (var_is_num_missing (mgv->var, value))
1025     return MEASURE_UNKNOWN;
1026   else if (value < 0 || value != floor (value))
1027     return MEASURE_SCALE;
1028
1029   size_t hash = hash_double (value, 0);
1030   struct mg_value *mgvalue;
1031   HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1032                            hash, mgv->values)
1033     if (mgvalue->value == value)
1034       return MEASURE_UNKNOWN;
1035
1036   mgvalue = xmalloc (sizeof *mgvalue);
1037   mgvalue->value = value;
1038   hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1039   if (hmap_count (mgv->values) >= settings_get_scalemin ())
1040     return MEASURE_SCALE;
1041
1042   return MEASURE_UNKNOWN;
1043 }
1044
1045 struct measure_guesser
1046   {
1047     struct mg_var *vars;
1048     size_t n_vars;
1049   };
1050
1051 static struct measure_guesser *
1052 measure_guesser_create__ (struct dictionary *dict)
1053 {
1054   struct mg_var *mgvs = NULL;
1055   size_t n_mgvs = 0;
1056   size_t allocated_mgvs = 0;
1057
1058   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1059     {
1060       struct variable *var = dict_get_var (dict, i);
1061       if (var_get_measure (var) != MEASURE_UNKNOWN)
1062         continue;
1063
1064       struct fmt_spec f = var_get_print_format (var);
1065       enum measure m = var_default_measure_for_format (f.type);
1066       if (m != MEASURE_UNKNOWN)
1067         {
1068           var_set_measure (var, m);
1069           continue;
1070         }
1071
1072       if (n_mgvs >= allocated_mgvs)
1073         mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1074
1075       struct mg_var *mgv = &mgvs[n_mgvs++];
1076       *mgv = (struct mg_var) {
1077         .var = var,
1078         .values = xmalloc (sizeof *mgv->values),
1079       };
1080       hmap_init (mgv->values);
1081     }
1082   if (!n_mgvs)
1083     return NULL;
1084
1085   struct measure_guesser *mg = xmalloc (sizeof *mg);
1086   *mg = (struct measure_guesser) {
1087     .vars = mgvs,
1088     .n_vars = n_mgvs,
1089   };
1090   return mg;
1091 }
1092
1093 /* Scans through DS's dictionary for variables that have an unknown measurement
1094    level.  For those, if the measurement level can be guessed based on the
1095    variable's type and format, sets a default.  If that's enough, returns NULL.
1096    If any remain whose levels are unknown and can't be guessed that way,
1097    creates and returns a structure that the caller should pass to
1098    measure_guesser_add_case() or measure_guesser_run() for guessing a
1099    measurement level based on the data.  */
1100 struct measure_guesser *
1101 measure_guesser_create (struct dataset *ds)
1102 {
1103   return measure_guesser_create__ (dataset_dict (ds));
1104 }
1105
1106 /* Adds data from case C to MG. */
1107 static void
1108 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1109 {
1110   for (size_t i = 0; i < mg->n_vars; )
1111     {
1112       struct mg_var *mgv = &mg->vars[i];
1113       double value = case_num (c, mgv->var);
1114       enum measure m = mg_var_add_value (mgv, value);
1115       if (m != MEASURE_UNKNOWN)
1116         {
1117           var_set_measure (mgv->var, m);
1118
1119           mg_var_uninit (mgv);
1120           *mgv = mg->vars[--mg->n_vars];
1121         }
1122       else
1123         i++;
1124     }
1125 }
1126
1127 /* Destroys MG. */
1128 void
1129 measure_guesser_destroy (struct measure_guesser *mg)
1130 {
1131   if (!mg)
1132     return;
1133
1134   for (size_t i = 0; i < mg->n_vars; i++)
1135     {
1136       struct mg_var *mgv = &mg->vars[i];
1137       var_set_measure (mgv->var, mg_var_interpret (mgv));
1138       mg_var_uninit (mgv);
1139     }
1140   free (mg->vars);
1141   free (mg);
1142 }
1143
1144 /* Adds final measurement levels based on MG, after all the cases have been
1145    added. */
1146 static void
1147 measure_guesser_commit (struct measure_guesser *mg)
1148 {
1149   for (size_t i = 0; i < mg->n_vars; i++)
1150     {
1151       struct mg_var *mgv = &mg->vars[i];
1152       var_set_measure (mgv->var, mg_var_interpret (mgv));
1153     }
1154 }
1155
1156 /* Passes the cases in READER through MG and uses the data in the cases to set
1157    measurement levels for the variables where they were still unknown. */
1158 void
1159 measure_guesser_run (struct measure_guesser *mg,
1160                      const struct casereader *reader)
1161 {
1162   struct casereader *r = casereader_clone (reader);
1163   while (mg->n_vars > 0)
1164     {
1165       struct ccase *c = casereader_read (r);
1166       if (!c)
1167         break;
1168       measure_guesser_add_case (mg, c);
1169       case_unref (c);
1170     }
1171   casereader_destroy (r);
1172
1173   measure_guesser_commit (mg);
1174 }
1175 \f
1176 /* A transformation for guessing measurement levels. */
1177
1178 static enum trns_result
1179 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1180 {
1181   struct measure_guesser *mg = mg_;
1182   measure_guesser_add_case (mg, *c);
1183   return TRNS_CONTINUE;
1184 }
1185
1186 static bool
1187 mg_trns_free (void *mg_)
1188 {
1189   struct measure_guesser *mg = mg_;
1190   measure_guesser_commit (mg);
1191   measure_guesser_destroy (mg);
1192   return true;
1193 }
1194
1195 static const struct trns_class mg_trns_class = {
1196   .name = "add measurement level",
1197   .execute = mg_trns_proc,
1198   .destroy = mg_trns_free,
1199 };
1200
1201 static void
1202 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1203 {
1204   struct measure_guesser *mg = measure_guesser_create__ (dict);
1205   if (mg)
1206     add_transformation (ds, &mg_trns_class, mg);
1207 }
1208
1209 static void
1210 cancel_measurement_level_trns (struct trns_chain *chain)
1211 {
1212   if (!chain->n)
1213     return;
1214
1215   struct transformation *trns = &chain->xforms[chain->n - 1];
1216   if (trns->class != &mg_trns_class)
1217     return;
1218
1219   struct measure_guesser *mg = trns->aux;
1220   measure_guesser_destroy (mg);
1221   chain->n--;
1222 }
1223 \f
1224 static void
1225 dataset_changed__ (struct dataset *ds)
1226 {
1227   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1228     ds->callbacks->changed (ds->cb_data);
1229 }
1230
1231 static void
1232 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1233 {
1234   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1235     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1236 }
1237 \f
1238 /* Private interface for use by session code. */
1239
1240 void
1241 dataset_set_session__ (struct dataset *ds, struct session *session)
1242 {
1243   ds->session = session;
1244 }