Automatic inference of measeurement level works.
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct casewriter *sink;
72   struct trns_chain temporary_trns_chain;
73   bool temporary;
74   struct dictionary *dict;
75
76   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77   struct trns_chain *stack;
78   size_t n_stack;
79   size_t allocated_stack;
80
81   /* If true, cases are discarded instead of being written to
82      sink. */
83   bool discard_output;
84
85   /* The case map used to compact a case, if necessary;
86      otherwise a null pointer. */
87   struct case_map *compactor;
88
89   /* Time at which proc was last invoked. */
90   time_t last_proc_invocation;
91
92   /* Cases just before ("lagging") the current one. */
93   int n_lag;                    /* Number of cases to lag. */
94   struct deque lag;             /* Deque of lagged cases. */
95   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
96
97   /* Procedure data. */
98   enum
99     {
100       PROC_COMMITTED,           /* No procedure in progress. */
101       PROC_OPEN,                /* proc_open called, casereader still open. */
102       PROC_CLOSED               /* casereader from proc_open destroyed,
103                                    but proc_commit not yet called. */
104     }
105   proc_state;
106   casenumber cases_written;     /* Cases output so far. */
107   bool ok;                      /* Error status. */
108   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
109
110   const struct dataset_callbacks *callbacks;
111   void *cb_data;
112
113   /* Uniquely distinguishes datasets. */
114   unsigned int seqno;
115 };
116
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
119                                                bool non_empty);
120
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
125
126 static void update_last_proc_invocation (struct dataset *ds);
127
128 static void
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
130 {
131   struct dataset *ds = ds_;
132   dataset_changed__ (ds);
133 }
134 \f
135 static void
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
137 {
138   static unsigned int seqno;
139
140   dict_set_change_callback (ds->dict, dict_callback, ds);
141   proc_cancel_all_transformations (ds);
142   dataset_set_session (ds, session);
143   ds->seqno = ++seqno;
144 }
145
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
147    SESSION already contains a dataset named NAME, it is deleted and replaced.
148    The dataset initially has an empty dictionary and no data source. */
149 struct dataset *
150 dataset_create (struct session *session, const char *name)
151 {
152   struct dataset *ds = XMALLOC (struct dataset);
153   *ds = (struct dataset) {
154     .name = xstrdup (name),
155     .display = DATASET_FRONT,
156     .dict = dict_create (get_default_encoding ()),
157     .caseinit = caseinit_create (),
158   };
159   dataset_create_finish__ (ds, session);
160
161   return ds;
162 }
163
164 /* Creates and returns a new dataset that has the same data and dictionary as
165    OLD named NAME, adds it to the same session as OLD, and returns the new
166    dataset.  If SESSION already contains a dataset named NAME, it is deleted
167    and replaced.
168
169    OLD must not have any active transformations or temporary state and must
170    not be in the middle of a procedure.
171
172    Callbacks are not cloned. */
173 struct dataset *
174 dataset_clone (struct dataset *old, const char *name)
175 {
176   struct dataset *new;
177
178   assert (old->proc_state == PROC_COMMITTED);
179   assert (!old->permanent_trns_chain.n);
180   assert (old->permanent_dict == NULL);
181   assert (old->sink == NULL);
182   assert (!old->temporary);
183   assert (!old->temporary_trns_chain.n);
184   assert (!old->n_stack);
185
186   new = xzalloc (sizeof *new);
187   new->name = xstrdup (name);
188   new->display = DATASET_FRONT;
189   new->source = casereader_clone (old->source);
190   new->dict = dict_clone (old->dict);
191   new->caseinit = caseinit_clone (old->caseinit);
192   new->last_proc_invocation = old->last_proc_invocation;
193   new->ok = old->ok;
194
195   dataset_create_finish__ (new, old->session);
196
197   return new;
198 }
199
200 /* Destroys DS. */
201 void
202 dataset_destroy (struct dataset *ds)
203 {
204   if (ds != NULL)
205     {
206       dataset_set_session (ds, NULL);
207       dataset_clear (ds);
208       dict_unref (ds->dict);
209       dict_unref (ds->permanent_dict);
210       caseinit_destroy (ds->caseinit);
211       trns_chain_uninit (&ds->permanent_trns_chain);
212       for (size_t i = 0; i < ds->n_stack; i++)
213         trns_chain_uninit (&ds->stack[i]);
214       free (ds->stack);
215       dataset_transformations_changed__ (ds, false);
216       free (ds->name);
217       free (ds);
218     }
219 }
220
221 /* Discards the active dataset's dictionary, data, and transformations. */
222 void
223 dataset_clear (struct dataset *ds)
224 {
225   assert (ds->proc_state == PROC_COMMITTED);
226
227   dict_clear (ds->dict);
228   fh_set_default_handle (NULL);
229
230   ds->n_lag = 0;
231
232   casereader_destroy (ds->source);
233   ds->source = NULL;
234
235   proc_cancel_all_transformations (ds);
236 }
237
238 const char *
239 dataset_name (const struct dataset *ds)
240 {
241   return ds->name;
242 }
243
244 void
245 dataset_set_name (struct dataset *ds, const char *name)
246 {
247   struct session *session = ds->session;
248   bool active = false;
249
250   if (session != NULL)
251     {
252       active = session_active_dataset (session) == ds;
253       if (active)
254         session_set_active_dataset (session, NULL);
255       dataset_set_session (ds, NULL);
256     }
257
258   free (ds->name);
259   ds->name = xstrdup (name);
260
261   if (session != NULL)
262     {
263       dataset_set_session (ds, session);
264       if (active)
265         session_set_active_dataset (session, ds);
266     }
267 }
268
269 struct session *
270 dataset_session (const struct dataset *ds)
271 {
272   return ds->session;
273 }
274
275 void
276 dataset_set_session (struct dataset *ds, struct session *session)
277 {
278   if (session != ds->session)
279     {
280       if (ds->session != NULL)
281         session_remove_dataset (ds->session, ds);
282       if (session != NULL)
283         session_add_dataset (session, ds);
284     }
285 }
286
287 /* Returns the dictionary within DS.  This is always nonnull, although it
288    might not contain any variables. */
289 struct dictionary *
290 dataset_dict (const struct dataset *ds)
291 {
292   return ds->dict;
293 }
294
295 /* Replaces DS's dictionary by DICT, discarding any source and
296    transformations. */
297 void
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
299 {
300   assert (ds->proc_state == PROC_COMMITTED);
301   assert (ds->dict != dict);
302
303   dataset_clear (ds);
304
305   dict_unref (ds->dict);
306   ds->dict = dict;
307   dict_set_change_callback (ds->dict, dict_callback, ds);
308 }
309
310 /* Returns the casereader that will be read when a procedure is executed on
311    DS.  This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
314 {
315   return ds->source;
316 }
317
318 /* Returns true if DS has a data source, false otherwise. */
319 bool
320 dataset_has_source (const struct dataset *ds)
321 {
322   return dataset_source (ds) != NULL;
323 }
324
325 /* Replaces the active dataset's data by READER.  READER's cases must have an
326    appropriate format for DS's dictionary. */
327 bool
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
329 {
330   casereader_destroy (ds->source);
331   ds->source = reader;
332
333   caseinit_clear (ds->caseinit);
334   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
335
336   return reader == NULL || !casereader_error (reader);
337 }
338
339 /* Returns the data source from DS and removes it from DS.  Returns a null
340    pointer if DS has no data source. */
341 struct casereader *
342 dataset_steal_source (struct dataset *ds)
343 {
344   struct casereader *reader = ds->source;
345   ds->source = NULL;
346
347   return reader;
348 }
349
350 /* Returns a number unique to DS.  It can be used to distinguish one dataset
351    from any other within a given program run, even datasets that do not exist
352    at the same time. */
353 unsigned int
354 dataset_seqno (const struct dataset *ds)
355 {
356   return ds->seqno;
357 }
358
359 void
360 dataset_set_callbacks (struct dataset *ds,
361                        const struct dataset_callbacks *callbacks,
362                        void *cb_data)
363 {
364   ds->callbacks = callbacks;
365   ds->cb_data = cb_data;
366 }
367
368 enum dataset_display
369 dataset_get_display (const struct dataset *ds)
370 {
371   return ds->display;
372 }
373
374 void
375 dataset_set_display (struct dataset *ds, enum dataset_display display)
376 {
377   ds->display = display;
378 }
379 \f
380 /* Returns the last time the data was read. */
381 time_t
382 time_of_last_procedure (struct dataset *ds)
383 {
384   if (!ds)
385     return time (NULL);
386   if (ds->last_proc_invocation == 0)
387     update_last_proc_invocation (ds);
388   return ds->last_proc_invocation;
389 }
390 \f
391 /* Regular procedure. */
392
393 /* Executes any pending transformations, if necessary.
394    This is not identical to the EXECUTE command in that it won't
395    always read the source data.  This can be important when the
396    source data is given inline within BEGIN DATA...END FILE. */
397 bool
398 proc_execute (struct dataset *ds)
399 {
400   bool ok;
401
402   if ((!ds->temporary || !ds->temporary_trns_chain.n)
403       && !ds->permanent_trns_chain.n)
404     {
405       ds->n_lag = 0;
406       ds->discard_output = false;
407       dict_set_case_limit (ds->dict, 0);
408       dict_clear_vectors (ds->dict);
409       return true;
410     }
411
412   ok = casereader_destroy (proc_open (ds));
413   return proc_commit (ds) && ok;
414 }
415
416 static const struct casereader_class proc_casereader_class;
417
418 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
419    cases filtered out with FILTER BY will not be included in the casereader
420    (which is usually desirable).  If FILTER is false, all cases will be
421    included regardless of FILTER BY settings.
422
423    proc_commit must be called when done. */
424 struct casereader *
425 proc_open_filtering (struct dataset *ds, bool filter)
426 {
427   struct casereader *reader;
428
429   assert (ds->n_stack == 0);
430   assert (ds->source != NULL);
431   assert (ds->proc_state == PROC_COMMITTED);
432
433   update_last_proc_invocation (ds);
434
435   caseinit_mark_for_init (ds->caseinit, ds->dict);
436
437   /* Finish up the collection of transformations. */
438   add_case_limit_trns (ds);
439   if (filter)
440     add_filter_trns (ds);
441   if (!proc_in_temporary_transformations (ds))
442     add_measurement_level_trns (ds, ds->dict);
443
444   /* Make permanent_dict refer to the dictionary right before
445      data reaches the sink. */
446   if (ds->permanent_dict == NULL)
447     ds->permanent_dict = ds->dict;
448
449   /* Prepare sink. */
450   if (!ds->discard_output)
451     {
452       struct dictionary *pd = ds->permanent_dict;
453       size_t compacted_n_values = dict_count_values (pd, 1u << DC_SCRATCH);
454       if (compacted_n_values < dict_get_next_value_idx (pd))
455         {
456           struct caseproto *compacted_proto;
457           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
458           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
459           ds->sink = autopaging_writer_create (compacted_proto);
460           caseproto_unref (compacted_proto);
461         }
462       else
463         {
464           ds->compactor = NULL;
465           ds->sink = autopaging_writer_create (dict_get_proto (pd));
466         }
467     }
468   else
469     {
470       ds->compactor = NULL;
471       ds->sink = NULL;
472     }
473
474   /* Allocate memory for lagged cases. */
475   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
476
477   ds->proc_state = PROC_OPEN;
478   ds->cases_written = 0;
479   ds->ok = true;
480
481   /* FIXME: use taint in dataset in place of `ok'? */
482   /* FIXME: for trivial cases we can just return a clone of
483      ds->source? */
484
485   /* Create casereader and insert a shim on top.  The shim allows us to
486      arbitrarily extend the casereader's lifetime, by slurping the cases into
487      the shim's buffer in proc_commit().  That is especially useful when output
488      table_items are generated directly from the procedure casereader (e.g. by
489      the LIST procedure) when we are using an output driver that keeps a
490      reference to the output items passed to it (e.g. the GUI output driver in
491      PSPPIRE). */
492   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
493                                          CASENUMBER_MAX,
494                                          &proc_casereader_class, ds);
495   ds->shim = casereader_shim_insert (reader);
496   return reader;
497 }
498
499 /* Opens dataset DS for reading cases with proc_read.
500    proc_commit must be called when done. */
501 struct casereader *
502 proc_open (struct dataset *ds)
503 {
504   return proc_open_filtering (ds, true);
505 }
506
507 /* Returns true if a procedure is in progress, that is, if
508    proc_open has been called but proc_commit has not. */
509 bool
510 proc_is_open (const struct dataset *ds)
511 {
512   return ds->proc_state != PROC_COMMITTED;
513 }
514
515 /* "read" function for procedure casereader. */
516 static struct ccase *
517 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
518 {
519   struct dataset *ds = ds_;
520   enum trns_result retval = TRNS_DROP_CASE;
521   struct ccase *c;
522
523   assert (ds->proc_state == PROC_OPEN);
524   for (; ; case_unref (c))
525     {
526       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
527       if (retval == TRNS_ERROR)
528         ds->ok = false;
529       if (!ds->ok)
530         return NULL;
531
532       /* Read a case from source. */
533       c = casereader_read (ds->source);
534       if (c == NULL)
535         return NULL;
536       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
537       caseinit_init_vars (ds->caseinit, c);
538
539       /* Execute permanent transformations.  */
540       casenumber case_nr = ds->cases_written + 1;
541       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
542       caseinit_update_left_vars (ds->caseinit, c);
543       if (retval != TRNS_CONTINUE)
544         continue;
545
546       /* Write case to collection of lagged cases. */
547       if (ds->n_lag > 0)
548         {
549           while (deque_count (&ds->lag) >= ds->n_lag)
550             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
551           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
552         }
553
554       /* Write case to replacement dataset. */
555       ds->cases_written++;
556       if (ds->sink != NULL)
557         casewriter_write (ds->sink,
558                           case_map_execute (ds->compactor, case_ref (c)));
559
560       /* Execute temporary transformations. */
561       if (ds->temporary_trns_chain.n)
562         {
563           retval = trns_chain_execute (&ds->temporary_trns_chain,
564                                        ds->cases_written, &c);
565           if (retval != TRNS_CONTINUE)
566             continue;
567         }
568
569       return c;
570     }
571 }
572
573 /* "destroy" function for procedure casereader. */
574 static void
575 proc_casereader_destroy (struct casereader *reader, void *ds_)
576 {
577   struct dataset *ds = ds_;
578   struct ccase *c;
579
580   /* We are always the subreader for a casereader_buffer, so if we're being
581      destroyed then it's because the casereader_buffer has read all the cases
582      that it ever will. */
583   ds->shim = NULL;
584
585   /* Make sure transformations happen for every input case, in
586      case they have side effects, and ensure that the replacement
587      active dataset gets all the cases it should. */
588   while ((c = casereader_read (reader)) != NULL)
589     case_unref (c);
590
591   ds->proc_state = PROC_CLOSED;
592   ds->ok = casereader_destroy (ds->source) && ds->ok;
593   ds->source = NULL;
594   dataset_set_source (ds, NULL);
595 }
596
597 /* Must return false if the source casereader, a transformation,
598    or the sink casewriter signaled an error.  (If a temporary
599    transformation signals an error, then the return value is
600    false, but the replacement active dataset may still be
601    untainted.) */
602 bool
603 proc_commit (struct dataset *ds)
604 {
605   if (ds->shim != NULL)
606     casereader_shim_slurp (ds->shim);
607
608   assert (ds->proc_state == PROC_CLOSED);
609   ds->proc_state = PROC_COMMITTED;
610
611   dataset_changed__ (ds);
612
613   /* Free memory for lagged cases. */
614   while (!deque_is_empty (&ds->lag))
615     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
616   free (ds->lag_cases);
617
618   /* Dictionary from before TEMPORARY becomes permanent. */
619   proc_cancel_temporary_transformations (ds);
620   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
621
622   if (!ds->discard_output)
623     {
624       /* Finish compacting. */
625       if (ds->compactor != NULL)
626         {
627           case_map_destroy (ds->compactor);
628           ds->compactor = NULL;
629
630           dict_delete_scratch_vars (ds->dict);
631           dict_compact_values (ds->dict);
632         }
633
634       /* Old data sink becomes new data source. */
635       if (ds->sink != NULL)
636         ds->source = casewriter_make_reader (ds->sink);
637     }
638   else
639     {
640       ds->source = NULL;
641       ds->discard_output = false;
642     }
643   ds->sink = NULL;
644
645   caseinit_clear (ds->caseinit);
646   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
647
648   dict_clear_vectors (ds->dict);
649   ds->permanent_dict = NULL;
650   return ok;
651 }
652
653 /* Casereader class for procedure execution. */
654 static const struct casereader_class proc_casereader_class =
655   {
656     proc_casereader_read,
657     proc_casereader_destroy,
658     NULL,
659     NULL,
660   };
661
662 /* Updates last_proc_invocation. */
663 static void
664 update_last_proc_invocation (struct dataset *ds)
665 {
666   ds->last_proc_invocation = time (NULL);
667 }
668 \f
669 /* Returns a pointer to the lagged case from N_BEFORE cases before the
670    current one, or NULL if there haven't been that many cases yet. */
671 const struct ccase *
672 lagged_case (const struct dataset *ds, int n_before)
673 {
674   assert (n_before >= 1);
675   assert (n_before <= ds->n_lag);
676
677   if (n_before <= deque_count (&ds->lag))
678     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
679   else
680     return NULL;
681 }
682 \f
683 /* Adds TRNS to the current set of transformations. */
684 void
685 add_transformation (struct dataset *ds,
686                     const struct trns_class *class, void *aux)
687 {
688   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
689                               : ds->temporary ? &ds->temporary_trns_chain
690                               : &ds->permanent_trns_chain);
691   struct transformation t = { .class = class, .aux = aux };
692   trns_chain_append (chain, &t);
693   dataset_transformations_changed__ (ds, true);
694 }
695
696 /* Returns true if the next call to add_transformation() will add
697    a temporary transformation, false if it will add a permanent
698    transformation. */
699 bool
700 proc_in_temporary_transformations (const struct dataset *ds)
701 {
702   return ds->temporary;
703 }
704
705 /* Marks the start of temporary transformations.
706    Further calls to add_transformation() will add temporary
707    transformations. */
708 void
709 proc_start_temporary_transformations (struct dataset *ds)
710 {
711   assert (!ds->n_stack);
712   if (!proc_in_temporary_transformations (ds))
713     {
714       add_case_limit_trns (ds);
715
716       ds->permanent_dict = dict_clone (ds->dict);
717       add_measurement_level_trns (ds, ds->permanent_dict);
718
719       ds->temporary = true;
720       dataset_transformations_changed__ (ds, true);
721     }
722 }
723
724 /* Converts all the temporary transformations, if any, to permanent
725    transformations.  Further transformations will be permanent.
726
727    The FILTER command is implemented as a temporary transformation, so a
728    procedure that uses this function should usually use proc_open_filtering()
729    with FILTER false, instead of plain proc_open().
730
731    Returns true if anything changed, false otherwise. */
732 bool
733 proc_make_temporary_transformations_permanent (struct dataset *ds)
734 {
735   if (proc_in_temporary_transformations (ds))
736     {
737       cancel_measurement_level_trns (&ds->permanent_trns_chain);
738       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
739
740       ds->temporary = false;
741
742       dict_unref (ds->permanent_dict);
743       ds->permanent_dict = NULL;
744
745       return true;
746     }
747   else
748     return false;
749 }
750
751 /* Cancels all temporary transformations, if any.  Further
752    transformations will be permanent.
753    Returns true if anything changed, false otherwise. */
754 bool
755 proc_cancel_temporary_transformations (struct dataset *ds)
756 {
757   if (proc_in_temporary_transformations (ds))
758     {
759       trns_chain_clear (&ds->temporary_trns_chain);
760
761       
762       /* XXX remove measurement level transformation from permanent_trns_chain */
763       dict_unref (ds->dict);
764       ds->dict = ds->permanent_dict;
765       ds->permanent_dict = NULL;
766
767       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
768       return true;
769     }
770   else
771     return false;
772 }
773
774 /* Cancels all transformations, if any.
775    Returns true if successful, false on I/O error. */
776 bool
777 proc_cancel_all_transformations (struct dataset *ds)
778 {
779   bool ok;
780   assert (ds->proc_state == PROC_COMMITTED);
781   ok = trns_chain_clear (&ds->permanent_trns_chain);
782   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
783   ds->temporary = false;
784   for (size_t i = 0; i < ds->n_stack; i++)
785     ok = trns_chain_uninit (&ds->stack[i]) && ok;
786   ds->n_stack = 0;
787   dataset_transformations_changed__ (ds, false);
788
789   return ok;
790 }
791
792 void
793 proc_push_transformations (struct dataset *ds)
794 {
795   if (ds->n_stack >= ds->allocated_stack)
796     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
797                             sizeof *ds->stack);
798   trns_chain_init (&ds->stack[ds->n_stack++]);
799 }
800
801 void
802 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
803 {
804   assert (ds->n_stack > 0);
805   *chain = ds->stack[--ds->n_stack];
806 }
807
808 static enum trns_result
809 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
810 {
811   struct variable *var = var_;
812
813   *cc = case_unshare (*cc);
814   *case_num_rw (*cc, var) = case_num;
815
816   return TRNS_CONTINUE;
817 }
818
819 /* Add a variable which we can sort by to get back the original order. */
820 struct variable *
821 add_permanent_ordering_transformation (struct dataset *ds)
822 {
823   struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
824   struct variable *order_var
825     = (proc_in_temporary_transformations (ds)
826        ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
827        : temp_var);
828
829   static const struct trns_class trns_class = {
830     .name = "ordering",
831     .execute = store_case_num
832   };
833   const struct transformation t = { .class = &trns_class, .aux = order_var };
834   trns_chain_append (&ds->permanent_trns_chain, &t);
835
836   return temp_var;
837 }
838 \f
839 /* Causes output from the next procedure to be discarded, instead
840    of being preserved for use as input for the next procedure. */
841 void
842 proc_discard_output (struct dataset *ds)
843 {
844   ds->discard_output = true;
845 }
846
847
848 /* Checks whether DS has a corrupted active dataset.  If so,
849    discards it and returns false.  If not, returns true without
850    doing anything. */
851 bool
852 dataset_end_of_command (struct dataset *ds)
853 {
854   if (ds->source != NULL)
855     {
856       if (casereader_error (ds->source))
857         {
858           dataset_clear (ds);
859           return false;
860         }
861       else
862         {
863           const struct taint *taint = casereader_get_taint (ds->source);
864           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
865           assert (!taint_has_tainted_successor (taint));
866         }
867     }
868   return true;
869 }
870 \f
871 /* Limits the maximum number of cases processed to
872    *CASES_REMAINING. */
873 static enum trns_result
874 case_limit_trns_proc (void *cases_remaining_,
875                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
876 {
877   size_t *cases_remaining = cases_remaining_;
878   if (*cases_remaining > 0)
879     {
880       (*cases_remaining)--;
881       return TRNS_CONTINUE;
882     }
883   else
884     return TRNS_DROP_CASE;
885 }
886
887 /* Frees the data associated with a case limit transformation. */
888 static bool
889 case_limit_trns_free (void *cases_remaining_)
890 {
891   size_t *cases_remaining = cases_remaining_;
892   free (cases_remaining);
893   return true;
894 }
895
896 /* Adds a transformation that limits the number of cases that may
897    pass through, if DS->DICT has a case limit. */
898 static void
899 add_case_limit_trns (struct dataset *ds)
900 {
901   casenumber case_limit = dict_get_case_limit (ds->dict);
902   if (case_limit != 0)
903     {
904       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
905       *cases_remaining = case_limit;
906
907       static const struct trns_class trns_class = {
908         .name = "case limit",
909         .execute = case_limit_trns_proc,
910         .destroy = case_limit_trns_free,
911       };
912       add_transformation (ds, &trns_class, cases_remaining);
913
914       dict_set_case_limit (ds->dict, 0);
915     }
916 }
917
918 \f
919 /* FILTER transformation. */
920 static enum trns_result
921 filter_trns_proc (void *filter_var_,
922                   struct ccase **c, casenumber case_nr UNUSED)
923
924 {
925   struct variable *filter_var = filter_var_;
926   double f = case_num (*c, filter_var);
927   return (f != 0.0 && !var_is_num_missing (filter_var, f)
928           ? TRNS_CONTINUE : TRNS_DROP_CASE);
929 }
930
931 /* Adds a temporary transformation to filter data according to
932    the variable specified on FILTER, if any. */
933 static void
934 add_filter_trns (struct dataset *ds)
935 {
936   struct variable *filter_var = dict_get_filter (ds->dict);
937   if (filter_var != NULL)
938     {
939       proc_start_temporary_transformations (ds);
940
941       static const struct trns_class trns_class = {
942         .name = "FILTER",
943         .execute = filter_trns_proc,
944       };
945       add_transformation (ds, &trns_class, filter_var);
946     }
947 }
948
949 void
950 dataset_need_lag (struct dataset *ds, int n_before)
951 {
952   ds->n_lag = MAX (ds->n_lag, n_before);
953 }
954 \f
955 /* Transformation for adding measurement level. */
956
957 struct measurement_level_value
958   {
959     struct hmap_node hmap_node;
960     double value;
961   };
962
963 struct measurement_level_var
964   {
965     struct variable *var;
966     struct hmap *values;
967   };
968
969 static void
970 add_measurement_level_var_uninit (struct measurement_level_var *mlv)
971 {
972   struct measurement_level_value *mlvalue, *next;
973   HMAP_FOR_EACH_SAFE (mlvalue, next, struct measurement_level_value, hmap_node,
974                       mlv->values)
975     {
976       hmap_delete (mlv->values, &mlvalue->hmap_node);
977       free (mlvalue);
978     }
979   hmap_destroy (mlv->values);
980   free (mlv->values);
981 }
982
983 static enum measure
984 add_measurement_level_var_interpret (const struct measurement_level_var *mlv)
985 {
986   size_t n = hmap_count (mlv->values);
987   if (!n)
988     {
989       /* All missing (or no data). */
990       return MEASURE_NOMINAL;
991     }
992
993   const struct measurement_level_value *mlvalue;
994   HMAP_FOR_EACH (mlvalue, struct measurement_level_value, hmap_node,
995                  mlv->values)
996     if (mlvalue->value < 10)
997       return MEASURE_NOMINAL;
998   return MEASURE_SCALE;
999 }
1000
1001 struct measurement_level_trns
1002   {
1003     struct measurement_level_var *vars;
1004     size_t n_vars;
1005   };
1006
1007 static enum measure
1008 add_measurement_level_trns_proc__ (struct measurement_level_var *mlv, double value)
1009 {
1010   if (var_is_num_missing (mlv->var, value))
1011     return MEASURE_UNKNOWN;
1012   else if (value < 0 || value != floor (value))
1013     return MEASURE_SCALE;
1014
1015   size_t hash = hash_double (value, 0);
1016   struct measurement_level_value *mlvalue;
1017   HMAP_FOR_EACH_WITH_HASH (mlvalue, struct measurement_level_value, hmap_node,
1018                            hash, mlv->values)
1019     if (mlvalue->value == value)
1020       return MEASURE_UNKNOWN;
1021
1022   mlvalue = xmalloc (sizeof *mlvalue);
1023   mlvalue->value = value;
1024   hmap_insert (mlv->values, &mlvalue->hmap_node, hash);
1025   if (hmap_count (mlv->values) >= settings_get_scalemin ())
1026     return MEASURE_SCALE;
1027
1028   return MEASURE_UNKNOWN;
1029 }
1030
1031 static enum trns_result
1032 add_measurement_level_trns_proc (void *mlt_, struct ccase **c,
1033                                  casenumber case_nr UNUSED)
1034 {
1035   struct measurement_level_trns *mlt = mlt_;
1036   for (size_t i = 0; i < mlt->n_vars; )
1037     {
1038       struct measurement_level_var *mlv = &mlt->vars[i];
1039       double value = case_num (*c, mlv->var);
1040       enum measure m = add_measurement_level_trns_proc__ (mlv, value);
1041       if (m != MEASURE_UNKNOWN)
1042         {
1043           var_set_measure (mlv->var, m);
1044
1045           add_measurement_level_var_uninit (mlv);
1046           *mlv = mlt->vars[--mlt->n_vars];
1047         }
1048       else
1049         i++;
1050     }
1051   return TRNS_CONTINUE;
1052 }
1053
1054 static void
1055 add_measurement_level_trns_free__ (struct measurement_level_trns *mlt)
1056 {
1057   for (size_t i = 0; i < mlt->n_vars; i++)
1058     {
1059       struct measurement_level_var *mlv = &mlt->vars[i];
1060       var_set_measure (mlv->var, add_measurement_level_var_interpret (mlv));
1061       add_measurement_level_var_uninit (mlv);
1062     }
1063   free (mlt->vars);
1064   free (mlt);
1065 }
1066
1067 static bool
1068 add_measurement_level_trns_free (void *mlt_)
1069 {
1070   struct measurement_level_trns *mlt = mlt_;
1071   for (size_t i = 0; i < mlt->n_vars; i++)
1072     {
1073       struct measurement_level_var *mlv = &mlt->vars[i];
1074       var_set_measure (mlv->var, add_measurement_level_var_interpret (mlv));
1075     }
1076   add_measurement_level_trns_free__ (mlt);
1077   return true;
1078 }
1079
1080 static const struct trns_class add_measurement_level_trns_class = {
1081   .name = "add measurement level",
1082   .execute = add_measurement_level_trns_proc,
1083   .destroy = add_measurement_level_trns_free,
1084 };
1085
1086 static void
1087 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1088 {
1089   struct variable **vars = NULL;
1090   size_t n_vars = 0;
1091   size_t allocated_vars = 0;
1092
1093   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1094     {
1095       struct variable *var = dict_get_var (dict, i);
1096       if (var_get_measure (var) != MEASURE_UNKNOWN)
1097         continue;
1098
1099       const struct fmt_spec *f = var_get_print_format (var);
1100       enum measure m = var_default_measure_for_format (f->type);
1101       if (m != MEASURE_UNKNOWN)
1102         {
1103           var_set_measure (var, m);
1104           continue;
1105         }
1106
1107       if (n_vars >= allocated_vars)
1108         vars = x2nrealloc (vars, &allocated_vars, sizeof *vars);
1109       vars[n_vars++] = var;
1110     }
1111
1112   if (!n_vars)
1113     return;
1114
1115   /* We do this as a second step because otherwise we'd be moving hmaps around,
1116      which doesn't work. */
1117   struct measurement_level_var *mlvs = xmalloc (n_vars * sizeof *mlvs);
1118   for (size_t i = 0; i < n_vars; i++)
1119     {
1120       mlvs[i].var = vars[i];
1121       mlvs[i].values = xmalloc (sizeof *mlvs[i].values);
1122       hmap_init (mlvs[i].values);
1123     }
1124   free (vars);
1125
1126   struct measurement_level_trns *mlt = xmalloc (sizeof *mlt);
1127   *mlt = (struct measurement_level_trns) {
1128     .vars = mlvs,
1129     .n_vars = n_vars,
1130   };
1131   add_transformation (ds, &add_measurement_level_trns_class, mlt);
1132 }
1133
1134 static void
1135 cancel_measurement_level_trns (struct trns_chain *chain)
1136 {
1137   if (!chain->n)
1138     return;
1139
1140   struct transformation *trns = &chain->xforms[chain->n - 1];
1141   if (trns->class != &add_measurement_level_trns_class)
1142     return;
1143
1144   struct measurement_level_trns *mlt = trns->aux;
1145   add_measurement_level_trns_free__ (mlt);
1146   chain->n--;
1147 }
1148 \f
1149 static void
1150 dataset_changed__ (struct dataset *ds)
1151 {
1152   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1153     ds->callbacks->changed (ds->cb_data);
1154 }
1155
1156 static void
1157 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1158 {
1159   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1160     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1161 }
1162 \f
1163 /* Private interface for use by session code. */
1164
1165 void
1166 dataset_set_session__ (struct dataset *ds, struct session *session)
1167 {
1168   ds->session = session;
1169 }