Automatically infer variables' measurement level from format and data.
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct casewriter *sink;
72   struct trns_chain temporary_trns_chain;
73   bool temporary;
74   struct dictionary *dict;
75
76   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77   struct trns_chain *stack;
78   size_t n_stack;
79   size_t allocated_stack;
80
81   /* If true, cases are discarded instead of being written to
82      sink. */
83   bool discard_output;
84
85   /* The case map used to compact a case, if necessary;
86      otherwise a null pointer. */
87   struct case_map *compactor;
88
89   /* Time at which proc was last invoked. */
90   time_t last_proc_invocation;
91
92   /* Cases just before ("lagging") the current one. */
93   int n_lag;                    /* Number of cases to lag. */
94   struct deque lag;             /* Deque of lagged cases. */
95   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
96
97   /* Procedure data. */
98   enum
99     {
100       PROC_COMMITTED,           /* No procedure in progress. */
101       PROC_OPEN,                /* proc_open called, casereader still open. */
102       PROC_CLOSED               /* casereader from proc_open destroyed,
103                                    but proc_commit not yet called. */
104     }
105   proc_state;
106   casenumber cases_written;     /* Cases output so far. */
107   bool ok;                      /* Error status. */
108   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
109
110   const struct dataset_callbacks *callbacks;
111   void *cb_data;
112
113   /* Uniquely distinguishes datasets. */
114   unsigned int seqno;
115 };
116
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
119                                                bool non_empty);
120
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
125
126 static void update_last_proc_invocation (struct dataset *ds);
127
128 static void
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
130 {
131   struct dataset *ds = ds_;
132   dataset_changed__ (ds);
133 }
134 \f
135 static void
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
137 {
138   static unsigned int seqno;
139
140   dict_set_change_callback (ds->dict, dict_callback, ds);
141   proc_cancel_all_transformations (ds);
142   dataset_set_session (ds, session);
143   ds->seqno = ++seqno;
144 }
145
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
147    SESSION already contains a dataset named NAME, it is deleted and replaced.
148    The dataset initially has an empty dictionary and no data source. */
149 struct dataset *
150 dataset_create (struct session *session, const char *name)
151 {
152   struct dataset *ds = XMALLOC (struct dataset);
153   *ds = (struct dataset) {
154     .name = xstrdup (name),
155     .display = DATASET_FRONT,
156     .dict = dict_create (get_default_encoding ()),
157     .caseinit = caseinit_create (),
158   };
159   dataset_create_finish__ (ds, session);
160
161   return ds;
162 }
163
164 /* Creates and returns a new dataset that has the same data and dictionary as
165    OLD named NAME, adds it to the same session as OLD, and returns the new
166    dataset.  If SESSION already contains a dataset named NAME, it is deleted
167    and replaced.
168
169    OLD must not have any active transformations or temporary state and must
170    not be in the middle of a procedure.
171
172    Callbacks are not cloned. */
173 struct dataset *
174 dataset_clone (struct dataset *old, const char *name)
175 {
176   struct dataset *new;
177
178   assert (old->proc_state == PROC_COMMITTED);
179   assert (!old->permanent_trns_chain.n);
180   assert (old->permanent_dict == NULL);
181   assert (old->sink == NULL);
182   assert (!old->temporary);
183   assert (!old->temporary_trns_chain.n);
184   assert (!old->n_stack);
185
186   new = xzalloc (sizeof *new);
187   new->name = xstrdup (name);
188   new->display = DATASET_FRONT;
189   new->source = casereader_clone (old->source);
190   new->dict = dict_clone (old->dict);
191   new->caseinit = caseinit_clone (old->caseinit);
192   new->last_proc_invocation = old->last_proc_invocation;
193   new->ok = old->ok;
194
195   dataset_create_finish__ (new, old->session);
196
197   return new;
198 }
199
200 /* Destroys DS. */
201 void
202 dataset_destroy (struct dataset *ds)
203 {
204   if (ds != NULL)
205     {
206       dataset_set_session (ds, NULL);
207       dataset_clear (ds);
208       dict_unref (ds->dict);
209       dict_unref (ds->permanent_dict);
210       caseinit_destroy (ds->caseinit);
211       trns_chain_uninit (&ds->permanent_trns_chain);
212       for (size_t i = 0; i < ds->n_stack; i++)
213         trns_chain_uninit (&ds->stack[i]);
214       free (ds->stack);
215       dataset_transformations_changed__ (ds, false);
216       free (ds->name);
217       free (ds);
218     }
219 }
220
221 /* Discards the active dataset's dictionary, data, and transformations. */
222 void
223 dataset_clear (struct dataset *ds)
224 {
225   assert (ds->proc_state == PROC_COMMITTED);
226
227   dict_clear (ds->dict);
228   fh_set_default_handle (NULL);
229
230   ds->n_lag = 0;
231
232   casereader_destroy (ds->source);
233   ds->source = NULL;
234
235   proc_cancel_all_transformations (ds);
236 }
237
238 const char *
239 dataset_name (const struct dataset *ds)
240 {
241   return ds->name;
242 }
243
244 void
245 dataset_set_name (struct dataset *ds, const char *name)
246 {
247   struct session *session = ds->session;
248   bool active = false;
249
250   if (session != NULL)
251     {
252       active = session_active_dataset (session) == ds;
253       if (active)
254         session_set_active_dataset (session, NULL);
255       dataset_set_session (ds, NULL);
256     }
257
258   free (ds->name);
259   ds->name = xstrdup (name);
260
261   if (session != NULL)
262     {
263       dataset_set_session (ds, session);
264       if (active)
265         session_set_active_dataset (session, ds);
266     }
267 }
268
269 struct session *
270 dataset_session (const struct dataset *ds)
271 {
272   return ds->session;
273 }
274
275 void
276 dataset_set_session (struct dataset *ds, struct session *session)
277 {
278   if (session != ds->session)
279     {
280       if (ds->session != NULL)
281         session_remove_dataset (ds->session, ds);
282       if (session != NULL)
283         session_add_dataset (session, ds);
284     }
285 }
286
287 /* Returns the dictionary within DS.  This is always nonnull, although it
288    might not contain any variables. */
289 struct dictionary *
290 dataset_dict (const struct dataset *ds)
291 {
292   return ds->dict;
293 }
294
295 /* Replaces DS's dictionary by DICT, discarding any source and
296    transformations. */
297 void
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
299 {
300   assert (ds->proc_state == PROC_COMMITTED);
301   assert (ds->dict != dict);
302
303   dataset_clear (ds);
304
305   dict_unref (ds->dict);
306   ds->dict = dict;
307   dict_set_change_callback (ds->dict, dict_callback, ds);
308 }
309
310 /* Returns the casereader that will be read when a procedure is executed on
311    DS.  This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
314 {
315   return ds->source;
316 }
317
318 /* Returns true if DS has a data source, false otherwise. */
319 bool
320 dataset_has_source (const struct dataset *ds)
321 {
322   return dataset_source (ds) != NULL;
323 }
324
325 /* Replaces the active dataset's data by READER.  READER's cases must have an
326    appropriate format for DS's dictionary. */
327 bool
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
329 {
330   casereader_destroy (ds->source);
331   ds->source = reader;
332
333   caseinit_clear (ds->caseinit);
334   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
335
336   return reader == NULL || !casereader_error (reader);
337 }
338
339 /* Returns the data source from DS and removes it from DS.  Returns a null
340    pointer if DS has no data source. */
341 struct casereader *
342 dataset_steal_source (struct dataset *ds)
343 {
344   struct casereader *reader = ds->source;
345   ds->source = NULL;
346
347   return reader;
348 }
349
350 /* Returns a number unique to DS.  It can be used to distinguish one dataset
351    from any other within a given program run, even datasets that do not exist
352    at the same time. */
353 unsigned int
354 dataset_seqno (const struct dataset *ds)
355 {
356   return ds->seqno;
357 }
358
359 void
360 dataset_set_callbacks (struct dataset *ds,
361                        const struct dataset_callbacks *callbacks,
362                        void *cb_data)
363 {
364   ds->callbacks = callbacks;
365   ds->cb_data = cb_data;
366 }
367
368 enum dataset_display
369 dataset_get_display (const struct dataset *ds)
370 {
371   return ds->display;
372 }
373
374 void
375 dataset_set_display (struct dataset *ds, enum dataset_display display)
376 {
377   ds->display = display;
378 }
379 \f
380 /* Returns the last time the data was read. */
381 time_t
382 time_of_last_procedure (struct dataset *ds)
383 {
384   if (!ds)
385     return time (NULL);
386   if (ds->last_proc_invocation == 0)
387     update_last_proc_invocation (ds);
388   return ds->last_proc_invocation;
389 }
390 \f
391 /* Regular procedure. */
392
393 /* Executes any pending transformations, if necessary.
394    This is not identical to the EXECUTE command in that it won't
395    always read the source data.  This can be important when the
396    source data is given inline within BEGIN DATA...END FILE. */
397 bool
398 proc_execute (struct dataset *ds)
399 {
400   bool ok;
401
402   if ((!ds->temporary || !ds->temporary_trns_chain.n)
403       && !ds->permanent_trns_chain.n)
404     {
405       ds->n_lag = 0;
406       ds->discard_output = false;
407       dict_set_case_limit (ds->dict, 0);
408       dict_clear_vectors (ds->dict);
409       return true;
410     }
411
412   ok = casereader_destroy (proc_open (ds));
413   return proc_commit (ds) && ok;
414 }
415
416 static const struct casereader_class proc_casereader_class;
417
418 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
419    cases filtered out with FILTER BY will not be included in the casereader
420    (which is usually desirable).  If FILTER is false, all cases will be
421    included regardless of FILTER BY settings.
422
423    proc_commit must be called when done. */
424 struct casereader *
425 proc_open_filtering (struct dataset *ds, bool filter)
426 {
427   struct casereader *reader;
428
429   assert (ds->n_stack == 0);
430   assert (ds->source != NULL);
431   assert (ds->proc_state == PROC_COMMITTED);
432
433   update_last_proc_invocation (ds);
434
435   caseinit_mark_for_init (ds->caseinit, ds->dict);
436
437   /* Finish up the collection of transformations. */
438   add_case_limit_trns (ds);
439   if (filter)
440     add_filter_trns (ds);
441   if (!proc_in_temporary_transformations (ds))
442     add_measurement_level_trns (ds, ds->dict);
443
444   /* Make permanent_dict refer to the dictionary right before
445      data reaches the sink. */
446   if (ds->permanent_dict == NULL)
447     ds->permanent_dict = ds->dict;
448
449   /* Prepare sink. */
450   if (!ds->discard_output)
451     {
452       struct dictionary *pd = ds->permanent_dict;
453       size_t compacted_n_values = dict_count_values (pd, 1u << DC_SCRATCH);
454       if (compacted_n_values < dict_get_next_value_idx (pd))
455         {
456           struct caseproto *compacted_proto;
457           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
458           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
459           ds->sink = autopaging_writer_create (compacted_proto);
460           caseproto_unref (compacted_proto);
461         }
462       else
463         {
464           ds->compactor = NULL;
465           ds->sink = autopaging_writer_create (dict_get_proto (pd));
466         }
467     }
468   else
469     {
470       ds->compactor = NULL;
471       ds->sink = NULL;
472     }
473
474   /* Allocate memory for lagged cases. */
475   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
476
477   ds->proc_state = PROC_OPEN;
478   ds->cases_written = 0;
479   ds->ok = true;
480
481   /* FIXME: use taint in dataset in place of `ok'? */
482   /* FIXME: for trivial cases we can just return a clone of
483      ds->source? */
484
485   /* Create casereader and insert a shim on top.  The shim allows us to
486      arbitrarily extend the casereader's lifetime, by slurping the cases into
487      the shim's buffer in proc_commit().  That is especially useful when output
488      table_items are generated directly from the procedure casereader (e.g. by
489      the LIST procedure) when we are using an output driver that keeps a
490      reference to the output items passed to it (e.g. the GUI output driver in
491      PSPPIRE). */
492   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
493                                          CASENUMBER_MAX,
494                                          &proc_casereader_class, ds);
495   ds->shim = casereader_shim_insert (reader);
496   return reader;
497 }
498
499 /* Opens dataset DS for reading cases with proc_read.
500    proc_commit must be called when done. */
501 struct casereader *
502 proc_open (struct dataset *ds)
503 {
504   return proc_open_filtering (ds, true);
505 }
506
507 /* Returns true if a procedure is in progress, that is, if
508    proc_open has been called but proc_commit has not. */
509 bool
510 proc_is_open (const struct dataset *ds)
511 {
512   return ds->proc_state != PROC_COMMITTED;
513 }
514
515 /* "read" function for procedure casereader. */
516 static struct ccase *
517 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
518 {
519   struct dataset *ds = ds_;
520   enum trns_result retval = TRNS_DROP_CASE;
521   struct ccase *c;
522
523   assert (ds->proc_state == PROC_OPEN);
524   for (; ; case_unref (c))
525     {
526       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
527       if (retval == TRNS_ERROR)
528         ds->ok = false;
529       if (!ds->ok)
530         return NULL;
531
532       /* Read a case from source. */
533       c = casereader_read (ds->source);
534       if (c == NULL)
535         return NULL;
536       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
537       caseinit_init_vars (ds->caseinit, c);
538
539       /* Execute permanent transformations.  */
540       casenumber case_nr = ds->cases_written + 1;
541       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
542       caseinit_update_left_vars (ds->caseinit, c);
543       if (retval != TRNS_CONTINUE)
544         continue;
545
546       /* Write case to collection of lagged cases. */
547       if (ds->n_lag > 0)
548         {
549           while (deque_count (&ds->lag) >= ds->n_lag)
550             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
551           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
552         }
553
554       /* Write case to replacement dataset. */
555       ds->cases_written++;
556       if (ds->sink != NULL)
557         casewriter_write (ds->sink,
558                           case_map_execute (ds->compactor, case_ref (c)));
559
560       /* Execute temporary transformations. */
561       if (ds->temporary_trns_chain.n)
562         {
563           retval = trns_chain_execute (&ds->temporary_trns_chain,
564                                        ds->cases_written, &c);
565           if (retval != TRNS_CONTINUE)
566             continue;
567         }
568
569       return c;
570     }
571 }
572
573 /* "destroy" function for procedure casereader. */
574 static void
575 proc_casereader_destroy (struct casereader *reader, void *ds_)
576 {
577   struct dataset *ds = ds_;
578   struct ccase *c;
579
580   /* We are always the subreader for a casereader_buffer, so if we're being
581      destroyed then it's because the casereader_buffer has read all the cases
582      that it ever will. */
583   ds->shim = NULL;
584
585   /* Make sure transformations happen for every input case, in
586      case they have side effects, and ensure that the replacement
587      active dataset gets all the cases it should. */
588   while ((c = casereader_read (reader)) != NULL)
589     case_unref (c);
590
591   ds->proc_state = PROC_CLOSED;
592   ds->ok = casereader_destroy (ds->source) && ds->ok;
593   ds->source = NULL;
594   dataset_set_source (ds, NULL);
595 }
596
597 /* Must return false if the source casereader, a transformation,
598    or the sink casewriter signaled an error.  (If a temporary
599    transformation signals an error, then the return value is
600    false, but the replacement active dataset may still be
601    untainted.) */
602 bool
603 proc_commit (struct dataset *ds)
604 {
605   if (ds->shim != NULL)
606     casereader_shim_slurp (ds->shim);
607
608   assert (ds->proc_state == PROC_CLOSED);
609   ds->proc_state = PROC_COMMITTED;
610
611   dataset_changed__ (ds);
612
613   /* Free memory for lagged cases. */
614   while (!deque_is_empty (&ds->lag))
615     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
616   free (ds->lag_cases);
617
618   /* Dictionary from before TEMPORARY becomes permanent. */
619   proc_cancel_temporary_transformations (ds);
620   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
621
622   if (!ds->discard_output)
623     {
624       /* Finish compacting. */
625       if (ds->compactor != NULL)
626         {
627           case_map_destroy (ds->compactor);
628           ds->compactor = NULL;
629
630           dict_delete_scratch_vars (ds->dict);
631           dict_compact_values (ds->dict);
632         }
633
634       /* Old data sink becomes new data source. */
635       if (ds->sink != NULL)
636         ds->source = casewriter_make_reader (ds->sink);
637     }
638   else
639     {
640       ds->source = NULL;
641       ds->discard_output = false;
642     }
643   ds->sink = NULL;
644
645   caseinit_clear (ds->caseinit);
646   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
647
648   dict_clear_vectors (ds->dict);
649   ds->permanent_dict = NULL;
650   return ok;
651 }
652
653 /* Casereader class for procedure execution. */
654 static const struct casereader_class proc_casereader_class =
655   {
656     proc_casereader_read,
657     proc_casereader_destroy,
658     NULL,
659     NULL,
660   };
661
662 /* Updates last_proc_invocation. */
663 static void
664 update_last_proc_invocation (struct dataset *ds)
665 {
666   ds->last_proc_invocation = time (NULL);
667 }
668 \f
669 /* Returns a pointer to the lagged case from N_BEFORE cases before the
670    current one, or NULL if there haven't been that many cases yet. */
671 const struct ccase *
672 lagged_case (const struct dataset *ds, int n_before)
673 {
674   assert (n_before >= 1);
675   assert (n_before <= ds->n_lag);
676
677   if (n_before <= deque_count (&ds->lag))
678     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
679   else
680     return NULL;
681 }
682 \f
683 /* Adds TRNS to the current set of transformations. */
684 void
685 add_transformation (struct dataset *ds,
686                     const struct trns_class *class, void *aux)
687 {
688   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
689                               : ds->temporary ? &ds->temporary_trns_chain
690                               : &ds->permanent_trns_chain);
691   struct transformation t = { .class = class, .aux = aux };
692   trns_chain_append (chain, &t);
693   dataset_transformations_changed__ (ds, true);
694 }
695
696 /* Returns true if the next call to add_transformation() will add
697    a temporary transformation, false if it will add a permanent
698    transformation. */
699 bool
700 proc_in_temporary_transformations (const struct dataset *ds)
701 {
702   return ds->temporary;
703 }
704
705 /* Marks the start of temporary transformations.
706    Further calls to add_transformation() will add temporary
707    transformations. */
708 void
709 proc_start_temporary_transformations (struct dataset *ds)
710 {
711   assert (!ds->n_stack);
712   if (!proc_in_temporary_transformations (ds))
713     {
714       add_case_limit_trns (ds);
715
716       ds->permanent_dict = dict_clone (ds->dict);
717       add_measurement_level_trns (ds, ds->permanent_dict);
718
719       ds->temporary = true;
720       dataset_transformations_changed__ (ds, true);
721     }
722 }
723
724 /* Converts all the temporary transformations, if any, to permanent
725    transformations.  Further transformations will be permanent.
726
727    The FILTER command is implemented as a temporary transformation, so a
728    procedure that uses this function should usually use proc_open_filtering()
729    with FILTER false, instead of plain proc_open().
730
731    Returns true if anything changed, false otherwise. */
732 bool
733 proc_make_temporary_transformations_permanent (struct dataset *ds)
734 {
735   if (proc_in_temporary_transformations (ds))
736     {
737       cancel_measurement_level_trns (&ds->permanent_trns_chain);
738       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
739
740       ds->temporary = false;
741
742       dict_unref (ds->permanent_dict);
743       ds->permanent_dict = NULL;
744
745       return true;
746     }
747   else
748     return false;
749 }
750
751 /* Cancels all temporary transformations, if any.  Further
752    transformations will be permanent.
753    Returns true if anything changed, false otherwise. */
754 bool
755 proc_cancel_temporary_transformations (struct dataset *ds)
756 {
757   if (proc_in_temporary_transformations (ds))
758     {
759       trns_chain_clear (&ds->temporary_trns_chain);
760
761       dict_unref (ds->dict);
762       ds->dict = ds->permanent_dict;
763       ds->permanent_dict = NULL;
764
765       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
766       return true;
767     }
768   else
769     return false;
770 }
771
772 /* Cancels all transformations, if any.
773    Returns true if successful, false on I/O error. */
774 bool
775 proc_cancel_all_transformations (struct dataset *ds)
776 {
777   bool ok;
778   assert (ds->proc_state == PROC_COMMITTED);
779   ok = trns_chain_clear (&ds->permanent_trns_chain);
780   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
781   ds->temporary = false;
782   for (size_t i = 0; i < ds->n_stack; i++)
783     ok = trns_chain_uninit (&ds->stack[i]) && ok;
784   ds->n_stack = 0;
785   dataset_transformations_changed__ (ds, false);
786
787   return ok;
788 }
789
790 void
791 proc_push_transformations (struct dataset *ds)
792 {
793   if (ds->n_stack >= ds->allocated_stack)
794     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
795                             sizeof *ds->stack);
796   trns_chain_init (&ds->stack[ds->n_stack++]);
797 }
798
799 void
800 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
801 {
802   assert (ds->n_stack > 0);
803   *chain = ds->stack[--ds->n_stack];
804 }
805
806 static enum trns_result
807 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
808 {
809   struct variable *var = var_;
810
811   *cc = case_unshare (*cc);
812   *case_num_rw (*cc, var) = case_num;
813
814   return TRNS_CONTINUE;
815 }
816
817 /* Add a variable which we can sort by to get back the original order. */
818 struct variable *
819 add_permanent_ordering_transformation (struct dataset *ds)
820 {
821   struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
822   struct variable *order_var
823     = (proc_in_temporary_transformations (ds)
824        ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
825        : temp_var);
826
827   static const struct trns_class trns_class = {
828     .name = "ordering",
829     .execute = store_case_num
830   };
831   const struct transformation t = { .class = &trns_class, .aux = order_var };
832   trns_chain_append (&ds->permanent_trns_chain, &t);
833
834   return temp_var;
835 }
836 \f
837 /* Causes output from the next procedure to be discarded, instead
838    of being preserved for use as input for the next procedure. */
839 void
840 proc_discard_output (struct dataset *ds)
841 {
842   ds->discard_output = true;
843 }
844
845
846 /* Checks whether DS has a corrupted active dataset.  If so,
847    discards it and returns false.  If not, returns true without
848    doing anything. */
849 bool
850 dataset_end_of_command (struct dataset *ds)
851 {
852   if (ds->source != NULL)
853     {
854       if (casereader_error (ds->source))
855         {
856           dataset_clear (ds);
857           return false;
858         }
859       else
860         {
861           const struct taint *taint = casereader_get_taint (ds->source);
862           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
863           assert (!taint_has_tainted_successor (taint));
864         }
865     }
866   return true;
867 }
868 \f
869 /* Limits the maximum number of cases processed to
870    *CASES_REMAINING. */
871 static enum trns_result
872 case_limit_trns_proc (void *cases_remaining_,
873                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
874 {
875   size_t *cases_remaining = cases_remaining_;
876   if (*cases_remaining > 0)
877     {
878       (*cases_remaining)--;
879       return TRNS_CONTINUE;
880     }
881   else
882     return TRNS_DROP_CASE;
883 }
884
885 /* Frees the data associated with a case limit transformation. */
886 static bool
887 case_limit_trns_free (void *cases_remaining_)
888 {
889   size_t *cases_remaining = cases_remaining_;
890   free (cases_remaining);
891   return true;
892 }
893
894 /* Adds a transformation that limits the number of cases that may
895    pass through, if DS->DICT has a case limit. */
896 static void
897 add_case_limit_trns (struct dataset *ds)
898 {
899   casenumber case_limit = dict_get_case_limit (ds->dict);
900   if (case_limit != 0)
901     {
902       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
903       *cases_remaining = case_limit;
904
905       static const struct trns_class trns_class = {
906         .name = "case limit",
907         .execute = case_limit_trns_proc,
908         .destroy = case_limit_trns_free,
909       };
910       add_transformation (ds, &trns_class, cases_remaining);
911
912       dict_set_case_limit (ds->dict, 0);
913     }
914 }
915
916 \f
917 /* FILTER transformation. */
918 static enum trns_result
919 filter_trns_proc (void *filter_var_,
920                   struct ccase **c, casenumber case_nr UNUSED)
921
922 {
923   struct variable *filter_var = filter_var_;
924   double f = case_num (*c, filter_var);
925   return (f != 0.0 && !var_is_num_missing (filter_var, f)
926           ? TRNS_CONTINUE : TRNS_DROP_CASE);
927 }
928
929 /* Adds a temporary transformation to filter data according to
930    the variable specified on FILTER, if any. */
931 static void
932 add_filter_trns (struct dataset *ds)
933 {
934   struct variable *filter_var = dict_get_filter (ds->dict);
935   if (filter_var != NULL)
936     {
937       proc_start_temporary_transformations (ds);
938
939       static const struct trns_class trns_class = {
940         .name = "FILTER",
941         .execute = filter_trns_proc,
942       };
943       add_transformation (ds, &trns_class, filter_var);
944     }
945 }
946
947 void
948 dataset_need_lag (struct dataset *ds, int n_before)
949 {
950   ds->n_lag = MAX (ds->n_lag, n_before);
951 }
952 \f
953 /* Measurement guesser, for guessing a measurement level from formats and
954    data. */
955
956 struct mg_value
957   {
958     struct hmap_node hmap_node;
959     double value;
960   };
961
962 struct mg_var
963   {
964     struct variable *var;
965     struct hmap *values;
966   };
967
968 static void
969 mg_var_uninit (struct mg_var *mgv)
970 {
971   struct mg_value *mgvalue, *next;
972   HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
973                       mgv->values)
974     {
975       hmap_delete (mgv->values, &mgvalue->hmap_node);
976       free (mgvalue);
977     }
978   hmap_destroy (mgv->values);
979   free (mgv->values);
980 }
981
982 static enum measure
983 mg_var_interpret (const struct mg_var *mgv)
984 {
985   size_t n = hmap_count (mgv->values);
986   if (!n)
987     {
988       /* All missing (or no data). */
989       return MEASURE_NOMINAL;
990     }
991
992   const struct mg_value *mgvalue;
993   HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
994                  mgv->values)
995     if (mgvalue->value < 10)
996       return MEASURE_NOMINAL;
997   return MEASURE_SCALE;
998 }
999
1000 static enum measure
1001 mg_var_add_value (struct mg_var *mgv, double value)
1002 {
1003   if (var_is_num_missing (mgv->var, value))
1004     return MEASURE_UNKNOWN;
1005   else if (value < 0 || value != floor (value))
1006     return MEASURE_SCALE;
1007
1008   size_t hash = hash_double (value, 0);
1009   struct mg_value *mgvalue;
1010   HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1011                            hash, mgv->values)
1012     if (mgvalue->value == value)
1013       return MEASURE_UNKNOWN;
1014
1015   mgvalue = xmalloc (sizeof *mgvalue);
1016   mgvalue->value = value;
1017   hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1018   if (hmap_count (mgv->values) >= settings_get_scalemin ())
1019     return MEASURE_SCALE;
1020
1021   return MEASURE_UNKNOWN;
1022 }
1023
1024 struct measure_guesser
1025   {
1026     struct mg_var *vars;
1027     size_t n_vars;
1028   };
1029
1030 static struct measure_guesser *
1031 measure_guesser_create__ (struct dictionary *dict)
1032 {
1033   struct mg_var *mgvs = NULL;
1034   size_t n_mgvs = 0;
1035   size_t allocated_mgvs = 0;
1036
1037   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1038     {
1039       struct variable *var = dict_get_var (dict, i);
1040       if (var_get_measure (var) != MEASURE_UNKNOWN)
1041         continue;
1042
1043       const struct fmt_spec *f = var_get_print_format (var);
1044       enum measure m = var_default_measure_for_format (f->type);
1045       if (m != MEASURE_UNKNOWN)
1046         {
1047           var_set_measure (var, m);
1048           continue;
1049         }
1050
1051       if (n_mgvs >= allocated_mgvs)
1052         mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1053
1054       struct mg_var *mgv = &mgvs[n_mgvs++];
1055       *mgv = (struct mg_var) {
1056         .var = var,
1057         .values = xmalloc (sizeof *mgv->values),
1058       };
1059       hmap_init (mgv->values);
1060     }
1061   if (!n_mgvs)
1062     return NULL;
1063
1064   struct measure_guesser *mg = xmalloc (sizeof *mg);
1065   *mg = (struct measure_guesser) {
1066     .vars = mgvs,
1067     .n_vars = n_mgvs,
1068   };
1069   return mg;
1070 }
1071
1072 /* Scans through DS's dictionary for variables that have an unknown measurement
1073    level.  For those, if the measurement level can be guessed based on the
1074    variable's type and format, sets a default.  If that's enough, returns NULL.
1075    If any remain whose levels are unknown and can't be guessed that way,
1076    creates and returns a structure that the caller should pass to
1077    measure_guesser_add_case() or measure_guesser_run() for guessing a
1078    measurement level based on the data.  */
1079 struct measure_guesser *
1080 measure_guesser_create (struct dataset *ds)
1081 {
1082   return measure_guesser_create__ (dataset_dict (ds));
1083 }
1084
1085 /* Adds data from case C to MG. */
1086 static void
1087 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1088 {
1089   for (size_t i = 0; i < mg->n_vars; )
1090     {
1091       struct mg_var *mgv = &mg->vars[i];
1092       double value = case_num (c, mgv->var);
1093       enum measure m = mg_var_add_value (mgv, value);
1094       if (m != MEASURE_UNKNOWN)
1095         {
1096           var_set_measure (mgv->var, m);
1097
1098           mg_var_uninit (mgv);
1099           *mgv = mg->vars[--mg->n_vars];
1100         }
1101       else
1102         i++;
1103     }
1104 }
1105
1106 /* Destroys MG. */
1107 void
1108 measure_guesser_destroy (struct measure_guesser *mg)
1109 {
1110   if (!mg)
1111     return;
1112
1113   for (size_t i = 0; i < mg->n_vars; i++)
1114     {
1115       struct mg_var *mgv = &mg->vars[i];
1116       var_set_measure (mgv->var, mg_var_interpret (mgv));
1117       mg_var_uninit (mgv);
1118     }
1119   free (mg->vars);
1120   free (mg);
1121 }
1122
1123 /* Adds final measurement levels based on MG, after all the cases have been
1124    added. */
1125 static void
1126 measure_guesser_commit (struct measure_guesser *mg)
1127 {
1128   for (size_t i = 0; i < mg->n_vars; i++)
1129     {
1130       struct mg_var *mgv = &mg->vars[i];
1131       var_set_measure (mgv->var, mg_var_interpret (mgv));
1132     }
1133 }
1134
1135 /* Passes the cases in READER through MG and uses the data in the cases to set
1136    measurement levels for the variables where they were still unknown. */
1137 void
1138 measure_guesser_run (struct measure_guesser *mg,
1139                      const struct casereader *reader)
1140 {
1141   struct casereader *r = casereader_clone (reader);
1142   while (mg->n_vars > 0)
1143     {
1144       struct ccase *c = casereader_read (r);
1145       if (!c)
1146         break;
1147       measure_guesser_add_case (mg, c);
1148       case_unref (c);
1149     }
1150   casereader_destroy (r);
1151
1152   measure_guesser_commit (mg);
1153 }
1154 \f
1155 /* A transformation for guessing measurement levels. */
1156
1157 static enum trns_result
1158 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1159 {
1160   struct measure_guesser *mg = mg_;
1161   measure_guesser_add_case (mg, *c);
1162   return TRNS_CONTINUE;
1163 }
1164
1165 static bool
1166 mg_trns_free (void *mg_)
1167 {
1168   struct measure_guesser *mg = mg_;
1169   measure_guesser_commit (mg);
1170   measure_guesser_destroy (mg);
1171   return true;
1172 }
1173
1174 static const struct trns_class mg_trns_class = {
1175   .name = "add measurement level",
1176   .execute = mg_trns_proc,
1177   .destroy = mg_trns_free,
1178 };
1179
1180 static void
1181 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1182 {
1183   struct measure_guesser *mg = measure_guesser_create__ (dict);
1184   if (mg)
1185     add_transformation (ds, &mg_trns_class, mg);
1186 }
1187
1188 static void
1189 cancel_measurement_level_trns (struct trns_chain *chain)
1190 {
1191   if (!chain->n)
1192     return;
1193
1194   struct transformation *trns = &chain->xforms[chain->n - 1];
1195   if (trns->class != &mg_trns_class)
1196     return;
1197
1198   struct measure_guesser *mg = trns->aux;
1199   measure_guesser_destroy (mg);
1200   chain->n--;
1201 }
1202 \f
1203 static void
1204 dataset_changed__ (struct dataset *ds)
1205 {
1206   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1207     ds->callbacks->changed (ds->cb_data);
1208 }
1209
1210 static void
1211 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1212 {
1213   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1214     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1215 }
1216 \f
1217 /* Private interface for use by session code. */
1218
1219 void
1220 dataset_set_session__ (struct dataset *ds, struct session *session)
1221 {
1222   ds->session = session;
1223 }