DELETE VARIABLES: Fix bugs related to details of case compaction.
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct casewriter *sink;
72   struct trns_chain temporary_trns_chain;
73   bool temporary;
74   struct dictionary *dict;
75
76   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77   struct trns_chain *stack;
78   size_t n_stack;
79   size_t allocated_stack;
80
81   /* If true, cases are discarded instead of being written to
82      sink. */
83   bool discard_output;
84
85   /* The case map used to compact a case, if necessary;
86      otherwise a null pointer. */
87   struct case_map *compactor;
88
89   /* Time at which proc was last invoked. */
90   time_t last_proc_invocation;
91
92   /* Cases just before ("lagging") the current one. */
93   int n_lag;                    /* Number of cases to lag. */
94   struct deque lag;             /* Deque of lagged cases. */
95   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
96
97   /* Procedure data. */
98   enum
99     {
100       PROC_COMMITTED,           /* No procedure in progress. */
101       PROC_OPEN,                /* proc_open called, casereader still open. */
102       PROC_CLOSED               /* casereader from proc_open destroyed,
103                                    but proc_commit not yet called. */
104     }
105   proc_state;
106   casenumber cases_written;     /* Cases output so far. */
107   bool ok;                      /* Error status. */
108   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
109
110   const struct dataset_callbacks *callbacks;
111   void *cb_data;
112
113   /* Uniquely distinguishes datasets. */
114   unsigned int seqno;
115 };
116
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
119                                                bool non_empty);
120
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
125
126 static void update_last_proc_invocation (struct dataset *ds);
127
128 static void
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
130 {
131   struct dataset *ds = ds_;
132   dataset_changed__ (ds);
133 }
134 \f
135 static void
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
137 {
138   static unsigned int seqno;
139
140   dict_set_change_callback (ds->dict, dict_callback, ds);
141   proc_cancel_all_transformations (ds);
142   dataset_set_session (ds, session);
143   ds->seqno = ++seqno;
144 }
145
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
147    SESSION already contains a dataset named NAME, it is deleted and replaced.
148    The dataset initially has an empty dictionary and no data source. */
149 struct dataset *
150 dataset_create (struct session *session, const char *name)
151 {
152   struct dataset *ds = XMALLOC (struct dataset);
153   *ds = (struct dataset) {
154     .name = xstrdup (name),
155     .display = DATASET_FRONT,
156     .dict = dict_create (get_default_encoding ()),
157     .caseinit = caseinit_create (),
158   };
159   dataset_create_finish__ (ds, session);
160
161   return ds;
162 }
163
164 /* Creates and returns a new dataset that has the same data and dictionary as
165    OLD named NAME, adds it to the same session as OLD, and returns the new
166    dataset.  If SESSION already contains a dataset named NAME, it is deleted
167    and replaced.
168
169    OLD must not have any active transformations or temporary state and must
170    not be in the middle of a procedure.
171
172    Callbacks are not cloned. */
173 struct dataset *
174 dataset_clone (struct dataset *old, const char *name)
175 {
176   struct dataset *new;
177
178   assert (old->proc_state == PROC_COMMITTED);
179   assert (!old->permanent_trns_chain.n);
180   assert (old->permanent_dict == NULL);
181   assert (old->sink == NULL);
182   assert (!old->temporary);
183   assert (!old->temporary_trns_chain.n);
184   assert (!old->n_stack);
185
186   new = xzalloc (sizeof *new);
187   new->name = xstrdup (name);
188   new->display = DATASET_FRONT;
189   new->source = casereader_clone (old->source);
190   new->dict = dict_clone (old->dict);
191   new->caseinit = caseinit_clone (old->caseinit);
192   new->last_proc_invocation = old->last_proc_invocation;
193   new->ok = old->ok;
194
195   dataset_create_finish__ (new, old->session);
196
197   return new;
198 }
199
200 /* Destroys DS. */
201 void
202 dataset_destroy (struct dataset *ds)
203 {
204   if (ds != NULL)
205     {
206       dataset_set_session (ds, NULL);
207       dataset_clear (ds);
208       dict_unref (ds->dict);
209       dict_unref (ds->permanent_dict);
210       caseinit_destroy (ds->caseinit);
211       trns_chain_uninit (&ds->permanent_trns_chain);
212       for (size_t i = 0; i < ds->n_stack; i++)
213         trns_chain_uninit (&ds->stack[i]);
214       free (ds->stack);
215       dataset_transformations_changed__ (ds, false);
216       free (ds->name);
217       free (ds);
218     }
219 }
220
221 /* Discards the active dataset's dictionary, data, and transformations. */
222 void
223 dataset_clear (struct dataset *ds)
224 {
225   assert (ds->proc_state == PROC_COMMITTED);
226
227   dict_clear (ds->dict);
228   fh_set_default_handle (NULL);
229
230   ds->n_lag = 0;
231
232   casereader_destroy (ds->source);
233   ds->source = NULL;
234
235   proc_cancel_all_transformations (ds);
236 }
237
238 const char *
239 dataset_name (const struct dataset *ds)
240 {
241   return ds->name;
242 }
243
244 void
245 dataset_set_name (struct dataset *ds, const char *name)
246 {
247   struct session *session = ds->session;
248   bool active = false;
249
250   if (session != NULL)
251     {
252       active = session_active_dataset (session) == ds;
253       if (active)
254         session_set_active_dataset (session, NULL);
255       dataset_set_session (ds, NULL);
256     }
257
258   free (ds->name);
259   ds->name = xstrdup (name);
260
261   if (session != NULL)
262     {
263       dataset_set_session (ds, session);
264       if (active)
265         session_set_active_dataset (session, ds);
266     }
267 }
268
269 struct session *
270 dataset_session (const struct dataset *ds)
271 {
272   return ds->session;
273 }
274
275 void
276 dataset_set_session (struct dataset *ds, struct session *session)
277 {
278   if (session != ds->session)
279     {
280       if (ds->session != NULL)
281         session_remove_dataset (ds->session, ds);
282       if (session != NULL)
283         session_add_dataset (session, ds);
284     }
285 }
286
287 /* Returns the dictionary within DS.  This is always nonnull, although it
288    might not contain any variables. */
289 struct dictionary *
290 dataset_dict (const struct dataset *ds)
291 {
292   return ds->dict;
293 }
294
295 /* Replaces DS's dictionary by DICT, discarding any source and
296    transformations. */
297 void
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
299 {
300   assert (ds->proc_state == PROC_COMMITTED);
301   assert (ds->dict != dict);
302
303   dataset_clear (ds);
304
305   dict_unref (ds->dict);
306   ds->dict = dict;
307   dict_set_change_callback (ds->dict, dict_callback, ds);
308 }
309
310 /* Returns the casereader that will be read when a procedure is executed on
311    DS.  This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
314 {
315   return ds->source;
316 }
317
318 /* Returns true if DS has a data source, false otherwise. */
319 bool
320 dataset_has_source (const struct dataset *ds)
321 {
322   return dataset_source (ds) != NULL;
323 }
324
325 /* Replaces the active dataset's data by READER.  READER's cases must have an
326    appropriate format for DS's dictionary. */
327 bool
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
329 {
330   casereader_destroy (ds->source);
331   ds->source = reader;
332
333   caseinit_clear (ds->caseinit);
334   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
335
336   return reader == NULL || !casereader_error (reader);
337 }
338
339 /* Returns the data source from DS and removes it from DS.  Returns a null
340    pointer if DS has no data source. */
341 struct casereader *
342 dataset_steal_source (struct dataset *ds)
343 {
344   struct casereader *reader = ds->source;
345   ds->source = NULL;
346
347   return reader;
348 }
349
350 void
351 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
352 {
353   assert (!proc_in_temporary_transformations (ds));
354   assert (!proc_has_transformations (ds));
355   assert (n < dict_get_n_vars (ds->dict));
356
357   dict_delete_vars (ds->dict, vars, n);
358   ds->source = case_map_create_input_translator (
359     case_map_to_compact_dict (ds->dict, 0), ds->source);
360   dict_compact_values (ds->dict);
361   caseinit_clear (ds->caseinit);
362   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
363 }
364
365 /* Returns a number unique to DS.  It can be used to distinguish one dataset
366    from any other within a given program run, even datasets that do not exist
367    at the same time. */
368 unsigned int
369 dataset_seqno (const struct dataset *ds)
370 {
371   return ds->seqno;
372 }
373
374 void
375 dataset_set_callbacks (struct dataset *ds,
376                        const struct dataset_callbacks *callbacks,
377                        void *cb_data)
378 {
379   ds->callbacks = callbacks;
380   ds->cb_data = cb_data;
381 }
382
383 enum dataset_display
384 dataset_get_display (const struct dataset *ds)
385 {
386   return ds->display;
387 }
388
389 void
390 dataset_set_display (struct dataset *ds, enum dataset_display display)
391 {
392   ds->display = display;
393 }
394 \f
395 /* Returns the last time the data was read. */
396 time_t
397 time_of_last_procedure (struct dataset *ds)
398 {
399   if (!ds)
400     return time (NULL);
401   if (ds->last_proc_invocation == 0)
402     update_last_proc_invocation (ds);
403   return ds->last_proc_invocation;
404 }
405 \f
406 /* Regular procedure. */
407
408 /* Executes any pending transformations, if necessary.
409    This is not identical to the EXECUTE command in that it won't
410    always read the source data.  This can be important when the
411    source data is given inline within BEGIN DATA...END FILE. */
412 bool
413 proc_execute (struct dataset *ds)
414 {
415   bool ok;
416
417   if ((!ds->temporary || !ds->temporary_trns_chain.n)
418       && !ds->permanent_trns_chain.n)
419     {
420       ds->n_lag = 0;
421       ds->discard_output = false;
422       dict_set_case_limit (ds->dict, 0);
423       dict_clear_vectors (ds->dict);
424       return true;
425     }
426
427   ok = casereader_destroy (proc_open (ds));
428   return proc_commit (ds) && ok;
429 }
430
431 static const struct casereader_class proc_casereader_class;
432
433 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
434    cases filtered out with FILTER BY will not be included in the casereader
435    (which is usually desirable).  If FILTER is false, all cases will be
436    included regardless of FILTER BY settings.
437
438    proc_commit must be called when done. */
439 struct casereader *
440 proc_open_filtering (struct dataset *ds, bool filter)
441 {
442   struct casereader *reader;
443
444   assert (ds->n_stack == 0);
445   assert (ds->source != NULL);
446   assert (ds->proc_state == PROC_COMMITTED);
447
448   update_last_proc_invocation (ds);
449
450   caseinit_mark_for_init (ds->caseinit, ds->dict);
451
452   /* Finish up the collection of transformations. */
453   add_case_limit_trns (ds);
454   if (filter)
455     add_filter_trns (ds);
456   if (!proc_in_temporary_transformations (ds))
457     add_measurement_level_trns (ds, ds->dict);
458
459   /* Make permanent_dict refer to the dictionary right before
460      data reaches the sink. */
461   if (ds->permanent_dict == NULL)
462     ds->permanent_dict = ds->dict;
463
464   /* Prepare sink. */
465   if (!ds->discard_output)
466     {
467       struct dictionary *pd = ds->permanent_dict;
468       size_t compacted_n_values = dict_count_values (pd, DC_SCRATCH);
469       if (compacted_n_values < dict_get_next_value_idx (pd))
470         {
471           struct caseproto *compacted_proto;
472           compacted_proto = dict_get_compacted_proto (pd, DC_SCRATCH);
473           ds->compactor = case_map_to_compact_dict (pd, DC_SCRATCH);
474           ds->sink = autopaging_writer_create (compacted_proto);
475           caseproto_unref (compacted_proto);
476         }
477       else
478         {
479           ds->compactor = NULL;
480           ds->sink = autopaging_writer_create (dict_get_proto (pd));
481         }
482     }
483   else
484     {
485       ds->compactor = NULL;
486       ds->sink = NULL;
487     }
488
489   /* Allocate memory for lagged cases. */
490   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
491
492   ds->proc_state = PROC_OPEN;
493   ds->cases_written = 0;
494   ds->ok = true;
495
496   /* FIXME: use taint in dataset in place of `ok'? */
497   /* FIXME: for trivial cases we can just return a clone of
498      ds->source? */
499
500   /* Create casereader and insert a shim on top.  The shim allows us to
501      arbitrarily extend the casereader's lifetime, by slurping the cases into
502      the shim's buffer in proc_commit().  That is especially useful when output
503      table_items are generated directly from the procedure casereader (e.g. by
504      the LIST procedure) when we are using an output driver that keeps a
505      reference to the output items passed to it (e.g. the GUI output driver in
506      PSPPIRE). */
507   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
508                                          CASENUMBER_MAX,
509                                          &proc_casereader_class, ds);
510   ds->shim = casereader_shim_insert (reader);
511   return reader;
512 }
513
514 /* Opens dataset DS for reading cases with proc_read.
515    proc_commit must be called when done. */
516 struct casereader *
517 proc_open (struct dataset *ds)
518 {
519   return proc_open_filtering (ds, true);
520 }
521
522 /* Returns true if a procedure is in progress, that is, if
523    proc_open has been called but proc_commit has not. */
524 bool
525 proc_is_open (const struct dataset *ds)
526 {
527   return ds->proc_state != PROC_COMMITTED;
528 }
529
530 /* "read" function for procedure casereader. */
531 static struct ccase *
532 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
533 {
534   struct dataset *ds = ds_;
535   enum trns_result retval = TRNS_DROP_CASE;
536   struct ccase *c;
537
538   assert (ds->proc_state == PROC_OPEN);
539   for (; ; case_unref (c))
540     {
541       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
542       if (retval == TRNS_ERROR)
543         ds->ok = false;
544       if (!ds->ok)
545         return NULL;
546
547       /* Read a case from source. */
548       c = casereader_read (ds->source);
549       if (c == NULL)
550         return NULL;
551       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
552       caseinit_init_vars (ds->caseinit, c);
553
554       /* Execute permanent transformations.  */
555       casenumber case_nr = ds->cases_written + 1;
556       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
557       caseinit_update_left_vars (ds->caseinit, c);
558       if (retval != TRNS_CONTINUE)
559         continue;
560
561       /* Write case to collection of lagged cases. */
562       if (ds->n_lag > 0)
563         {
564           while (deque_count (&ds->lag) >= ds->n_lag)
565             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
566           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
567         }
568
569       /* Write case to replacement dataset. */
570       ds->cases_written++;
571       if (ds->sink != NULL)
572         casewriter_write (ds->sink,
573                           case_map_execute (ds->compactor, case_ref (c)));
574
575       /* Execute temporary transformations. */
576       if (ds->temporary_trns_chain.n)
577         {
578           retval = trns_chain_execute (&ds->temporary_trns_chain,
579                                        ds->cases_written, &c);
580           if (retval != TRNS_CONTINUE)
581             continue;
582         }
583
584       return c;
585     }
586 }
587
588 /* "destroy" function for procedure casereader. */
589 static void
590 proc_casereader_destroy (struct casereader *reader, void *ds_)
591 {
592   struct dataset *ds = ds_;
593   struct ccase *c;
594
595   /* We are always the subreader for a casereader_buffer, so if we're being
596      destroyed then it's because the casereader_buffer has read all the cases
597      that it ever will. */
598   ds->shim = NULL;
599
600   /* Make sure transformations happen for every input case, in
601      case they have side effects, and ensure that the replacement
602      active dataset gets all the cases it should. */
603   while ((c = casereader_read (reader)) != NULL)
604     case_unref (c);
605
606   ds->proc_state = PROC_CLOSED;
607   ds->ok = casereader_destroy (ds->source) && ds->ok;
608   ds->source = NULL;
609   dataset_set_source (ds, NULL);
610 }
611
612 /* Must return false if the source casereader, a transformation,
613    or the sink casewriter signaled an error.  (If a temporary
614    transformation signals an error, then the return value is
615    false, but the replacement active dataset may still be
616    untainted.) */
617 bool
618 proc_commit (struct dataset *ds)
619 {
620   if (ds->shim != NULL)
621     casereader_shim_slurp (ds->shim);
622
623   assert (ds->proc_state == PROC_CLOSED);
624   ds->proc_state = PROC_COMMITTED;
625
626   dataset_changed__ (ds);
627
628   /* Free memory for lagged cases. */
629   while (!deque_is_empty (&ds->lag))
630     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
631   free (ds->lag_cases);
632
633   /* Dictionary from before TEMPORARY becomes permanent. */
634   proc_cancel_temporary_transformations (ds);
635   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
636
637   if (!ds->discard_output)
638     {
639       /* Finish compacting. */
640       if (ds->compactor != NULL)
641         {
642           case_map_destroy (ds->compactor);
643           ds->compactor = NULL;
644
645           dict_delete_scratch_vars (ds->dict);
646           dict_compact_values (ds->dict);
647         }
648
649       /* Old data sink becomes new data source. */
650       if (ds->sink != NULL)
651         ds->source = casewriter_make_reader (ds->sink);
652     }
653   else
654     {
655       ds->source = NULL;
656       ds->discard_output = false;
657     }
658   ds->sink = NULL;
659
660   caseinit_clear (ds->caseinit);
661   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
662
663   dict_clear_vectors (ds->dict);
664   ds->permanent_dict = NULL;
665   return ok;
666 }
667
668 /* Casereader class for procedure execution. */
669 static const struct casereader_class proc_casereader_class =
670   {
671     proc_casereader_read,
672     proc_casereader_destroy,
673     NULL,
674     NULL,
675   };
676
677 /* Updates last_proc_invocation. */
678 static void
679 update_last_proc_invocation (struct dataset *ds)
680 {
681   ds->last_proc_invocation = time (NULL);
682 }
683 \f
684 /* Returns a pointer to the lagged case from N_BEFORE cases before the
685    current one, or NULL if there haven't been that many cases yet. */
686 const struct ccase *
687 lagged_case (const struct dataset *ds, int n_before)
688 {
689   assert (n_before >= 1);
690   assert (n_before <= ds->n_lag);
691
692   if (n_before <= deque_count (&ds->lag))
693     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
694   else
695     return NULL;
696 }
697 \f
698 /* Adds TRNS to the current set of transformations. */
699 void
700 add_transformation (struct dataset *ds,
701                     const struct trns_class *class, void *aux)
702 {
703   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
704                               : ds->temporary ? &ds->temporary_trns_chain
705                               : &ds->permanent_trns_chain);
706   struct transformation t = { .class = class, .aux = aux };
707   trns_chain_append (chain, &t);
708   dataset_transformations_changed__ (ds, true);
709 }
710
711 /* Returns true if the next call to add_transformation() will add
712    a temporary transformation, false if it will add a permanent
713    transformation. */
714 bool
715 proc_in_temporary_transformations (const struct dataset *ds)
716 {
717   return ds->temporary;
718 }
719
720 /* Marks the start of temporary transformations.
721    Further calls to add_transformation() will add temporary
722    transformations. */
723 void
724 proc_start_temporary_transformations (struct dataset *ds)
725 {
726   assert (!ds->n_stack);
727   if (!proc_in_temporary_transformations (ds))
728     {
729       add_case_limit_trns (ds);
730
731       ds->permanent_dict = dict_clone (ds->dict);
732       add_measurement_level_trns (ds, ds->permanent_dict);
733
734       ds->temporary = true;
735       dataset_transformations_changed__ (ds, true);
736     }
737 }
738
739 /* Converts all the temporary transformations, if any, to permanent
740    transformations.  Further transformations will be permanent.
741
742    The FILTER command is implemented as a temporary transformation, so a
743    procedure that uses this function should usually use proc_open_filtering()
744    with FILTER false, instead of plain proc_open().
745
746    Returns true if anything changed, false otherwise. */
747 bool
748 proc_make_temporary_transformations_permanent (struct dataset *ds)
749 {
750   if (proc_in_temporary_transformations (ds))
751     {
752       cancel_measurement_level_trns (&ds->permanent_trns_chain);
753       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
754
755       ds->temporary = false;
756
757       dict_unref (ds->permanent_dict);
758       ds->permanent_dict = NULL;
759
760       return true;
761     }
762   else
763     return false;
764 }
765
766 /* Cancels all temporary transformations, if any.  Further
767    transformations will be permanent.
768    Returns true if anything changed, false otherwise. */
769 bool
770 proc_cancel_temporary_transformations (struct dataset *ds)
771 {
772   if (proc_in_temporary_transformations (ds))
773     {
774       trns_chain_clear (&ds->temporary_trns_chain);
775
776       dict_unref (ds->dict);
777       ds->dict = ds->permanent_dict;
778       ds->permanent_dict = NULL;
779
780       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
781       return true;
782     }
783   else
784     return false;
785 }
786
787 /* Cancels all transformations, if any.
788    Returns true if successful, false on I/O error. */
789 bool
790 proc_cancel_all_transformations (struct dataset *ds)
791 {
792   bool ok;
793   assert (ds->proc_state == PROC_COMMITTED);
794   ok = trns_chain_clear (&ds->permanent_trns_chain);
795   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
796   ds->temporary = false;
797   for (size_t i = 0; i < ds->n_stack; i++)
798     ok = trns_chain_uninit (&ds->stack[i]) && ok;
799   ds->n_stack = 0;
800   dataset_transformations_changed__ (ds, false);
801
802   return ok;
803 }
804
805 void
806 proc_push_transformations (struct dataset *ds)
807 {
808   if (ds->n_stack >= ds->allocated_stack)
809     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
810                             sizeof *ds->stack);
811   trns_chain_init (&ds->stack[ds->n_stack++]);
812 }
813
814 void
815 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
816 {
817   assert (ds->n_stack > 0);
818   *chain = ds->stack[--ds->n_stack];
819 }
820
821 bool
822 proc_has_transformations (const struct dataset *ds)
823 {
824   return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
825 }
826
827 static enum trns_result
828 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
829 {
830   struct variable *var = var_;
831
832   *cc = case_unshare (*cc);
833   *case_num_rw (*cc, var) = case_num;
834
835   return TRNS_CONTINUE;
836 }
837
838 /* Add a variable which we can sort by to get back the original order. */
839 struct variable *
840 add_permanent_ordering_transformation (struct dataset *ds)
841 {
842   struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
843   struct variable *order_var
844     = (proc_in_temporary_transformations (ds)
845        ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
846        : temp_var);
847
848   static const struct trns_class trns_class = {
849     .name = "ordering",
850     .execute = store_case_num
851   };
852   const struct transformation t = { .class = &trns_class, .aux = order_var };
853   trns_chain_append (&ds->permanent_trns_chain, &t);
854
855   return temp_var;
856 }
857 \f
858 /* Causes output from the next procedure to be discarded, instead
859    of being preserved for use as input for the next procedure. */
860 void
861 proc_discard_output (struct dataset *ds)
862 {
863   ds->discard_output = true;
864 }
865
866
867 /* Checks whether DS has a corrupted active dataset.  If so,
868    discards it and returns false.  If not, returns true without
869    doing anything. */
870 bool
871 dataset_end_of_command (struct dataset *ds)
872 {
873   if (ds->source != NULL)
874     {
875       if (casereader_error (ds->source))
876         {
877           dataset_clear (ds);
878           return false;
879         }
880       else
881         {
882           const struct taint *taint = casereader_get_taint (ds->source);
883           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
884           assert (!taint_has_tainted_successor (taint));
885         }
886     }
887   return true;
888 }
889 \f
890 /* Limits the maximum number of cases processed to
891    *CASES_REMAINING. */
892 static enum trns_result
893 case_limit_trns_proc (void *cases_remaining_,
894                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
895 {
896   size_t *cases_remaining = cases_remaining_;
897   if (*cases_remaining > 0)
898     {
899       (*cases_remaining)--;
900       return TRNS_CONTINUE;
901     }
902   else
903     return TRNS_DROP_CASE;
904 }
905
906 /* Frees the data associated with a case limit transformation. */
907 static bool
908 case_limit_trns_free (void *cases_remaining_)
909 {
910   size_t *cases_remaining = cases_remaining_;
911   free (cases_remaining);
912   return true;
913 }
914
915 /* Adds a transformation that limits the number of cases that may
916    pass through, if DS->DICT has a case limit. */
917 static void
918 add_case_limit_trns (struct dataset *ds)
919 {
920   casenumber case_limit = dict_get_case_limit (ds->dict);
921   if (case_limit != 0)
922     {
923       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
924       *cases_remaining = case_limit;
925
926       static const struct trns_class trns_class = {
927         .name = "case limit",
928         .execute = case_limit_trns_proc,
929         .destroy = case_limit_trns_free,
930       };
931       add_transformation (ds, &trns_class, cases_remaining);
932
933       dict_set_case_limit (ds->dict, 0);
934     }
935 }
936
937 \f
938 /* FILTER transformation. */
939 static enum trns_result
940 filter_trns_proc (void *filter_var_,
941                   struct ccase **c, casenumber case_nr UNUSED)
942
943 {
944   struct variable *filter_var = filter_var_;
945   double f = case_num (*c, filter_var);
946   return (f != 0.0 && !var_is_num_missing (filter_var, f)
947           ? TRNS_CONTINUE : TRNS_DROP_CASE);
948 }
949
950 /* Adds a temporary transformation to filter data according to
951    the variable specified on FILTER, if any. */
952 static void
953 add_filter_trns (struct dataset *ds)
954 {
955   struct variable *filter_var = dict_get_filter (ds->dict);
956   if (filter_var != NULL)
957     {
958       proc_start_temporary_transformations (ds);
959
960       static const struct trns_class trns_class = {
961         .name = "FILTER",
962         .execute = filter_trns_proc,
963       };
964       add_transformation (ds, &trns_class, filter_var);
965     }
966 }
967
968 void
969 dataset_need_lag (struct dataset *ds, int n_before)
970 {
971   ds->n_lag = MAX (ds->n_lag, n_before);
972 }
973 \f
974 /* Measurement guesser, for guessing a measurement level from formats and
975    data. */
976
977 struct mg_value
978   {
979     struct hmap_node hmap_node;
980     double value;
981   };
982
983 struct mg_var
984   {
985     struct variable *var;
986     struct hmap *values;
987   };
988
989 static void
990 mg_var_uninit (struct mg_var *mgv)
991 {
992   struct mg_value *mgvalue, *next;
993   HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
994                       mgv->values)
995     {
996       hmap_delete (mgv->values, &mgvalue->hmap_node);
997       free (mgvalue);
998     }
999   hmap_destroy (mgv->values);
1000   free (mgv->values);
1001 }
1002
1003 static enum measure
1004 mg_var_interpret (const struct mg_var *mgv)
1005 {
1006   size_t n = hmap_count (mgv->values);
1007   if (!n)
1008     {
1009       /* All missing (or no data). */
1010       return MEASURE_NOMINAL;
1011     }
1012
1013   const struct mg_value *mgvalue;
1014   HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1015                  mgv->values)
1016     if (mgvalue->value < 10)
1017       return MEASURE_NOMINAL;
1018   return MEASURE_SCALE;
1019 }
1020
1021 static enum measure
1022 mg_var_add_value (struct mg_var *mgv, double value)
1023 {
1024   if (var_is_num_missing (mgv->var, value))
1025     return MEASURE_UNKNOWN;
1026   else if (value < 0 || value != floor (value))
1027     return MEASURE_SCALE;
1028
1029   size_t hash = hash_double (value, 0);
1030   struct mg_value *mgvalue;
1031   HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1032                            hash, mgv->values)
1033     if (mgvalue->value == value)
1034       return MEASURE_UNKNOWN;
1035
1036   mgvalue = xmalloc (sizeof *mgvalue);
1037   mgvalue->value = value;
1038   hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1039   if (hmap_count (mgv->values) >= settings_get_scalemin ())
1040     return MEASURE_SCALE;
1041
1042   return MEASURE_UNKNOWN;
1043 }
1044
1045 struct measure_guesser
1046   {
1047     struct mg_var *vars;
1048     size_t n_vars;
1049   };
1050
1051 static struct measure_guesser *
1052 measure_guesser_create__ (struct dictionary *dict)
1053 {
1054   struct mg_var *mgvs = NULL;
1055   size_t n_mgvs = 0;
1056   size_t allocated_mgvs = 0;
1057
1058   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1059     {
1060       struct variable *var = dict_get_var (dict, i);
1061       if (var_get_measure (var) != MEASURE_UNKNOWN)
1062         continue;
1063
1064       const struct fmt_spec *f = var_get_print_format (var);
1065       enum measure m = var_default_measure_for_format (f->type);
1066       if (m != MEASURE_UNKNOWN)
1067         {
1068           var_set_measure (var, m);
1069           continue;
1070         }
1071
1072       if (n_mgvs >= allocated_mgvs)
1073         mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1074
1075       struct mg_var *mgv = &mgvs[n_mgvs++];
1076       *mgv = (struct mg_var) {
1077         .var = var,
1078         .values = xmalloc (sizeof *mgv->values),
1079       };
1080       hmap_init (mgv->values);
1081     }
1082   if (!n_mgvs)
1083     return NULL;
1084
1085   struct measure_guesser *mg = xmalloc (sizeof *mg);
1086   *mg = (struct measure_guesser) {
1087     .vars = mgvs,
1088     .n_vars = n_mgvs,
1089   };
1090   return mg;
1091 }
1092
1093 /* Scans through DS's dictionary for variables that have an unknown measurement
1094    level.  For those, if the measurement level can be guessed based on the
1095    variable's type and format, sets a default.  If that's enough, returns NULL.
1096    If any remain whose levels are unknown and can't be guessed that way,
1097    creates and returns a structure that the caller should pass to
1098    measure_guesser_add_case() or measure_guesser_run() for guessing a
1099    measurement level based on the data.  */
1100 struct measure_guesser *
1101 measure_guesser_create (struct dataset *ds)
1102 {
1103   return measure_guesser_create__ (dataset_dict (ds));
1104 }
1105
1106 /* Adds data from case C to MG. */
1107 static void
1108 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1109 {
1110   for (size_t i = 0; i < mg->n_vars; )
1111     {
1112       struct mg_var *mgv = &mg->vars[i];
1113       double value = case_num (c, mgv->var);
1114       enum measure m = mg_var_add_value (mgv, value);
1115       if (m != MEASURE_UNKNOWN)
1116         {
1117           var_set_measure (mgv->var, m);
1118
1119           mg_var_uninit (mgv);
1120           *mgv = mg->vars[--mg->n_vars];
1121         }
1122       else
1123         i++;
1124     }
1125 }
1126
1127 /* Destroys MG. */
1128 void
1129 measure_guesser_destroy (struct measure_guesser *mg)
1130 {
1131   if (!mg)
1132     return;
1133
1134   for (size_t i = 0; i < mg->n_vars; i++)
1135     {
1136       struct mg_var *mgv = &mg->vars[i];
1137       var_set_measure (mgv->var, mg_var_interpret (mgv));
1138       mg_var_uninit (mgv);
1139     }
1140   free (mg->vars);
1141   free (mg);
1142 }
1143
1144 /* Adds final measurement levels based on MG, after all the cases have been
1145    added. */
1146 static void
1147 measure_guesser_commit (struct measure_guesser *mg)
1148 {
1149   for (size_t i = 0; i < mg->n_vars; i++)
1150     {
1151       struct mg_var *mgv = &mg->vars[i];
1152       var_set_measure (mgv->var, mg_var_interpret (mgv));
1153     }
1154 }
1155
1156 /* Passes the cases in READER through MG and uses the data in the cases to set
1157    measurement levels for the variables where they were still unknown. */
1158 void
1159 measure_guesser_run (struct measure_guesser *mg,
1160                      const struct casereader *reader)
1161 {
1162   struct casereader *r = casereader_clone (reader);
1163   while (mg->n_vars > 0)
1164     {
1165       struct ccase *c = casereader_read (r);
1166       if (!c)
1167         break;
1168       measure_guesser_add_case (mg, c);
1169       case_unref (c);
1170     }
1171   casereader_destroy (r);
1172
1173   measure_guesser_commit (mg);
1174 }
1175 \f
1176 /* A transformation for guessing measurement levels. */
1177
1178 static enum trns_result
1179 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1180 {
1181   struct measure_guesser *mg = mg_;
1182   measure_guesser_add_case (mg, *c);
1183   return TRNS_CONTINUE;
1184 }
1185
1186 static bool
1187 mg_trns_free (void *mg_)
1188 {
1189   struct measure_guesser *mg = mg_;
1190   measure_guesser_commit (mg);
1191   measure_guesser_destroy (mg);
1192   return true;
1193 }
1194
1195 static const struct trns_class mg_trns_class = {
1196   .name = "add measurement level",
1197   .execute = mg_trns_proc,
1198   .destroy = mg_trns_free,
1199 };
1200
1201 static void
1202 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1203 {
1204   struct measure_guesser *mg = measure_guesser_create__ (dict);
1205   if (mg)
1206     add_transformation (ds, &mg_trns_class, mg);
1207 }
1208
1209 static void
1210 cancel_measurement_level_trns (struct trns_chain *chain)
1211 {
1212   if (!chain->n)
1213     return;
1214
1215   struct transformation *trns = &chain->xforms[chain->n - 1];
1216   if (trns->class != &mg_trns_class)
1217     return;
1218
1219   struct measure_guesser *mg = trns->aux;
1220   measure_guesser_destroy (mg);
1221   chain->n--;
1222 }
1223 \f
1224 static void
1225 dataset_changed__ (struct dataset *ds)
1226 {
1227   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1228     ds->callbacks->changed (ds->cb_data);
1229 }
1230
1231 static void
1232 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1233 {
1234   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1235     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1236 }
1237 \f
1238 /* Private interface for use by session code. */
1239
1240 void
1241 dataset_set_session__ (struct dataset *ds, struct session *session)
1242 {
1243   ds->session = session;
1244 }