work on DELETE VARS
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct casewriter *sink;
72   struct trns_chain temporary_trns_chain;
73   bool temporary;
74   struct dictionary *dict;
75
76   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77   struct trns_chain *stack;
78   size_t n_stack;
79   size_t allocated_stack;
80
81   /* If true, cases are discarded instead of being written to
82      sink. */
83   bool discard_output;
84
85   /* The case map used to compact a case, if necessary;
86      otherwise a null pointer. */
87   struct case_map *compactor;
88
89   /* Time at which proc was last invoked. */
90   time_t last_proc_invocation;
91
92   /* Cases just before ("lagging") the current one. */
93   int n_lag;                    /* Number of cases to lag. */
94   struct deque lag;             /* Deque of lagged cases. */
95   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
96
97   /* Procedure data. */
98   enum
99     {
100       PROC_COMMITTED,           /* No procedure in progress. */
101       PROC_OPEN,                /* proc_open called, casereader still open. */
102       PROC_CLOSED               /* casereader from proc_open destroyed,
103                                    but proc_commit not yet called. */
104     }
105   proc_state;
106   casenumber cases_written;     /* Cases output so far. */
107   bool ok;                      /* Error status. */
108   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
109
110   const struct dataset_callbacks *callbacks;
111   void *cb_data;
112
113   /* Uniquely distinguishes datasets. */
114   unsigned int seqno;
115 };
116
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
119                                                bool non_empty);
120
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
125
126 static void update_last_proc_invocation (struct dataset *ds);
127
128 static void
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
130 {
131   struct dataset *ds = ds_;
132   dataset_changed__ (ds);
133 }
134 \f
135 static void
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
137 {
138   static unsigned int seqno;
139
140   dict_set_change_callback (ds->dict, dict_callback, ds);
141   proc_cancel_all_transformations (ds);
142   dataset_set_session (ds, session);
143   ds->seqno = ++seqno;
144 }
145
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
147    SESSION already contains a dataset named NAME, it is deleted and replaced.
148    The dataset initially has an empty dictionary and no data source. */
149 struct dataset *
150 dataset_create (struct session *session, const char *name)
151 {
152   struct dataset *ds = XMALLOC (struct dataset);
153   *ds = (struct dataset) {
154     .name = xstrdup (name),
155     .display = DATASET_FRONT,
156     .dict = dict_create (get_default_encoding ()),
157     .caseinit = caseinit_create (),
158   };
159   dataset_create_finish__ (ds, session);
160
161   return ds;
162 }
163
164 /* Creates and returns a new dataset that has the same data and dictionary as
165    OLD named NAME, adds it to the same session as OLD, and returns the new
166    dataset.  If SESSION already contains a dataset named NAME, it is deleted
167    and replaced.
168
169    OLD must not have any active transformations or temporary state and must
170    not be in the middle of a procedure.
171
172    Callbacks are not cloned. */
173 struct dataset *
174 dataset_clone (struct dataset *old, const char *name)
175 {
176   struct dataset *new;
177
178   assert (old->proc_state == PROC_COMMITTED);
179   assert (!old->permanent_trns_chain.n);
180   assert (old->permanent_dict == NULL);
181   assert (old->sink == NULL);
182   assert (!old->temporary);
183   assert (!old->temporary_trns_chain.n);
184   assert (!old->n_stack);
185
186   new = xzalloc (sizeof *new);
187   new->name = xstrdup (name);
188   new->display = DATASET_FRONT;
189   new->source = casereader_clone (old->source);
190   new->dict = dict_clone (old->dict);
191   new->caseinit = caseinit_clone (old->caseinit);
192   new->last_proc_invocation = old->last_proc_invocation;
193   new->ok = old->ok;
194
195   dataset_create_finish__ (new, old->session);
196
197   return new;
198 }
199
200 /* Destroys DS. */
201 void
202 dataset_destroy (struct dataset *ds)
203 {
204   if (ds != NULL)
205     {
206       dataset_set_session (ds, NULL);
207       dataset_clear (ds);
208       dict_unref (ds->dict);
209       dict_unref (ds->permanent_dict);
210       caseinit_destroy (ds->caseinit);
211       trns_chain_uninit (&ds->permanent_trns_chain);
212       for (size_t i = 0; i < ds->n_stack; i++)
213         trns_chain_uninit (&ds->stack[i]);
214       free (ds->stack);
215       dataset_transformations_changed__ (ds, false);
216       free (ds->name);
217       free (ds);
218     }
219 }
220
221 /* Discards the active dataset's dictionary, data, and transformations. */
222 void
223 dataset_clear (struct dataset *ds)
224 {
225   assert (ds->proc_state == PROC_COMMITTED);
226
227   dict_clear (ds->dict);
228   fh_set_default_handle (NULL);
229
230   ds->n_lag = 0;
231
232   casereader_destroy (ds->source);
233   ds->source = NULL;
234
235   proc_cancel_all_transformations (ds);
236 }
237
238 const char *
239 dataset_name (const struct dataset *ds)
240 {
241   return ds->name;
242 }
243
244 void
245 dataset_set_name (struct dataset *ds, const char *name)
246 {
247   struct session *session = ds->session;
248   bool active = false;
249
250   if (session != NULL)
251     {
252       active = session_active_dataset (session) == ds;
253       if (active)
254         session_set_active_dataset (session, NULL);
255       dataset_set_session (ds, NULL);
256     }
257
258   free (ds->name);
259   ds->name = xstrdup (name);
260
261   if (session != NULL)
262     {
263       dataset_set_session (ds, session);
264       if (active)
265         session_set_active_dataset (session, ds);
266     }
267 }
268
269 struct session *
270 dataset_session (const struct dataset *ds)
271 {
272   return ds->session;
273 }
274
275 void
276 dataset_set_session (struct dataset *ds, struct session *session)
277 {
278   if (session != ds->session)
279     {
280       if (ds->session != NULL)
281         session_remove_dataset (ds->session, ds);
282       if (session != NULL)
283         session_add_dataset (session, ds);
284     }
285 }
286
287 /* Returns the dictionary within DS.  This is always nonnull, although it
288    might not contain any variables. */
289 struct dictionary *
290 dataset_dict (const struct dataset *ds)
291 {
292   return ds->dict;
293 }
294
295 /* Replaces DS's dictionary by DICT, discarding any source and
296    transformations. */
297 void
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
299 {
300   assert (ds->proc_state == PROC_COMMITTED);
301   assert (ds->dict != dict);
302
303   dataset_clear (ds);
304
305   dict_unref (ds->dict);
306   ds->dict = dict;
307   dict_set_change_callback (ds->dict, dict_callback, ds);
308 }
309
310 /* Returns the casereader that will be read when a procedure is executed on
311    DS.  This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
314 {
315   return ds->source;
316 }
317
318 /* Returns true if DS has a data source, false otherwise. */
319 bool
320 dataset_has_source (const struct dataset *ds)
321 {
322   return dataset_source (ds) != NULL;
323 }
324
325 /* Replaces the active dataset's data by READER.  READER's cases must have an
326    appropriate format for DS's dictionary. */
327 bool
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
329 {
330   casereader_destroy (ds->source);
331   ds->source = reader;
332
333   caseinit_clear (ds->caseinit);
334   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
335
336   return reader == NULL || !casereader_error (reader);
337 }
338
339 /* Returns the data source from DS and removes it from DS.  Returns a null
340    pointer if DS has no data source. */
341 struct casereader *
342 dataset_steal_source (struct dataset *ds)
343 {
344   struct casereader *reader = ds->source;
345   ds->source = NULL;
346
347   return reader;
348 }
349
350 bool
351 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
352 {
353   dict_delete_vars (ds->dict, vars, n);
354
355   if (ds->source)
356     {
357       struct case_map *map = case_map_to_compact_dict (ds->d, 0);
358       ds->source = case_map_create_input_translator (map, ds->source);
359     }
360   dict_compact_values (ds->dict);
361 }
362
363 /* Returns a number unique to DS.  It can be used to distinguish one dataset
364    from any other within a given program run, even datasets that do not exist
365    at the same time. */
366 unsigned int
367 dataset_seqno (const struct dataset *ds)
368 {
369   return ds->seqno;
370 }
371
372 void
373 dataset_set_callbacks (struct dataset *ds,
374                        const struct dataset_callbacks *callbacks,
375                        void *cb_data)
376 {
377   ds->callbacks = callbacks;
378   ds->cb_data = cb_data;
379 }
380
381 enum dataset_display
382 dataset_get_display (const struct dataset *ds)
383 {
384   return ds->display;
385 }
386
387 void
388 dataset_set_display (struct dataset *ds, enum dataset_display display)
389 {
390   ds->display = display;
391 }
392 \f
393 /* Returns the last time the data was read. */
394 time_t
395 time_of_last_procedure (struct dataset *ds)
396 {
397   if (!ds)
398     return time (NULL);
399   if (ds->last_proc_invocation == 0)
400     update_last_proc_invocation (ds);
401   return ds->last_proc_invocation;
402 }
403 \f
404 /* Regular procedure. */
405
406 /* Executes any pending transformations, if necessary.
407    This is not identical to the EXECUTE command in that it won't
408    always read the source data.  This can be important when the
409    source data is given inline within BEGIN DATA...END FILE. */
410 bool
411 proc_execute (struct dataset *ds)
412 {
413   bool ok;
414
415   if ((!ds->temporary || !ds->temporary_trns_chain.n)
416       && !ds->permanent_trns_chain.n)
417     {
418       ds->n_lag = 0;
419       ds->discard_output = false;
420       dict_set_case_limit (ds->dict, 0);
421       dict_clear_vectors (ds->dict);
422       return true;
423     }
424
425   ok = casereader_destroy (proc_open (ds));
426   return proc_commit (ds) && ok;
427 }
428
429 static const struct casereader_class proc_casereader_class;
430
431 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
432    cases filtered out with FILTER BY will not be included in the casereader
433    (which is usually desirable).  If FILTER is false, all cases will be
434    included regardless of FILTER BY settings.
435
436    proc_commit must be called when done. */
437 struct casereader *
438 proc_open_filtering (struct dataset *ds, bool filter)
439 {
440   struct casereader *reader;
441
442   assert (ds->n_stack == 0);
443   assert (ds->source != NULL);
444   assert (ds->proc_state == PROC_COMMITTED);
445
446   update_last_proc_invocation (ds);
447
448   caseinit_mark_for_init (ds->caseinit, ds->dict);
449
450   /* Finish up the collection of transformations. */
451   add_case_limit_trns (ds);
452   if (filter)
453     add_filter_trns (ds);
454   if (!proc_in_temporary_transformations (ds))
455     add_measurement_level_trns (ds, ds->dict);
456
457   /* Make permanent_dict refer to the dictionary right before
458      data reaches the sink. */
459   if (ds->permanent_dict == NULL)
460     ds->permanent_dict = ds->dict;
461
462   /* Prepare sink. */
463   if (!ds->discard_output)
464     {
465       struct dictionary *pd = ds->permanent_dict;
466       size_t compacted_n_values = dict_count_values (pd, DC_SCRATCH);
467       if (compacted_n_values < dict_get_next_value_idx (pd))
468         {
469           struct caseproto *compacted_proto;
470           compacted_proto = dict_get_compacted_proto (pd, DC_SCRATCH);
471           ds->compactor = case_map_to_compact_dict (pd, DC_SCRATCH);
472           ds->sink = autopaging_writer_create (compacted_proto);
473           caseproto_unref (compacted_proto);
474         }
475       else
476         {
477           ds->compactor = NULL;
478           ds->sink = autopaging_writer_create (dict_get_proto (pd));
479         }
480     }
481   else
482     {
483       ds->compactor = NULL;
484       ds->sink = NULL;
485     }
486
487   /* Allocate memory for lagged cases. */
488   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
489
490   ds->proc_state = PROC_OPEN;
491   ds->cases_written = 0;
492   ds->ok = true;
493
494   /* FIXME: use taint in dataset in place of `ok'? */
495   /* FIXME: for trivial cases we can just return a clone of
496      ds->source? */
497
498   /* Create casereader and insert a shim on top.  The shim allows us to
499      arbitrarily extend the casereader's lifetime, by slurping the cases into
500      the shim's buffer in proc_commit().  That is especially useful when output
501      table_items are generated directly from the procedure casereader (e.g. by
502      the LIST procedure) when we are using an output driver that keeps a
503      reference to the output items passed to it (e.g. the GUI output driver in
504      PSPPIRE). */
505   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
506                                          CASENUMBER_MAX,
507                                          &proc_casereader_class, ds);
508   ds->shim = casereader_shim_insert (reader);
509   return reader;
510 }
511
512 /* Opens dataset DS for reading cases with proc_read.
513    proc_commit must be called when done. */
514 struct casereader *
515 proc_open (struct dataset *ds)
516 {
517   return proc_open_filtering (ds, true);
518 }
519
520 /* Returns true if a procedure is in progress, that is, if
521    proc_open has been called but proc_commit has not. */
522 bool
523 proc_is_open (const struct dataset *ds)
524 {
525   return ds->proc_state != PROC_COMMITTED;
526 }
527
528 /* "read" function for procedure casereader. */
529 static struct ccase *
530 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
531 {
532   struct dataset *ds = ds_;
533   enum trns_result retval = TRNS_DROP_CASE;
534   struct ccase *c;
535
536   assert (ds->proc_state == PROC_OPEN);
537   for (; ; case_unref (c))
538     {
539       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
540       if (retval == TRNS_ERROR)
541         ds->ok = false;
542       if (!ds->ok)
543         return NULL;
544
545       /* Read a case from source. */
546       c = casereader_read (ds->source);
547       if (c == NULL)
548         return NULL;
549       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
550       caseinit_init_vars (ds->caseinit, c);
551
552       /* Execute permanent transformations.  */
553       casenumber case_nr = ds->cases_written + 1;
554       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
555       caseinit_update_left_vars (ds->caseinit, c);
556       if (retval != TRNS_CONTINUE)
557         continue;
558
559       /* Write case to collection of lagged cases. */
560       if (ds->n_lag > 0)
561         {
562           while (deque_count (&ds->lag) >= ds->n_lag)
563             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
564           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
565         }
566
567       /* Write case to replacement dataset. */
568       ds->cases_written++;
569       if (ds->sink != NULL)
570         casewriter_write (ds->sink,
571                           case_map_execute (ds->compactor, case_ref (c)));
572
573       /* Execute temporary transformations. */
574       if (ds->temporary_trns_chain.n)
575         {
576           retval = trns_chain_execute (&ds->temporary_trns_chain,
577                                        ds->cases_written, &c);
578           if (retval != TRNS_CONTINUE)
579             continue;
580         }
581
582       return c;
583     }
584 }
585
586 /* "destroy" function for procedure casereader. */
587 static void
588 proc_casereader_destroy (struct casereader *reader, void *ds_)
589 {
590   struct dataset *ds = ds_;
591   struct ccase *c;
592
593   /* We are always the subreader for a casereader_buffer, so if we're being
594      destroyed then it's because the casereader_buffer has read all the cases
595      that it ever will. */
596   ds->shim = NULL;
597
598   /* Make sure transformations happen for every input case, in
599      case they have side effects, and ensure that the replacement
600      active dataset gets all the cases it should. */
601   while ((c = casereader_read (reader)) != NULL)
602     case_unref (c);
603
604   ds->proc_state = PROC_CLOSED;
605   ds->ok = casereader_destroy (ds->source) && ds->ok;
606   ds->source = NULL;
607   dataset_set_source (ds, NULL);
608 }
609
610 /* Must return false if the source casereader, a transformation,
611    or the sink casewriter signaled an error.  (If a temporary
612    transformation signals an error, then the return value is
613    false, but the replacement active dataset may still be
614    untainted.) */
615 bool
616 proc_commit (struct dataset *ds)
617 {
618   if (ds->shim != NULL)
619     casereader_shim_slurp (ds->shim);
620
621   assert (ds->proc_state == PROC_CLOSED);
622   ds->proc_state = PROC_COMMITTED;
623
624   dataset_changed__ (ds);
625
626   /* Free memory for lagged cases. */
627   while (!deque_is_empty (&ds->lag))
628     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
629   free (ds->lag_cases);
630
631   /* Dictionary from before TEMPORARY becomes permanent. */
632   proc_cancel_temporary_transformations (ds);
633   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
634
635   if (!ds->discard_output)
636     {
637       /* Finish compacting. */
638       if (ds->compactor != NULL)
639         {
640           case_map_destroy (ds->compactor);
641           ds->compactor = NULL;
642
643           dict_delete_scratch_vars (ds->dict);
644           dict_compact_values (ds->dict);
645         }
646
647       /* Old data sink becomes new data source. */
648       if (ds->sink != NULL)
649         ds->source = casewriter_make_reader (ds->sink);
650     }
651   else
652     {
653       ds->source = NULL;
654       ds->discard_output = false;
655     }
656   ds->sink = NULL;
657
658   caseinit_clear (ds->caseinit);
659   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
660
661   dict_clear_vectors (ds->dict);
662   ds->permanent_dict = NULL;
663   return ok;
664 }
665
666 /* Casereader class for procedure execution. */
667 static const struct casereader_class proc_casereader_class =
668   {
669     proc_casereader_read,
670     proc_casereader_destroy,
671     NULL,
672     NULL,
673   };
674
675 /* Updates last_proc_invocation. */
676 static void
677 update_last_proc_invocation (struct dataset *ds)
678 {
679   ds->last_proc_invocation = time (NULL);
680 }
681 \f
682 /* Returns a pointer to the lagged case from N_BEFORE cases before the
683    current one, or NULL if there haven't been that many cases yet. */
684 const struct ccase *
685 lagged_case (const struct dataset *ds, int n_before)
686 {
687   assert (n_before >= 1);
688   assert (n_before <= ds->n_lag);
689
690   if (n_before <= deque_count (&ds->lag))
691     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
692   else
693     return NULL;
694 }
695 \f
696 /* Adds TRNS to the current set of transformations. */
697 void
698 add_transformation (struct dataset *ds,
699                     const struct trns_class *class, void *aux)
700 {
701   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
702                               : ds->temporary ? &ds->temporary_trns_chain
703                               : &ds->permanent_trns_chain);
704   struct transformation t = { .class = class, .aux = aux };
705   trns_chain_append (chain, &t);
706   dataset_transformations_changed__ (ds, true);
707 }
708
709 /* Returns true if the next call to add_transformation() will add
710    a temporary transformation, false if it will add a permanent
711    transformation. */
712 bool
713 proc_in_temporary_transformations (const struct dataset *ds)
714 {
715   return ds->temporary;
716 }
717
718 /* Marks the start of temporary transformations.
719    Further calls to add_transformation() will add temporary
720    transformations. */
721 void
722 proc_start_temporary_transformations (struct dataset *ds)
723 {
724   assert (!ds->n_stack);
725   if (!proc_in_temporary_transformations (ds))
726     {
727       add_case_limit_trns (ds);
728
729       ds->permanent_dict = dict_clone (ds->dict);
730       add_measurement_level_trns (ds, ds->permanent_dict);
731
732       ds->temporary = true;
733       dataset_transformations_changed__ (ds, true);
734     }
735 }
736
737 /* Converts all the temporary transformations, if any, to permanent
738    transformations.  Further transformations will be permanent.
739
740    The FILTER command is implemented as a temporary transformation, so a
741    procedure that uses this function should usually use proc_open_filtering()
742    with FILTER false, instead of plain proc_open().
743
744    Returns true if anything changed, false otherwise. */
745 bool
746 proc_make_temporary_transformations_permanent (struct dataset *ds)
747 {
748   if (proc_in_temporary_transformations (ds))
749     {
750       cancel_measurement_level_trns (&ds->permanent_trns_chain);
751       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
752
753       ds->temporary = false;
754
755       dict_unref (ds->permanent_dict);
756       ds->permanent_dict = NULL;
757
758       return true;
759     }
760   else
761     return false;
762 }
763
764 /* Cancels all temporary transformations, if any.  Further
765    transformations will be permanent.
766    Returns true if anything changed, false otherwise. */
767 bool
768 proc_cancel_temporary_transformations (struct dataset *ds)
769 {
770   if (proc_in_temporary_transformations (ds))
771     {
772       trns_chain_clear (&ds->temporary_trns_chain);
773
774       dict_unref (ds->dict);
775       ds->dict = ds->permanent_dict;
776       ds->permanent_dict = NULL;
777
778       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
779       return true;
780     }
781   else
782     return false;
783 }
784
785 /* Cancels all transformations, if any.
786    Returns true if successful, false on I/O error. */
787 bool
788 proc_cancel_all_transformations (struct dataset *ds)
789 {
790   bool ok;
791   assert (ds->proc_state == PROC_COMMITTED);
792   ok = trns_chain_clear (&ds->permanent_trns_chain);
793   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
794   ds->temporary = false;
795   for (size_t i = 0; i < ds->n_stack; i++)
796     ok = trns_chain_uninit (&ds->stack[i]) && ok;
797   ds->n_stack = 0;
798   dataset_transformations_changed__ (ds, false);
799
800   return ok;
801 }
802
803 void
804 proc_push_transformations (struct dataset *ds)
805 {
806   if (ds->n_stack >= ds->allocated_stack)
807     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
808                             sizeof *ds->stack);
809   trns_chain_init (&ds->stack[ds->n_stack++]);
810 }
811
812 void
813 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
814 {
815   assert (ds->n_stack > 0);
816   *chain = ds->stack[--ds->n_stack];
817 }
818
819 static enum trns_result
820 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
821 {
822   struct variable *var = var_;
823
824   *cc = case_unshare (*cc);
825   *case_num_rw (*cc, var) = case_num;
826
827   return TRNS_CONTINUE;
828 }
829
830 /* Add a variable which we can sort by to get back the original order. */
831 struct variable *
832 add_permanent_ordering_transformation (struct dataset *ds)
833 {
834   struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
835   struct variable *order_var
836     = (proc_in_temporary_transformations (ds)
837        ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
838        : temp_var);
839
840   static const struct trns_class trns_class = {
841     .name = "ordering",
842     .execute = store_case_num
843   };
844   const struct transformation t = { .class = &trns_class, .aux = order_var };
845   trns_chain_append (&ds->permanent_trns_chain, &t);
846
847   return temp_var;
848 }
849 \f
850 /* Causes output from the next procedure to be discarded, instead
851    of being preserved for use as input for the next procedure. */
852 void
853 proc_discard_output (struct dataset *ds)
854 {
855   ds->discard_output = true;
856 }
857
858
859 /* Checks whether DS has a corrupted active dataset.  If so,
860    discards it and returns false.  If not, returns true without
861    doing anything. */
862 bool
863 dataset_end_of_command (struct dataset *ds)
864 {
865   if (ds->source != NULL)
866     {
867       if (casereader_error (ds->source))
868         {
869           dataset_clear (ds);
870           return false;
871         }
872       else
873         {
874           const struct taint *taint = casereader_get_taint (ds->source);
875           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
876           assert (!taint_has_tainted_successor (taint));
877         }
878     }
879   return true;
880 }
881 \f
882 /* Limits the maximum number of cases processed to
883    *CASES_REMAINING. */
884 static enum trns_result
885 case_limit_trns_proc (void *cases_remaining_,
886                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
887 {
888   size_t *cases_remaining = cases_remaining_;
889   if (*cases_remaining > 0)
890     {
891       (*cases_remaining)--;
892       return TRNS_CONTINUE;
893     }
894   else
895     return TRNS_DROP_CASE;
896 }
897
898 /* Frees the data associated with a case limit transformation. */
899 static bool
900 case_limit_trns_free (void *cases_remaining_)
901 {
902   size_t *cases_remaining = cases_remaining_;
903   free (cases_remaining);
904   return true;
905 }
906
907 /* Adds a transformation that limits the number of cases that may
908    pass through, if DS->DICT has a case limit. */
909 static void
910 add_case_limit_trns (struct dataset *ds)
911 {
912   casenumber case_limit = dict_get_case_limit (ds->dict);
913   if (case_limit != 0)
914     {
915       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
916       *cases_remaining = case_limit;
917
918       static const struct trns_class trns_class = {
919         .name = "case limit",
920         .execute = case_limit_trns_proc,
921         .destroy = case_limit_trns_free,
922       };
923       add_transformation (ds, &trns_class, cases_remaining);
924
925       dict_set_case_limit (ds->dict, 0);
926     }
927 }
928
929 \f
930 /* FILTER transformation. */
931 static enum trns_result
932 filter_trns_proc (void *filter_var_,
933                   struct ccase **c, casenumber case_nr UNUSED)
934
935 {
936   struct variable *filter_var = filter_var_;
937   double f = case_num (*c, filter_var);
938   return (f != 0.0 && !var_is_num_missing (filter_var, f)
939           ? TRNS_CONTINUE : TRNS_DROP_CASE);
940 }
941
942 /* Adds a temporary transformation to filter data according to
943    the variable specified on FILTER, if any. */
944 static void
945 add_filter_trns (struct dataset *ds)
946 {
947   struct variable *filter_var = dict_get_filter (ds->dict);
948   if (filter_var != NULL)
949     {
950       proc_start_temporary_transformations (ds);
951
952       static const struct trns_class trns_class = {
953         .name = "FILTER",
954         .execute = filter_trns_proc,
955       };
956       add_transformation (ds, &trns_class, filter_var);
957     }
958 }
959
960 void
961 dataset_need_lag (struct dataset *ds, int n_before)
962 {
963   ds->n_lag = MAX (ds->n_lag, n_before);
964 }
965 \f
966 /* Measurement guesser, for guessing a measurement level from formats and
967    data. */
968
969 struct mg_value
970   {
971     struct hmap_node hmap_node;
972     double value;
973   };
974
975 struct mg_var
976   {
977     struct variable *var;
978     struct hmap *values;
979   };
980
981 static void
982 mg_var_uninit (struct mg_var *mgv)
983 {
984   struct mg_value *mgvalue, *next;
985   HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
986                       mgv->values)
987     {
988       hmap_delete (mgv->values, &mgvalue->hmap_node);
989       free (mgvalue);
990     }
991   hmap_destroy (mgv->values);
992   free (mgv->values);
993 }
994
995 static enum measure
996 mg_var_interpret (const struct mg_var *mgv)
997 {
998   size_t n = hmap_count (mgv->values);
999   if (!n)
1000     {
1001       /* All missing (or no data). */
1002       return MEASURE_NOMINAL;
1003     }
1004
1005   const struct mg_value *mgvalue;
1006   HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1007                  mgv->values)
1008     if (mgvalue->value < 10)
1009       return MEASURE_NOMINAL;
1010   return MEASURE_SCALE;
1011 }
1012
1013 static enum measure
1014 mg_var_add_value (struct mg_var *mgv, double value)
1015 {
1016   if (var_is_num_missing (mgv->var, value))
1017     return MEASURE_UNKNOWN;
1018   else if (value < 0 || value != floor (value))
1019     return MEASURE_SCALE;
1020
1021   size_t hash = hash_double (value, 0);
1022   struct mg_value *mgvalue;
1023   HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1024                            hash, mgv->values)
1025     if (mgvalue->value == value)
1026       return MEASURE_UNKNOWN;
1027
1028   mgvalue = xmalloc (sizeof *mgvalue);
1029   mgvalue->value = value;
1030   hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1031   if (hmap_count (mgv->values) >= settings_get_scalemin ())
1032     return MEASURE_SCALE;
1033
1034   return MEASURE_UNKNOWN;
1035 }
1036
1037 struct measure_guesser
1038   {
1039     struct mg_var *vars;
1040     size_t n_vars;
1041   };
1042
1043 static struct measure_guesser *
1044 measure_guesser_create__ (struct dictionary *dict)
1045 {
1046   struct mg_var *mgvs = NULL;
1047   size_t n_mgvs = 0;
1048   size_t allocated_mgvs = 0;
1049
1050   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1051     {
1052       struct variable *var = dict_get_var (dict, i);
1053       if (var_get_measure (var) != MEASURE_UNKNOWN)
1054         continue;
1055
1056       const struct fmt_spec *f = var_get_print_format (var);
1057       enum measure m = var_default_measure_for_format (f->type);
1058       if (m != MEASURE_UNKNOWN)
1059         {
1060           var_set_measure (var, m);
1061           continue;
1062         }
1063
1064       if (n_mgvs >= allocated_mgvs)
1065         mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1066
1067       struct mg_var *mgv = &mgvs[n_mgvs++];
1068       *mgv = (struct mg_var) {
1069         .var = var,
1070         .values = xmalloc (sizeof *mgv->values),
1071       };
1072       hmap_init (mgv->values);
1073     }
1074   if (!n_mgvs)
1075     return NULL;
1076
1077   struct measure_guesser *mg = xmalloc (sizeof *mg);
1078   *mg = (struct measure_guesser) {
1079     .vars = mgvs,
1080     .n_vars = n_mgvs,
1081   };
1082   return mg;
1083 }
1084
1085 /* Scans through DS's dictionary for variables that have an unknown measurement
1086    level.  For those, if the measurement level can be guessed based on the
1087    variable's type and format, sets a default.  If that's enough, returns NULL.
1088    If any remain whose levels are unknown and can't be guessed that way,
1089    creates and returns a structure that the caller should pass to
1090    measure_guesser_add_case() or measure_guesser_run() for guessing a
1091    measurement level based on the data.  */
1092 struct measure_guesser *
1093 measure_guesser_create (struct dataset *ds)
1094 {
1095   return measure_guesser_create__ (dataset_dict (ds));
1096 }
1097
1098 /* Adds data from case C to MG. */
1099 static void
1100 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1101 {
1102   for (size_t i = 0; i < mg->n_vars; )
1103     {
1104       struct mg_var *mgv = &mg->vars[i];
1105       double value = case_num (c, mgv->var);
1106       enum measure m = mg_var_add_value (mgv, value);
1107       if (m != MEASURE_UNKNOWN)
1108         {
1109           var_set_measure (mgv->var, m);
1110
1111           mg_var_uninit (mgv);
1112           *mgv = mg->vars[--mg->n_vars];
1113         }
1114       else
1115         i++;
1116     }
1117 }
1118
1119 /* Destroys MG. */
1120 void
1121 measure_guesser_destroy (struct measure_guesser *mg)
1122 {
1123   if (!mg)
1124     return;
1125
1126   for (size_t i = 0; i < mg->n_vars; i++)
1127     {
1128       struct mg_var *mgv = &mg->vars[i];
1129       var_set_measure (mgv->var, mg_var_interpret (mgv));
1130       mg_var_uninit (mgv);
1131     }
1132   free (mg->vars);
1133   free (mg);
1134 }
1135
1136 /* Adds final measurement levels based on MG, after all the cases have been
1137    added. */
1138 static void
1139 measure_guesser_commit (struct measure_guesser *mg)
1140 {
1141   for (size_t i = 0; i < mg->n_vars; i++)
1142     {
1143       struct mg_var *mgv = &mg->vars[i];
1144       var_set_measure (mgv->var, mg_var_interpret (mgv));
1145     }
1146 }
1147
1148 /* Passes the cases in READER through MG and uses the data in the cases to set
1149    measurement levels for the variables where they were still unknown. */
1150 void
1151 measure_guesser_run (struct measure_guesser *mg,
1152                      const struct casereader *reader)
1153 {
1154   struct casereader *r = casereader_clone (reader);
1155   while (mg->n_vars > 0)
1156     {
1157       struct ccase *c = casereader_read (r);
1158       if (!c)
1159         break;
1160       measure_guesser_add_case (mg, c);
1161       case_unref (c);
1162     }
1163   casereader_destroy (r);
1164
1165   measure_guesser_commit (mg);
1166 }
1167 \f
1168 /* A transformation for guessing measurement levels. */
1169
1170 static enum trns_result
1171 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1172 {
1173   struct measure_guesser *mg = mg_;
1174   measure_guesser_add_case (mg, *c);
1175   return TRNS_CONTINUE;
1176 }
1177
1178 static bool
1179 mg_trns_free (void *mg_)
1180 {
1181   struct measure_guesser *mg = mg_;
1182   measure_guesser_commit (mg);
1183   measure_guesser_destroy (mg);
1184   return true;
1185 }
1186
1187 static const struct trns_class mg_trns_class = {
1188   .name = "add measurement level",
1189   .execute = mg_trns_proc,
1190   .destroy = mg_trns_free,
1191 };
1192
1193 static void
1194 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1195 {
1196   struct measure_guesser *mg = measure_guesser_create__ (dict);
1197   if (mg)
1198     add_transformation (ds, &mg_trns_class, mg);
1199 }
1200
1201 static void
1202 cancel_measurement_level_trns (struct trns_chain *chain)
1203 {
1204   if (!chain->n)
1205     return;
1206
1207   struct transformation *trns = &chain->xforms[chain->n - 1];
1208   if (trns->class != &mg_trns_class)
1209     return;
1210
1211   struct measure_guesser *mg = trns->aux;
1212   measure_guesser_destroy (mg);
1213   chain->n--;
1214 }
1215 \f
1216 static void
1217 dataset_changed__ (struct dataset *ds)
1218 {
1219   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1220     ds->callbacks->changed (ds->cb_data);
1221 }
1222
1223 static void
1224 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1225 {
1226   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1227     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1228 }
1229 \f
1230 /* Private interface for use by session code. */
1231
1232 void
1233 dataset_set_session__ (struct dataset *ds, struct session *session)
1234 {
1235   ds->session = session;
1236 }