case-map: Make creating a case_map destroy the stage.
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct variable *order_var;
72   struct casewriter *sink;
73   struct trns_chain temporary_trns_chain;
74   bool temporary;
75   struct dictionary *dict;
76
77   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
78   struct trns_chain *stack;
79   size_t n_stack;
80   size_t allocated_stack;
81
82   /* If true, cases are discarded instead of being written to
83      sink. */
84   bool discard_output;
85
86   /* Time at which proc was last invoked. */
87   time_t last_proc_invocation;
88
89   /* Cases just before ("lagging") the current one. */
90   int n_lag;                    /* Number of cases to lag. */
91   struct deque lag;             /* Deque of lagged cases. */
92   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
93
94   /* Procedure data. */
95   enum
96     {
97       PROC_COMMITTED,           /* No procedure in progress. */
98       PROC_OPEN,                /* proc_open called, casereader still open. */
99       PROC_CLOSED               /* casereader from proc_open destroyed,
100                                    but proc_commit not yet called. */
101     }
102   proc_state;
103   casenumber cases_written;     /* Cases output so far. */
104   bool ok;                      /* Error status. */
105   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
106
107   const struct dataset_callbacks *callbacks;
108   void *cb_data;
109
110   /* Uniquely distinguishes datasets. */
111   unsigned int seqno;
112 };
113
114 static void dataset_changed__ (struct dataset *);
115 static void dataset_transformations_changed__ (struct dataset *,
116                                                bool non_empty);
117
118 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
119 static void cancel_measurement_level_trns (struct trns_chain *);
120 static void add_case_limit_trns (struct dataset *ds);
121 static void add_filter_trns (struct dataset *ds);
122
123 static void update_last_proc_invocation (struct dataset *ds);
124
125 static void
126 dict_callback (struct dictionary *d UNUSED, void *ds_)
127 {
128   struct dataset *ds = ds_;
129   dataset_changed__ (ds);
130 }
131 \f
132 static void
133 dataset_create_finish__ (struct dataset *ds, struct session *session)
134 {
135   static unsigned int seqno;
136
137   dict_set_change_callback (ds->dict, dict_callback, ds);
138   proc_cancel_all_transformations (ds);
139   dataset_set_session (ds, session);
140   ds->seqno = ++seqno;
141 }
142
143 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
144    SESSION already contains a dataset named NAME, it is deleted and replaced.
145    The dataset initially has an empty dictionary and no data source. */
146 struct dataset *
147 dataset_create (struct session *session, const char *name)
148 {
149   struct dataset *ds = XMALLOC (struct dataset);
150   *ds = (struct dataset) {
151     .name = xstrdup (name),
152     .display = DATASET_FRONT,
153     .dict = dict_create (get_default_encoding ()),
154     .caseinit = caseinit_create (),
155   };
156   dataset_create_finish__ (ds, session);
157
158   return ds;
159 }
160
161 /* Creates and returns a new dataset that has the same data and dictionary as
162    OLD named NAME, adds it to the same session as OLD, and returns the new
163    dataset.  If SESSION already contains a dataset named NAME, it is deleted
164    and replaced.
165
166    OLD must not have any active transformations or temporary state and must
167    not be in the middle of a procedure.
168
169    Callbacks are not cloned. */
170 struct dataset *
171 dataset_clone (struct dataset *old, const char *name)
172 {
173   struct dataset *new;
174
175   assert (old->proc_state == PROC_COMMITTED);
176   assert (!old->permanent_trns_chain.n);
177   assert (old->permanent_dict == NULL);
178   assert (old->sink == NULL);
179   assert (!old->temporary);
180   assert (!old->temporary_trns_chain.n);
181   assert (!old->n_stack);
182
183   new = xzalloc (sizeof *new);
184   new->name = xstrdup (name);
185   new->display = DATASET_FRONT;
186   new->source = casereader_clone (old->source);
187   new->dict = dict_clone (old->dict);
188   new->caseinit = caseinit_clone (old->caseinit);
189   new->last_proc_invocation = old->last_proc_invocation;
190   new->ok = old->ok;
191
192   dataset_create_finish__ (new, old->session);
193
194   return new;
195 }
196
197 /* Destroys DS. */
198 void
199 dataset_destroy (struct dataset *ds)
200 {
201   if (ds != NULL)
202     {
203       dataset_set_session (ds, NULL);
204       dataset_clear (ds);
205       dict_unref (ds->dict);
206       dict_unref (ds->permanent_dict);
207       caseinit_destroy (ds->caseinit);
208       trns_chain_uninit (&ds->permanent_trns_chain);
209       for (size_t i = 0; i < ds->n_stack; i++)
210         trns_chain_uninit (&ds->stack[i]);
211       free (ds->stack);
212       dataset_transformations_changed__ (ds, false);
213       free (ds->name);
214       free (ds);
215     }
216 }
217
218 /* Discards the active dataset's dictionary, data, and transformations. */
219 void
220 dataset_clear (struct dataset *ds)
221 {
222   assert (ds->proc_state == PROC_COMMITTED);
223
224   dict_clear (ds->dict);
225   fh_set_default_handle (NULL);
226
227   ds->n_lag = 0;
228
229   casereader_destroy (ds->source);
230   ds->source = NULL;
231
232   proc_cancel_all_transformations (ds);
233 }
234
235 const char *
236 dataset_name (const struct dataset *ds)
237 {
238   return ds->name;
239 }
240
241 void
242 dataset_set_name (struct dataset *ds, const char *name)
243 {
244   struct session *session = ds->session;
245   bool active = false;
246
247   if (session != NULL)
248     {
249       active = session_active_dataset (session) == ds;
250       if (active)
251         session_set_active_dataset (session, NULL);
252       dataset_set_session (ds, NULL);
253     }
254
255   free (ds->name);
256   ds->name = xstrdup (name);
257
258   if (session != NULL)
259     {
260       dataset_set_session (ds, session);
261       if (active)
262         session_set_active_dataset (session, ds);
263     }
264 }
265
266 struct session *
267 dataset_session (const struct dataset *ds)
268 {
269   return ds->session;
270 }
271
272 void
273 dataset_set_session (struct dataset *ds, struct session *session)
274 {
275   if (session != ds->session)
276     {
277       if (ds->session != NULL)
278         session_remove_dataset (ds->session, ds);
279       if (session != NULL)
280         session_add_dataset (session, ds);
281     }
282 }
283
284 /* Returns the dictionary within DS.  This is always nonnull, although it
285    might not contain any variables. */
286 struct dictionary *
287 dataset_dict (const struct dataset *ds)
288 {
289   return ds->dict;
290 }
291
292 /* Replaces DS's dictionary by DICT, discarding any source and
293    transformations. */
294 void
295 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
296 {
297   assert (ds->proc_state == PROC_COMMITTED);
298   assert (ds->dict != dict);
299
300   dataset_clear (ds);
301
302   dict_unref (ds->dict);
303   ds->dict = dict;
304   dict_set_change_callback (ds->dict, dict_callback, ds);
305 }
306
307 /* Returns the casereader that will be read when a procedure is executed on
308    DS.  This can be NULL if none has been set up yet. */
309 const struct casereader *
310 dataset_source (const struct dataset *ds)
311 {
312   return ds->source;
313 }
314
315 /* Returns true if DS has a data source, false otherwise. */
316 bool
317 dataset_has_source (const struct dataset *ds)
318 {
319   return dataset_source (ds) != NULL;
320 }
321
322 /* Replaces the active dataset's data by READER.  READER's cases must have an
323    appropriate format for DS's dictionary. */
324 bool
325 dataset_set_source (struct dataset *ds, struct casereader *reader)
326 {
327   casereader_destroy (ds->source);
328   ds->source = reader;
329
330   caseinit_clear (ds->caseinit);
331   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
332
333   return reader == NULL || !casereader_error (reader);
334 }
335
336 /* Returns the data source from DS and removes it from DS.  Returns a null
337    pointer if DS has no data source. */
338 struct casereader *
339 dataset_steal_source (struct dataset *ds)
340 {
341   struct casereader *reader = ds->source;
342   ds->source = NULL;
343
344   return reader;
345 }
346
347 void
348 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
349 {
350   assert (!proc_in_temporary_transformations (ds));
351   assert (!proc_has_transformations (ds));
352   assert (n < dict_get_n_vars (ds->dict));
353
354   caseinit_mark_for_init (ds->caseinit, ds->dict);
355   ds->source = caseinit_translate_casereader_to_init_vars (
356     ds->caseinit, dict_get_proto (ds->dict), ds->source);
357   caseinit_clear (ds->caseinit);
358   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
359
360   struct case_map_stage *stage = case_map_stage_create (ds->dict);
361   dict_delete_vars (ds->dict, vars, n);
362   ds->source = case_map_create_input_translator (
363     case_map_stage_to_case_map (stage), ds->source);
364   caseinit_clear (ds->caseinit);
365   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
366 }
367
368 void
369 dataset_reorder_vars (struct dataset *ds, struct variable **vars, size_t n)
370 {
371   assert (!proc_in_temporary_transformations (ds));
372   assert (!proc_has_transformations (ds));
373   assert (n <= dict_get_n_vars (ds->dict));
374
375   caseinit_mark_for_init (ds->caseinit, ds->dict);
376   ds->source = caseinit_translate_casereader_to_init_vars (
377     ds->caseinit, dict_get_proto (ds->dict), ds->source);
378   caseinit_clear (ds->caseinit);
379   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
380
381   struct case_map_stage *stage = case_map_stage_create (ds->dict);
382   dict_reorder_vars (ds->dict, vars, n);
383   ds->source = case_map_create_input_translator (
384     case_map_stage_to_case_map (stage), ds->source);
385   caseinit_clear (ds->caseinit);
386   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
387 }
388
389 /* Returns a number unique to DS.  It can be used to distinguish one dataset
390    from any other within a given program run, even datasets that do not exist
391    at the same time. */
392 unsigned int
393 dataset_seqno (const struct dataset *ds)
394 {
395   return ds->seqno;
396 }
397
398 void
399 dataset_set_callbacks (struct dataset *ds,
400                        const struct dataset_callbacks *callbacks,
401                        void *cb_data)
402 {
403   ds->callbacks = callbacks;
404   ds->cb_data = cb_data;
405 }
406
407 enum dataset_display
408 dataset_get_display (const struct dataset *ds)
409 {
410   return ds->display;
411 }
412
413 void
414 dataset_set_display (struct dataset *ds, enum dataset_display display)
415 {
416   ds->display = display;
417 }
418 \f
419 /* Returns the last time the data was read. */
420 time_t
421 time_of_last_procedure (struct dataset *ds)
422 {
423   if (!ds)
424     return time (NULL);
425   if (ds->last_proc_invocation == 0)
426     update_last_proc_invocation (ds);
427   return ds->last_proc_invocation;
428 }
429 \f
430 /* Regular procedure. */
431
432 /* Executes any pending transformations, if necessary.
433    This is not identical to the EXECUTE command in that it won't
434    always read the source data.  This can be important when the
435    source data is given inline within BEGIN DATA...END FILE. */
436 bool
437 proc_execute (struct dataset *ds)
438 {
439   bool ok;
440
441   if ((!ds->temporary || !ds->temporary_trns_chain.n)
442       && !ds->permanent_trns_chain.n)
443     {
444       ds->n_lag = 0;
445       ds->discard_output = false;
446       dict_set_case_limit (ds->dict, 0);
447       dict_clear_vectors (ds->dict);
448       return true;
449     }
450
451   ok = casereader_destroy (proc_open (ds));
452   return proc_commit (ds) && ok;
453 }
454
455 static const struct casereader_class proc_casereader_class;
456
457 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
458    cases filtered out with FILTER BY will not be included in the casereader
459    (which is usually desirable).  If FILTER is false, all cases will be
460    included regardless of FILTER BY settings.
461
462    proc_commit must be called when done. */
463 struct casereader *
464 proc_open_filtering (struct dataset *ds, bool filter)
465 {
466   struct casereader *reader;
467
468   assert (ds->n_stack == 0);
469   assert (ds->source != NULL);
470   assert (ds->proc_state == PROC_COMMITTED);
471
472   update_last_proc_invocation (ds);
473
474   caseinit_mark_for_init (ds->caseinit, ds->dict);
475   ds->source = caseinit_translate_casereader_to_init_vars (
476     ds->caseinit, dict_get_proto (ds->dict), ds->source);
477
478   /* Finish up the collection of transformations. */
479   add_case_limit_trns (ds);
480   if (filter)
481     add_filter_trns (ds);
482   if (!proc_in_temporary_transformations (ds))
483     add_measurement_level_trns (ds, ds->dict);
484
485   /* Make permanent_dict refer to the dictionary right before
486      data reaches the sink. */
487   if (ds->permanent_dict == NULL)
488     ds->permanent_dict = ds->dict;
489
490   /* Prepare sink. */
491   if (!ds->discard_output)
492     {
493       struct dictionary *pd = dict_clone (ds->permanent_dict);
494       struct case_map_stage *stage = case_map_stage_create (pd);
495       dict_delete_scratch_vars (pd);
496       ds->sink = case_map_create_output_translator (
497         case_map_stage_to_case_map (stage),
498         autopaging_writer_create (dict_get_proto (pd)));
499       dict_unref (pd);
500     }
501   else
502     ds->sink = NULL;
503
504   /* Allocate memory for lagged cases. */
505   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
506
507   ds->proc_state = PROC_OPEN;
508   ds->cases_written = 0;
509   ds->ok = true;
510
511   /* FIXME: use taint in dataset in place of `ok'? */
512   /* FIXME: for trivial cases we can just return a clone of
513      ds->source? */
514
515   /* Create casereader and insert a shim on top.  The shim allows us to
516      arbitrarily extend the casereader's lifetime, by slurping the cases into
517      the shim's buffer in proc_commit().  That is especially useful when output
518      table_items are generated directly from the procedure casereader (e.g. by
519      the LIST procedure) when we are using an output driver that keeps a
520      reference to the output items passed to it (e.g. the GUI output driver in
521      PSPPIRE). */
522   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
523                                          CASENUMBER_MAX,
524                                          &proc_casereader_class, ds);
525   ds->shim = casereader_shim_insert (reader);
526   return reader;
527 }
528
529 /* Opens dataset DS for reading cases with proc_read.
530    proc_commit must be called when done. */
531 struct casereader *
532 proc_open (struct dataset *ds)
533 {
534   return proc_open_filtering (ds, true);
535 }
536
537 /* Returns true if a procedure is in progress, that is, if
538    proc_open has been called but proc_commit has not. */
539 bool
540 proc_is_open (const struct dataset *ds)
541 {
542   return ds->proc_state != PROC_COMMITTED;
543 }
544
545 /* "read" function for procedure casereader. */
546 static struct ccase *
547 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
548 {
549   struct dataset *ds = ds_;
550   enum trns_result retval = TRNS_DROP_CASE;
551   struct ccase *c;
552
553   assert (ds->proc_state == PROC_OPEN);
554   for (; ; case_unref (c))
555     {
556       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
557       if (retval == TRNS_ERROR)
558         ds->ok = false;
559       if (!ds->ok)
560         return NULL;
561
562       /* Read a case from source. */
563       c = casereader_read (ds->source);
564       if (c == NULL)
565         return NULL;
566       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
567       caseinit_restore_left_vars (ds->caseinit, c);
568
569       /* Execute permanent transformations.  */
570       casenumber case_nr = ds->cases_written + 1;
571       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
572       caseinit_save_left_vars (ds->caseinit, c);
573       if (retval != TRNS_CONTINUE)
574         continue;
575
576       /* Write case to collection of lagged cases. */
577       if (ds->n_lag > 0)
578         {
579           while (deque_count (&ds->lag) >= ds->n_lag)
580             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
581           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
582         }
583
584       /* Write case to replacement dataset. */
585       ds->cases_written++;
586       if (ds->sink != NULL)
587         {
588           if (ds->order_var)
589             *case_num_rw (c, ds->order_var) = case_nr;
590           casewriter_write (ds->sink, case_ref (c));
591         }
592
593       /* Execute temporary transformations. */
594       if (ds->temporary_trns_chain.n)
595         {
596           retval = trns_chain_execute (&ds->temporary_trns_chain,
597                                        ds->cases_written, &c);
598           if (retval != TRNS_CONTINUE)
599             continue;
600         }
601
602       return c;
603     }
604 }
605
606 /* "destroy" function for procedure casereader. */
607 static void
608 proc_casereader_destroy (struct casereader *reader, void *ds_)
609 {
610   struct dataset *ds = ds_;
611   struct ccase *c;
612
613   /* We are always the subreader for a casereader_buffer, so if we're being
614      destroyed then it's because the casereader_buffer has read all the cases
615      that it ever will. */
616   ds->shim = NULL;
617
618   /* Make sure transformations happen for every input case, in
619      case they have side effects, and ensure that the replacement
620      active dataset gets all the cases it should. */
621   while ((c = casereader_read (reader)) != NULL)
622     case_unref (c);
623
624   ds->proc_state = PROC_CLOSED;
625   ds->ok = casereader_destroy (ds->source) && ds->ok;
626   ds->source = NULL;
627   dataset_set_source (ds, NULL);
628 }
629
630 /* Must return false if the source casereader, a transformation,
631    or the sink casewriter signaled an error.  (If a temporary
632    transformation signals an error, then the return value is
633    false, but the replacement active dataset may still be
634    untainted.) */
635 bool
636 proc_commit (struct dataset *ds)
637 {
638   if (ds->shim != NULL)
639     casereader_shim_slurp (ds->shim);
640
641   assert (ds->proc_state == PROC_CLOSED);
642   ds->proc_state = PROC_COMMITTED;
643
644   dataset_changed__ (ds);
645
646   /* Free memory for lagged cases. */
647   while (!deque_is_empty (&ds->lag))
648     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
649   free (ds->lag_cases);
650
651   /* Dictionary from before TEMPORARY becomes permanent. */
652   proc_cancel_temporary_transformations (ds);
653   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
654
655   if (!ds->discard_output)
656     {
657       dict_delete_scratch_vars (ds->dict);
658
659       /* Old data sink becomes new data source. */
660       if (ds->sink != NULL)
661         ds->source = casewriter_make_reader (ds->sink);
662     }
663   else
664     {
665       ds->source = NULL;
666       ds->discard_output = false;
667     }
668   ds->sink = NULL;
669
670   caseinit_clear (ds->caseinit);
671   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
672
673   dict_clear_vectors (ds->dict);
674   ds->permanent_dict = NULL;
675   ds->order_var = NULL;
676   return ok;
677 }
678
679 /* Casereader class for procedure execution. */
680 static const struct casereader_class proc_casereader_class =
681   {
682     proc_casereader_read,
683     proc_casereader_destroy,
684     NULL,
685     NULL,
686   };
687
688 /* Updates last_proc_invocation. */
689 static void
690 update_last_proc_invocation (struct dataset *ds)
691 {
692   ds->last_proc_invocation = time (NULL);
693 }
694 \f
695 /* Returns a pointer to the lagged case from N_BEFORE cases before the
696    current one, or NULL if there haven't been that many cases yet. */
697 const struct ccase *
698 lagged_case (const struct dataset *ds, int n_before)
699 {
700   assert (n_before >= 1);
701   assert (n_before <= ds->n_lag);
702
703   if (n_before <= deque_count (&ds->lag))
704     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
705   else
706     return NULL;
707 }
708 \f
709 /* Adds TRNS to the current set of transformations. */
710 void
711 add_transformation (struct dataset *ds,
712                     const struct trns_class *class, void *aux)
713 {
714   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
715                               : ds->temporary ? &ds->temporary_trns_chain
716                               : &ds->permanent_trns_chain);
717   struct transformation t = { .class = class, .aux = aux };
718   trns_chain_append (chain, &t);
719   dataset_transformations_changed__ (ds, true);
720 }
721
722 /* Returns true if the next call to add_transformation() will add
723    a temporary transformation, false if it will add a permanent
724    transformation. */
725 bool
726 proc_in_temporary_transformations (const struct dataset *ds)
727 {
728   return ds->temporary;
729 }
730
731 /* Marks the start of temporary transformations.
732    Further calls to add_transformation() will add temporary
733    transformations. */
734 void
735 proc_start_temporary_transformations (struct dataset *ds)
736 {
737   assert (!ds->n_stack);
738   if (!proc_in_temporary_transformations (ds))
739     {
740       add_case_limit_trns (ds);
741
742       ds->permanent_dict = dict_clone (ds->dict);
743       add_measurement_level_trns (ds, ds->permanent_dict);
744
745       ds->temporary = true;
746       dataset_transformations_changed__ (ds, true);
747     }
748 }
749
750 /* Converts all the temporary transformations, if any, to permanent
751    transformations.  Further transformations will be permanent.
752
753    The FILTER command is implemented as a temporary transformation, so a
754    procedure that uses this function should usually use proc_open_filtering()
755    with FILTER false, instead of plain proc_open().
756
757    Returns true if anything changed, false otherwise. */
758 bool
759 proc_make_temporary_transformations_permanent (struct dataset *ds)
760 {
761   if (proc_in_temporary_transformations (ds))
762     {
763       cancel_measurement_level_trns (&ds->permanent_trns_chain);
764       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
765
766       ds->temporary = false;
767
768       dict_unref (ds->permanent_dict);
769       ds->permanent_dict = NULL;
770
771       return true;
772     }
773   else
774     return false;
775 }
776
777 /* Cancels all temporary transformations, if any.  Further
778    transformations will be permanent.
779    Returns true if anything changed, false otherwise. */
780 bool
781 proc_cancel_temporary_transformations (struct dataset *ds)
782 {
783   if (proc_in_temporary_transformations (ds))
784     {
785       trns_chain_clear (&ds->temporary_trns_chain);
786
787       dict_unref (ds->dict);
788       ds->dict = ds->permanent_dict;
789       ds->permanent_dict = NULL;
790
791       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
792       return true;
793     }
794   else
795     return false;
796 }
797
798 /* Cancels all transformations, if any.
799    Returns true if successful, false on I/O error. */
800 bool
801 proc_cancel_all_transformations (struct dataset *ds)
802 {
803   bool ok;
804   assert (ds->proc_state == PROC_COMMITTED);
805   ok = trns_chain_clear (&ds->permanent_trns_chain);
806   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
807   ds->temporary = false;
808   for (size_t i = 0; i < ds->n_stack; i++)
809     ok = trns_chain_uninit (&ds->stack[i]) && ok;
810   ds->n_stack = 0;
811   dataset_transformations_changed__ (ds, false);
812
813   return ok;
814 }
815
816 void
817 proc_push_transformations (struct dataset *ds)
818 {
819   if (ds->n_stack >= ds->allocated_stack)
820     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
821                             sizeof *ds->stack);
822   trns_chain_init (&ds->stack[ds->n_stack++]);
823 }
824
825 void
826 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
827 {
828   assert (ds->n_stack > 0);
829   *chain = ds->stack[--ds->n_stack];
830 }
831
832 bool
833 proc_has_transformations (const struct dataset *ds)
834 {
835   return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
836 }
837
838 static enum trns_result
839 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
840 {
841   struct variable *var = var_;
842
843   *cc = case_unshare (*cc);
844   *case_num_rw (*cc, var) = case_num;
845
846   return TRNS_CONTINUE;
847 }
848
849 /* Add a variable $ORDERING which we can sort by to get back the original order. */
850 struct variable *
851 add_permanent_ordering_transformation (struct dataset *ds)
852 {
853   struct dictionary *d = ds->permanent_dict ? ds->permanent_dict : ds->dict;
854   struct variable *order_var = dict_create_var_assert (d, "$ORDER", 0);
855   ds->order_var = order_var;
856
857   if (ds->permanent_dict)
858     {
859       order_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
860       static const struct trns_class trns_class = {
861         .name = "ordering",
862         .execute = store_case_num
863       };
864       const struct transformation t = { .class = &trns_class, .aux = order_var };
865       trns_chain_prepend (&ds->temporary_trns_chain, &t);
866     }
867
868   return order_var;
869 }
870 \f
871 /* Causes output from the next procedure to be discarded, instead
872    of being preserved for use as input for the next procedure. */
873 void
874 proc_discard_output (struct dataset *ds)
875 {
876   ds->discard_output = true;
877 }
878
879
880 /* Checks whether DS has a corrupted active dataset.  If so,
881    discards it and returns false.  If not, returns true without
882    doing anything. */
883 bool
884 dataset_end_of_command (struct dataset *ds)
885 {
886   if (ds->source != NULL)
887     {
888       if (casereader_error (ds->source))
889         {
890           dataset_clear (ds);
891           return false;
892         }
893       else
894         {
895           const struct taint *taint = casereader_get_taint (ds->source);
896           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
897           assert (!taint_has_tainted_successor (taint));
898         }
899     }
900   return true;
901 }
902 \f
903 /* Limits the maximum number of cases processed to
904    *CASES_REMAINING. */
905 static enum trns_result
906 case_limit_trns_proc (void *cases_remaining_,
907                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
908 {
909   size_t *cases_remaining = cases_remaining_;
910   if (*cases_remaining > 0)
911     {
912       (*cases_remaining)--;
913       return TRNS_CONTINUE;
914     }
915   else
916     return TRNS_DROP_CASE;
917 }
918
919 /* Frees the data associated with a case limit transformation. */
920 static bool
921 case_limit_trns_free (void *cases_remaining_)
922 {
923   size_t *cases_remaining = cases_remaining_;
924   free (cases_remaining);
925   return true;
926 }
927
928 /* Adds a transformation that limits the number of cases that may
929    pass through, if DS->DICT has a case limit. */
930 static void
931 add_case_limit_trns (struct dataset *ds)
932 {
933   casenumber case_limit = dict_get_case_limit (ds->dict);
934   if (case_limit != 0)
935     {
936       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
937       *cases_remaining = case_limit;
938
939       static const struct trns_class trns_class = {
940         .name = "case limit",
941         .execute = case_limit_trns_proc,
942         .destroy = case_limit_trns_free,
943       };
944       add_transformation (ds, &trns_class, cases_remaining);
945
946       dict_set_case_limit (ds->dict, 0);
947     }
948 }
949
950 \f
951 /* FILTER transformation. */
952 static enum trns_result
953 filter_trns_proc (void *filter_var_,
954                   struct ccase **c, casenumber case_nr UNUSED)
955
956 {
957   struct variable *filter_var = filter_var_;
958   double f = case_num (*c, filter_var);
959   return (f != 0.0 && !var_is_num_missing (filter_var, f)
960           ? TRNS_CONTINUE : TRNS_DROP_CASE);
961 }
962
963 /* Adds a temporary transformation to filter data according to
964    the variable specified on FILTER, if any. */
965 static void
966 add_filter_trns (struct dataset *ds)
967 {
968   struct variable *filter_var = dict_get_filter (ds->dict);
969   if (filter_var != NULL)
970     {
971       proc_start_temporary_transformations (ds);
972
973       static const struct trns_class trns_class = {
974         .name = "FILTER",
975         .execute = filter_trns_proc,
976       };
977       add_transformation (ds, &trns_class, filter_var);
978     }
979 }
980
981 void
982 dataset_need_lag (struct dataset *ds, int n_before)
983 {
984   ds->n_lag = MAX (ds->n_lag, n_before);
985 }
986 \f
987 /* Measurement guesser, for guessing a measurement level from formats and
988    data. */
989
990 struct mg_value
991   {
992     struct hmap_node hmap_node;
993     double value;
994   };
995
996 struct mg_var
997   {
998     struct variable *var;
999     struct hmap *values;
1000   };
1001
1002 static void
1003 mg_var_uninit (struct mg_var *mgv)
1004 {
1005   struct mg_value *mgvalue, *next;
1006   HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
1007                       mgv->values)
1008     {
1009       hmap_delete (mgv->values, &mgvalue->hmap_node);
1010       free (mgvalue);
1011     }
1012   hmap_destroy (mgv->values);
1013   free (mgv->values);
1014 }
1015
1016 static enum measure
1017 mg_var_interpret (const struct mg_var *mgv)
1018 {
1019   size_t n = hmap_count (mgv->values);
1020   if (!n)
1021     {
1022       /* All missing (or no data). */
1023       return MEASURE_NOMINAL;
1024     }
1025
1026   const struct mg_value *mgvalue;
1027   HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1028                  mgv->values)
1029     if (mgvalue->value < 10)
1030       return MEASURE_NOMINAL;
1031   return MEASURE_SCALE;
1032 }
1033
1034 static enum measure
1035 mg_var_add_value (struct mg_var *mgv, double value)
1036 {
1037   if (var_is_num_missing (mgv->var, value))
1038     return MEASURE_UNKNOWN;
1039   else if (value < 0 || value != floor (value))
1040     return MEASURE_SCALE;
1041
1042   size_t hash = hash_double (value, 0);
1043   struct mg_value *mgvalue;
1044   HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1045                            hash, mgv->values)
1046     if (mgvalue->value == value)
1047       return MEASURE_UNKNOWN;
1048
1049   mgvalue = xmalloc (sizeof *mgvalue);
1050   mgvalue->value = value;
1051   hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1052   if (hmap_count (mgv->values) >= settings_get_scalemin ())
1053     return MEASURE_SCALE;
1054
1055   return MEASURE_UNKNOWN;
1056 }
1057
1058 struct measure_guesser
1059   {
1060     struct mg_var *vars;
1061     size_t n_vars;
1062   };
1063
1064 static struct measure_guesser *
1065 measure_guesser_create__ (struct dictionary *dict)
1066 {
1067   struct mg_var *mgvs = NULL;
1068   size_t n_mgvs = 0;
1069   size_t allocated_mgvs = 0;
1070
1071   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1072     {
1073       struct variable *var = dict_get_var (dict, i);
1074       if (var_get_measure (var) != MEASURE_UNKNOWN)
1075         continue;
1076
1077       struct fmt_spec f = var_get_print_format (var);
1078       enum measure m = var_default_measure_for_format (f.type);
1079       if (m != MEASURE_UNKNOWN)
1080         {
1081           var_set_measure (var, m);
1082           continue;
1083         }
1084
1085       if (n_mgvs >= allocated_mgvs)
1086         mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1087
1088       struct mg_var *mgv = &mgvs[n_mgvs++];
1089       *mgv = (struct mg_var) {
1090         .var = var,
1091         .values = xmalloc (sizeof *mgv->values),
1092       };
1093       hmap_init (mgv->values);
1094     }
1095   if (!n_mgvs)
1096     return NULL;
1097
1098   struct measure_guesser *mg = xmalloc (sizeof *mg);
1099   *mg = (struct measure_guesser) {
1100     .vars = mgvs,
1101     .n_vars = n_mgvs,
1102   };
1103   return mg;
1104 }
1105
1106 /* Scans through DS's dictionary for variables that have an unknown measurement
1107    level.  For those, if the measurement level can be guessed based on the
1108    variable's type and format, sets a default.  If that's enough, returns NULL.
1109    If any remain whose levels are unknown and can't be guessed that way,
1110    creates and returns a structure that the caller should pass to
1111    measure_guesser_add_case() or measure_guesser_run() for guessing a
1112    measurement level based on the data.  */
1113 struct measure_guesser *
1114 measure_guesser_create (struct dataset *ds)
1115 {
1116   return measure_guesser_create__ (dataset_dict (ds));
1117 }
1118
1119 /* Adds data from case C to MG. */
1120 static void
1121 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1122 {
1123   for (size_t i = 0; i < mg->n_vars; )
1124     {
1125       struct mg_var *mgv = &mg->vars[i];
1126       double value = case_num (c, mgv->var);
1127       enum measure m = mg_var_add_value (mgv, value);
1128       if (m != MEASURE_UNKNOWN)
1129         {
1130           var_set_measure (mgv->var, m);
1131
1132           mg_var_uninit (mgv);
1133           *mgv = mg->vars[--mg->n_vars];
1134         }
1135       else
1136         i++;
1137     }
1138 }
1139
1140 /* Destroys MG. */
1141 void
1142 measure_guesser_destroy (struct measure_guesser *mg)
1143 {
1144   if (!mg)
1145     return;
1146
1147   for (size_t i = 0; i < mg->n_vars; i++)
1148     {
1149       struct mg_var *mgv = &mg->vars[i];
1150       var_set_measure (mgv->var, mg_var_interpret (mgv));
1151       mg_var_uninit (mgv);
1152     }
1153   free (mg->vars);
1154   free (mg);
1155 }
1156
1157 /* Adds final measurement levels based on MG, after all the cases have been
1158    added. */
1159 static void
1160 measure_guesser_commit (struct measure_guesser *mg)
1161 {
1162   for (size_t i = 0; i < mg->n_vars; i++)
1163     {
1164       struct mg_var *mgv = &mg->vars[i];
1165       var_set_measure (mgv->var, mg_var_interpret (mgv));
1166     }
1167 }
1168
1169 /* Passes the cases in READER through MG and uses the data in the cases to set
1170    measurement levels for the variables where they were still unknown. */
1171 void
1172 measure_guesser_run (struct measure_guesser *mg,
1173                      const struct casereader *reader)
1174 {
1175   struct casereader *r = casereader_clone (reader);
1176   while (mg->n_vars > 0)
1177     {
1178       struct ccase *c = casereader_read (r);
1179       if (!c)
1180         break;
1181       measure_guesser_add_case (mg, c);
1182       case_unref (c);
1183     }
1184   casereader_destroy (r);
1185
1186   measure_guesser_commit (mg);
1187 }
1188 \f
1189 /* A transformation for guessing measurement levels. */
1190
1191 static enum trns_result
1192 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1193 {
1194   struct measure_guesser *mg = mg_;
1195   measure_guesser_add_case (mg, *c);
1196   return TRNS_CONTINUE;
1197 }
1198
1199 static bool
1200 mg_trns_free (void *mg_)
1201 {
1202   struct measure_guesser *mg = mg_;
1203   measure_guesser_commit (mg);
1204   measure_guesser_destroy (mg);
1205   return true;
1206 }
1207
1208 static const struct trns_class mg_trns_class = {
1209   .name = "add measurement level",
1210   .execute = mg_trns_proc,
1211   .destroy = mg_trns_free,
1212 };
1213
1214 static void
1215 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1216 {
1217   struct measure_guesser *mg = measure_guesser_create__ (dict);
1218   if (mg)
1219     add_transformation (ds, &mg_trns_class, mg);
1220 }
1221
1222 static void
1223 cancel_measurement_level_trns (struct trns_chain *chain)
1224 {
1225   if (!chain->n)
1226     return;
1227
1228   struct transformation *trns = &chain->xforms[chain->n - 1];
1229   if (trns->class != &mg_trns_class)
1230     return;
1231
1232   struct measure_guesser *mg = trns->aux;
1233   measure_guesser_destroy (mg);
1234   chain->n--;
1235 }
1236 \f
1237 static void
1238 dataset_changed__ (struct dataset *ds)
1239 {
1240   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1241     ds->callbacks->changed (ds->cb_data);
1242 }
1243
1244 static void
1245 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1246 {
1247   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1248     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1249 }
1250 \f
1251 /* Private interface for use by session code. */
1252
1253 void
1254 dataset_set_session__ (struct dataset *ds, struct session *session)
1255 {
1256   ds->session = session;
1257 }