dataset: Replace 'compactor' by a translating casewriter.
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct variable *order_var;
72   struct casewriter *sink;
73   struct trns_chain temporary_trns_chain;
74   bool temporary;
75   struct dictionary *dict;
76
77   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
78   struct trns_chain *stack;
79   size_t n_stack;
80   size_t allocated_stack;
81
82   /* If true, cases are discarded instead of being written to
83      sink. */
84   bool discard_output;
85
86   /* Time at which proc was last invoked. */
87   time_t last_proc_invocation;
88
89   /* Cases just before ("lagging") the current one. */
90   int n_lag;                    /* Number of cases to lag. */
91   struct deque lag;             /* Deque of lagged cases. */
92   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
93
94   /* Procedure data. */
95   enum
96     {
97       PROC_COMMITTED,           /* No procedure in progress. */
98       PROC_OPEN,                /* proc_open called, casereader still open. */
99       PROC_CLOSED               /* casereader from proc_open destroyed,
100                                    but proc_commit not yet called. */
101     }
102   proc_state;
103   casenumber cases_written;     /* Cases output so far. */
104   bool ok;                      /* Error status. */
105   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
106
107   const struct dataset_callbacks *callbacks;
108   void *cb_data;
109
110   /* Uniquely distinguishes datasets. */
111   unsigned int seqno;
112 };
113
114 static void dataset_changed__ (struct dataset *);
115 static void dataset_transformations_changed__ (struct dataset *,
116                                                bool non_empty);
117
118 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
119 static void cancel_measurement_level_trns (struct trns_chain *);
120 static void add_case_limit_trns (struct dataset *ds);
121 static void add_filter_trns (struct dataset *ds);
122
123 static void update_last_proc_invocation (struct dataset *ds);
124
125 static void
126 dict_callback (struct dictionary *d UNUSED, void *ds_)
127 {
128   struct dataset *ds = ds_;
129   dataset_changed__ (ds);
130 }
131 \f
132 static void
133 dataset_create_finish__ (struct dataset *ds, struct session *session)
134 {
135   static unsigned int seqno;
136
137   dict_set_change_callback (ds->dict, dict_callback, ds);
138   proc_cancel_all_transformations (ds);
139   dataset_set_session (ds, session);
140   ds->seqno = ++seqno;
141 }
142
143 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
144    SESSION already contains a dataset named NAME, it is deleted and replaced.
145    The dataset initially has an empty dictionary and no data source. */
146 struct dataset *
147 dataset_create (struct session *session, const char *name)
148 {
149   struct dataset *ds = XMALLOC (struct dataset);
150   *ds = (struct dataset) {
151     .name = xstrdup (name),
152     .display = DATASET_FRONT,
153     .dict = dict_create (get_default_encoding ()),
154     .caseinit = caseinit_create (),
155   };
156   dataset_create_finish__ (ds, session);
157
158   return ds;
159 }
160
161 /* Creates and returns a new dataset that has the same data and dictionary as
162    OLD named NAME, adds it to the same session as OLD, and returns the new
163    dataset.  If SESSION already contains a dataset named NAME, it is deleted
164    and replaced.
165
166    OLD must not have any active transformations or temporary state and must
167    not be in the middle of a procedure.
168
169    Callbacks are not cloned. */
170 struct dataset *
171 dataset_clone (struct dataset *old, const char *name)
172 {
173   struct dataset *new;
174
175   assert (old->proc_state == PROC_COMMITTED);
176   assert (!old->permanent_trns_chain.n);
177   assert (old->permanent_dict == NULL);
178   assert (old->sink == NULL);
179   assert (!old->temporary);
180   assert (!old->temporary_trns_chain.n);
181   assert (!old->n_stack);
182
183   new = xzalloc (sizeof *new);
184   new->name = xstrdup (name);
185   new->display = DATASET_FRONT;
186   new->source = casereader_clone (old->source);
187   new->dict = dict_clone (old->dict);
188   new->caseinit = caseinit_clone (old->caseinit);
189   new->last_proc_invocation = old->last_proc_invocation;
190   new->ok = old->ok;
191
192   dataset_create_finish__ (new, old->session);
193
194   return new;
195 }
196
197 /* Destroys DS. */
198 void
199 dataset_destroy (struct dataset *ds)
200 {
201   if (ds != NULL)
202     {
203       dataset_set_session (ds, NULL);
204       dataset_clear (ds);
205       dict_unref (ds->dict);
206       dict_unref (ds->permanent_dict);
207       caseinit_destroy (ds->caseinit);
208       trns_chain_uninit (&ds->permanent_trns_chain);
209       for (size_t i = 0; i < ds->n_stack; i++)
210         trns_chain_uninit (&ds->stack[i]);
211       free (ds->stack);
212       dataset_transformations_changed__ (ds, false);
213       free (ds->name);
214       free (ds);
215     }
216 }
217
218 /* Discards the active dataset's dictionary, data, and transformations. */
219 void
220 dataset_clear (struct dataset *ds)
221 {
222   assert (ds->proc_state == PROC_COMMITTED);
223
224   dict_clear (ds->dict);
225   fh_set_default_handle (NULL);
226
227   ds->n_lag = 0;
228
229   casereader_destroy (ds->source);
230   ds->source = NULL;
231
232   proc_cancel_all_transformations (ds);
233 }
234
235 const char *
236 dataset_name (const struct dataset *ds)
237 {
238   return ds->name;
239 }
240
241 void
242 dataset_set_name (struct dataset *ds, const char *name)
243 {
244   struct session *session = ds->session;
245   bool active = false;
246
247   if (session != NULL)
248     {
249       active = session_active_dataset (session) == ds;
250       if (active)
251         session_set_active_dataset (session, NULL);
252       dataset_set_session (ds, NULL);
253     }
254
255   free (ds->name);
256   ds->name = xstrdup (name);
257
258   if (session != NULL)
259     {
260       dataset_set_session (ds, session);
261       if (active)
262         session_set_active_dataset (session, ds);
263     }
264 }
265
266 struct session *
267 dataset_session (const struct dataset *ds)
268 {
269   return ds->session;
270 }
271
272 void
273 dataset_set_session (struct dataset *ds, struct session *session)
274 {
275   if (session != ds->session)
276     {
277       if (ds->session != NULL)
278         session_remove_dataset (ds->session, ds);
279       if (session != NULL)
280         session_add_dataset (session, ds);
281     }
282 }
283
284 /* Returns the dictionary within DS.  This is always nonnull, although it
285    might not contain any variables. */
286 struct dictionary *
287 dataset_dict (const struct dataset *ds)
288 {
289   return ds->dict;
290 }
291
292 /* Replaces DS's dictionary by DICT, discarding any source and
293    transformations. */
294 void
295 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
296 {
297   assert (ds->proc_state == PROC_COMMITTED);
298   assert (ds->dict != dict);
299
300   dataset_clear (ds);
301
302   dict_unref (ds->dict);
303   ds->dict = dict;
304   dict_set_change_callback (ds->dict, dict_callback, ds);
305 }
306
307 /* Returns the casereader that will be read when a procedure is executed on
308    DS.  This can be NULL if none has been set up yet. */
309 const struct casereader *
310 dataset_source (const struct dataset *ds)
311 {
312   return ds->source;
313 }
314
315 /* Returns true if DS has a data source, false otherwise. */
316 bool
317 dataset_has_source (const struct dataset *ds)
318 {
319   return dataset_source (ds) != NULL;
320 }
321
322 /* Replaces the active dataset's data by READER.  READER's cases must have an
323    appropriate format for DS's dictionary. */
324 bool
325 dataset_set_source (struct dataset *ds, struct casereader *reader)
326 {
327   casereader_destroy (ds->source);
328   ds->source = reader;
329
330   caseinit_clear (ds->caseinit);
331   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
332
333   return reader == NULL || !casereader_error (reader);
334 }
335
336 /* Returns the data source from DS and removes it from DS.  Returns a null
337    pointer if DS has no data source. */
338 struct casereader *
339 dataset_steal_source (struct dataset *ds)
340 {
341   struct casereader *reader = ds->source;
342   ds->source = NULL;
343
344   return reader;
345 }
346
347 void
348 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
349 {
350   assert (!proc_in_temporary_transformations (ds));
351   assert (!proc_has_transformations (ds));
352   assert (n < dict_get_n_vars (ds->dict));
353
354   caseinit_mark_for_init (ds->caseinit, ds->dict);
355   ds->source = caseinit_translate_casereader_to_init_vars (
356     ds->caseinit, dict_get_proto (ds->dict), ds->source);
357   caseinit_clear (ds->caseinit);
358   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
359
360   struct case_map_stage *stage = case_map_stage_create (ds->dict);
361   dict_delete_vars (ds->dict, vars, n);
362   ds->source = case_map_create_input_translator (
363     case_map_stage_get_case_map (stage), ds->source);
364   case_map_stage_destroy (stage);
365   caseinit_clear (ds->caseinit);
366   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
367 }
368
369 void
370 dataset_reorder_vars (struct dataset *ds, struct variable **vars, size_t n)
371 {
372   assert (!proc_in_temporary_transformations (ds));
373   assert (!proc_has_transformations (ds));
374   assert (n <= dict_get_n_vars (ds->dict));
375
376   caseinit_mark_for_init (ds->caseinit, ds->dict);
377   ds->source = caseinit_translate_casereader_to_init_vars (
378     ds->caseinit, dict_get_proto (ds->dict), ds->source);
379   caseinit_clear (ds->caseinit);
380   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
381
382   struct case_map_stage *stage = case_map_stage_create (ds->dict);
383   dict_reorder_vars (ds->dict, vars, n);
384   ds->source = case_map_create_input_translator (
385     case_map_stage_get_case_map (stage), ds->source);
386   case_map_stage_destroy (stage);
387   caseinit_clear (ds->caseinit);
388   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
389 }
390
391 /* Returns a number unique to DS.  It can be used to distinguish one dataset
392    from any other within a given program run, even datasets that do not exist
393    at the same time. */
394 unsigned int
395 dataset_seqno (const struct dataset *ds)
396 {
397   return ds->seqno;
398 }
399
400 void
401 dataset_set_callbacks (struct dataset *ds,
402                        const struct dataset_callbacks *callbacks,
403                        void *cb_data)
404 {
405   ds->callbacks = callbacks;
406   ds->cb_data = cb_data;
407 }
408
409 enum dataset_display
410 dataset_get_display (const struct dataset *ds)
411 {
412   return ds->display;
413 }
414
415 void
416 dataset_set_display (struct dataset *ds, enum dataset_display display)
417 {
418   ds->display = display;
419 }
420 \f
421 /* Returns the last time the data was read. */
422 time_t
423 time_of_last_procedure (struct dataset *ds)
424 {
425   if (!ds)
426     return time (NULL);
427   if (ds->last_proc_invocation == 0)
428     update_last_proc_invocation (ds);
429   return ds->last_proc_invocation;
430 }
431 \f
432 /* Regular procedure. */
433
434 /* Executes any pending transformations, if necessary.
435    This is not identical to the EXECUTE command in that it won't
436    always read the source data.  This can be important when the
437    source data is given inline within BEGIN DATA...END FILE. */
438 bool
439 proc_execute (struct dataset *ds)
440 {
441   bool ok;
442
443   if ((!ds->temporary || !ds->temporary_trns_chain.n)
444       && !ds->permanent_trns_chain.n)
445     {
446       ds->n_lag = 0;
447       ds->discard_output = false;
448       dict_set_case_limit (ds->dict, 0);
449       dict_clear_vectors (ds->dict);
450       return true;
451     }
452
453   ok = casereader_destroy (proc_open (ds));
454   return proc_commit (ds) && ok;
455 }
456
457 static const struct casereader_class proc_casereader_class;
458
459 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
460    cases filtered out with FILTER BY will not be included in the casereader
461    (which is usually desirable).  If FILTER is false, all cases will be
462    included regardless of FILTER BY settings.
463
464    proc_commit must be called when done. */
465 struct casereader *
466 proc_open_filtering (struct dataset *ds, bool filter)
467 {
468   struct casereader *reader;
469
470   assert (ds->n_stack == 0);
471   assert (ds->source != NULL);
472   assert (ds->proc_state == PROC_COMMITTED);
473
474   update_last_proc_invocation (ds);
475
476   caseinit_mark_for_init (ds->caseinit, ds->dict);
477   ds->source = caseinit_translate_casereader_to_init_vars (
478     ds->caseinit, dict_get_proto (ds->dict), ds->source);
479
480   /* Finish up the collection of transformations. */
481   add_case_limit_trns (ds);
482   if (filter)
483     add_filter_trns (ds);
484   if (!proc_in_temporary_transformations (ds))
485     add_measurement_level_trns (ds, ds->dict);
486
487   /* Make permanent_dict refer to the dictionary right before
488      data reaches the sink. */
489   if (ds->permanent_dict == NULL)
490     ds->permanent_dict = ds->dict;
491
492   /* Prepare sink. */
493   if (!ds->discard_output)
494     {
495       struct dictionary *pd = dict_clone (ds->permanent_dict);
496       struct case_map_stage *stage = case_map_stage_create (pd);
497       dict_delete_scratch_vars (pd);
498       ds->sink = case_map_create_output_translator (
499         case_map_stage_get_case_map (stage),
500         autopaging_writer_create (dict_get_proto (pd)));
501       case_map_stage_destroy (stage);
502       dict_unref (pd);
503     }
504   else
505     ds->sink = NULL;
506
507   /* Allocate memory for lagged cases. */
508   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
509
510   ds->proc_state = PROC_OPEN;
511   ds->cases_written = 0;
512   ds->ok = true;
513
514   /* FIXME: use taint in dataset in place of `ok'? */
515   /* FIXME: for trivial cases we can just return a clone of
516      ds->source? */
517
518   /* Create casereader and insert a shim on top.  The shim allows us to
519      arbitrarily extend the casereader's lifetime, by slurping the cases into
520      the shim's buffer in proc_commit().  That is especially useful when output
521      table_items are generated directly from the procedure casereader (e.g. by
522      the LIST procedure) when we are using an output driver that keeps a
523      reference to the output items passed to it (e.g. the GUI output driver in
524      PSPPIRE). */
525   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
526                                          CASENUMBER_MAX,
527                                          &proc_casereader_class, ds);
528   ds->shim = casereader_shim_insert (reader);
529   return reader;
530 }
531
532 /* Opens dataset DS for reading cases with proc_read.
533    proc_commit must be called when done. */
534 struct casereader *
535 proc_open (struct dataset *ds)
536 {
537   return proc_open_filtering (ds, true);
538 }
539
540 /* Returns true if a procedure is in progress, that is, if
541    proc_open has been called but proc_commit has not. */
542 bool
543 proc_is_open (const struct dataset *ds)
544 {
545   return ds->proc_state != PROC_COMMITTED;
546 }
547
548 /* "read" function for procedure casereader. */
549 static struct ccase *
550 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
551 {
552   struct dataset *ds = ds_;
553   enum trns_result retval = TRNS_DROP_CASE;
554   struct ccase *c;
555
556   assert (ds->proc_state == PROC_OPEN);
557   for (; ; case_unref (c))
558     {
559       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
560       if (retval == TRNS_ERROR)
561         ds->ok = false;
562       if (!ds->ok)
563         return NULL;
564
565       /* Read a case from source. */
566       c = casereader_read (ds->source);
567       if (c == NULL)
568         return NULL;
569       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
570       caseinit_restore_left_vars (ds->caseinit, c);
571
572       /* Execute permanent transformations.  */
573       casenumber case_nr = ds->cases_written + 1;
574       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
575       caseinit_save_left_vars (ds->caseinit, c);
576       if (retval != TRNS_CONTINUE)
577         continue;
578
579       /* Write case to collection of lagged cases. */
580       if (ds->n_lag > 0)
581         {
582           while (deque_count (&ds->lag) >= ds->n_lag)
583             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
584           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
585         }
586
587       /* Write case to replacement dataset. */
588       ds->cases_written++;
589       if (ds->sink != NULL)
590         {
591           if (ds->order_var)
592             *case_num_rw (c, ds->order_var) = case_nr;
593           casewriter_write (ds->sink, case_ref (c));
594         }
595
596       /* Execute temporary transformations. */
597       if (ds->temporary_trns_chain.n)
598         {
599           retval = trns_chain_execute (&ds->temporary_trns_chain,
600                                        ds->cases_written, &c);
601           if (retval != TRNS_CONTINUE)
602             continue;
603         }
604
605       return c;
606     }
607 }
608
609 /* "destroy" function for procedure casereader. */
610 static void
611 proc_casereader_destroy (struct casereader *reader, void *ds_)
612 {
613   struct dataset *ds = ds_;
614   struct ccase *c;
615
616   /* We are always the subreader for a casereader_buffer, so if we're being
617      destroyed then it's because the casereader_buffer has read all the cases
618      that it ever will. */
619   ds->shim = NULL;
620
621   /* Make sure transformations happen for every input case, in
622      case they have side effects, and ensure that the replacement
623      active dataset gets all the cases it should. */
624   while ((c = casereader_read (reader)) != NULL)
625     case_unref (c);
626
627   ds->proc_state = PROC_CLOSED;
628   ds->ok = casereader_destroy (ds->source) && ds->ok;
629   ds->source = NULL;
630   dataset_set_source (ds, NULL);
631 }
632
633 /* Must return false if the source casereader, a transformation,
634    or the sink casewriter signaled an error.  (If a temporary
635    transformation signals an error, then the return value is
636    false, but the replacement active dataset may still be
637    untainted.) */
638 bool
639 proc_commit (struct dataset *ds)
640 {
641   if (ds->shim != NULL)
642     casereader_shim_slurp (ds->shim);
643
644   assert (ds->proc_state == PROC_CLOSED);
645   ds->proc_state = PROC_COMMITTED;
646
647   dataset_changed__ (ds);
648
649   /* Free memory for lagged cases. */
650   while (!deque_is_empty (&ds->lag))
651     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
652   free (ds->lag_cases);
653
654   /* Dictionary from before TEMPORARY becomes permanent. */
655   proc_cancel_temporary_transformations (ds);
656   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
657
658   if (!ds->discard_output)
659     {
660       dict_delete_scratch_vars (ds->dict);
661
662       /* Old data sink becomes new data source. */
663       if (ds->sink != NULL)
664         ds->source = casewriter_make_reader (ds->sink);
665     }
666   else
667     {
668       ds->source = NULL;
669       ds->discard_output = false;
670     }
671   ds->sink = NULL;
672
673   caseinit_clear (ds->caseinit);
674   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
675
676   dict_clear_vectors (ds->dict);
677   ds->permanent_dict = NULL;
678   ds->order_var = NULL;
679   return ok;
680 }
681
682 /* Casereader class for procedure execution. */
683 static const struct casereader_class proc_casereader_class =
684   {
685     proc_casereader_read,
686     proc_casereader_destroy,
687     NULL,
688     NULL,
689   };
690
691 /* Updates last_proc_invocation. */
692 static void
693 update_last_proc_invocation (struct dataset *ds)
694 {
695   ds->last_proc_invocation = time (NULL);
696 }
697 \f
698 /* Returns a pointer to the lagged case from N_BEFORE cases before the
699    current one, or NULL if there haven't been that many cases yet. */
700 const struct ccase *
701 lagged_case (const struct dataset *ds, int n_before)
702 {
703   assert (n_before >= 1);
704   assert (n_before <= ds->n_lag);
705
706   if (n_before <= deque_count (&ds->lag))
707     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
708   else
709     return NULL;
710 }
711 \f
712 /* Adds TRNS to the current set of transformations. */
713 void
714 add_transformation (struct dataset *ds,
715                     const struct trns_class *class, void *aux)
716 {
717   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
718                               : ds->temporary ? &ds->temporary_trns_chain
719                               : &ds->permanent_trns_chain);
720   struct transformation t = { .class = class, .aux = aux };
721   trns_chain_append (chain, &t);
722   dataset_transformations_changed__ (ds, true);
723 }
724
725 /* Returns true if the next call to add_transformation() will add
726    a temporary transformation, false if it will add a permanent
727    transformation. */
728 bool
729 proc_in_temporary_transformations (const struct dataset *ds)
730 {
731   return ds->temporary;
732 }
733
734 /* Marks the start of temporary transformations.
735    Further calls to add_transformation() will add temporary
736    transformations. */
737 void
738 proc_start_temporary_transformations (struct dataset *ds)
739 {
740   assert (!ds->n_stack);
741   if (!proc_in_temporary_transformations (ds))
742     {
743       add_case_limit_trns (ds);
744
745       ds->permanent_dict = dict_clone (ds->dict);
746       add_measurement_level_trns (ds, ds->permanent_dict);
747
748       ds->temporary = true;
749       dataset_transformations_changed__ (ds, true);
750     }
751 }
752
753 /* Converts all the temporary transformations, if any, to permanent
754    transformations.  Further transformations will be permanent.
755
756    The FILTER command is implemented as a temporary transformation, so a
757    procedure that uses this function should usually use proc_open_filtering()
758    with FILTER false, instead of plain proc_open().
759
760    Returns true if anything changed, false otherwise. */
761 bool
762 proc_make_temporary_transformations_permanent (struct dataset *ds)
763 {
764   if (proc_in_temporary_transformations (ds))
765     {
766       cancel_measurement_level_trns (&ds->permanent_trns_chain);
767       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
768
769       ds->temporary = false;
770
771       dict_unref (ds->permanent_dict);
772       ds->permanent_dict = NULL;
773
774       return true;
775     }
776   else
777     return false;
778 }
779
780 /* Cancels all temporary transformations, if any.  Further
781    transformations will be permanent.
782    Returns true if anything changed, false otherwise. */
783 bool
784 proc_cancel_temporary_transformations (struct dataset *ds)
785 {
786   if (proc_in_temporary_transformations (ds))
787     {
788       trns_chain_clear (&ds->temporary_trns_chain);
789
790       dict_unref (ds->dict);
791       ds->dict = ds->permanent_dict;
792       ds->permanent_dict = NULL;
793
794       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
795       return true;
796     }
797   else
798     return false;
799 }
800
801 /* Cancels all transformations, if any.
802    Returns true if successful, false on I/O error. */
803 bool
804 proc_cancel_all_transformations (struct dataset *ds)
805 {
806   bool ok;
807   assert (ds->proc_state == PROC_COMMITTED);
808   ok = trns_chain_clear (&ds->permanent_trns_chain);
809   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
810   ds->temporary = false;
811   for (size_t i = 0; i < ds->n_stack; i++)
812     ok = trns_chain_uninit (&ds->stack[i]) && ok;
813   ds->n_stack = 0;
814   dataset_transformations_changed__ (ds, false);
815
816   return ok;
817 }
818
819 void
820 proc_push_transformations (struct dataset *ds)
821 {
822   if (ds->n_stack >= ds->allocated_stack)
823     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
824                             sizeof *ds->stack);
825   trns_chain_init (&ds->stack[ds->n_stack++]);
826 }
827
828 void
829 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
830 {
831   assert (ds->n_stack > 0);
832   *chain = ds->stack[--ds->n_stack];
833 }
834
835 bool
836 proc_has_transformations (const struct dataset *ds)
837 {
838   return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
839 }
840
841 static enum trns_result
842 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
843 {
844   struct variable *var = var_;
845
846   *cc = case_unshare (*cc);
847   *case_num_rw (*cc, var) = case_num;
848
849   return TRNS_CONTINUE;
850 }
851
852 /* Add a variable $ORDERING which we can sort by to get back the original order. */
853 struct variable *
854 add_permanent_ordering_transformation (struct dataset *ds)
855 {
856   struct dictionary *d = ds->permanent_dict ? ds->permanent_dict : ds->dict;
857   struct variable *order_var = dict_create_var_assert (d, "$ORDER", 0);
858   ds->order_var = order_var;
859
860   if (ds->permanent_dict)
861     {
862       order_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
863       static const struct trns_class trns_class = {
864         .name = "ordering",
865         .execute = store_case_num
866       };
867       const struct transformation t = { .class = &trns_class, .aux = order_var };
868       trns_chain_prepend (&ds->temporary_trns_chain, &t);
869     }
870
871   return order_var;
872 }
873 \f
874 /* Causes output from the next procedure to be discarded, instead
875    of being preserved for use as input for the next procedure. */
876 void
877 proc_discard_output (struct dataset *ds)
878 {
879   ds->discard_output = true;
880 }
881
882
883 /* Checks whether DS has a corrupted active dataset.  If so,
884    discards it and returns false.  If not, returns true without
885    doing anything. */
886 bool
887 dataset_end_of_command (struct dataset *ds)
888 {
889   if (ds->source != NULL)
890     {
891       if (casereader_error (ds->source))
892         {
893           dataset_clear (ds);
894           return false;
895         }
896       else
897         {
898           const struct taint *taint = casereader_get_taint (ds->source);
899           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
900           assert (!taint_has_tainted_successor (taint));
901         }
902     }
903   return true;
904 }
905 \f
906 /* Limits the maximum number of cases processed to
907    *CASES_REMAINING. */
908 static enum trns_result
909 case_limit_trns_proc (void *cases_remaining_,
910                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
911 {
912   size_t *cases_remaining = cases_remaining_;
913   if (*cases_remaining > 0)
914     {
915       (*cases_remaining)--;
916       return TRNS_CONTINUE;
917     }
918   else
919     return TRNS_DROP_CASE;
920 }
921
922 /* Frees the data associated with a case limit transformation. */
923 static bool
924 case_limit_trns_free (void *cases_remaining_)
925 {
926   size_t *cases_remaining = cases_remaining_;
927   free (cases_remaining);
928   return true;
929 }
930
931 /* Adds a transformation that limits the number of cases that may
932    pass through, if DS->DICT has a case limit. */
933 static void
934 add_case_limit_trns (struct dataset *ds)
935 {
936   casenumber case_limit = dict_get_case_limit (ds->dict);
937   if (case_limit != 0)
938     {
939       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
940       *cases_remaining = case_limit;
941
942       static const struct trns_class trns_class = {
943         .name = "case limit",
944         .execute = case_limit_trns_proc,
945         .destroy = case_limit_trns_free,
946       };
947       add_transformation (ds, &trns_class, cases_remaining);
948
949       dict_set_case_limit (ds->dict, 0);
950     }
951 }
952
953 \f
954 /* FILTER transformation. */
955 static enum trns_result
956 filter_trns_proc (void *filter_var_,
957                   struct ccase **c, casenumber case_nr UNUSED)
958
959 {
960   struct variable *filter_var = filter_var_;
961   double f = case_num (*c, filter_var);
962   return (f != 0.0 && !var_is_num_missing (filter_var, f)
963           ? TRNS_CONTINUE : TRNS_DROP_CASE);
964 }
965
966 /* Adds a temporary transformation to filter data according to
967    the variable specified on FILTER, if any. */
968 static void
969 add_filter_trns (struct dataset *ds)
970 {
971   struct variable *filter_var = dict_get_filter (ds->dict);
972   if (filter_var != NULL)
973     {
974       proc_start_temporary_transformations (ds);
975
976       static const struct trns_class trns_class = {
977         .name = "FILTER",
978         .execute = filter_trns_proc,
979       };
980       add_transformation (ds, &trns_class, filter_var);
981     }
982 }
983
984 void
985 dataset_need_lag (struct dataset *ds, int n_before)
986 {
987   ds->n_lag = MAX (ds->n_lag, n_before);
988 }
989 \f
990 /* Measurement guesser, for guessing a measurement level from formats and
991    data. */
992
993 struct mg_value
994   {
995     struct hmap_node hmap_node;
996     double value;
997   };
998
999 struct mg_var
1000   {
1001     struct variable *var;
1002     struct hmap *values;
1003   };
1004
1005 static void
1006 mg_var_uninit (struct mg_var *mgv)
1007 {
1008   struct mg_value *mgvalue, *next;
1009   HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
1010                       mgv->values)
1011     {
1012       hmap_delete (mgv->values, &mgvalue->hmap_node);
1013       free (mgvalue);
1014     }
1015   hmap_destroy (mgv->values);
1016   free (mgv->values);
1017 }
1018
1019 static enum measure
1020 mg_var_interpret (const struct mg_var *mgv)
1021 {
1022   size_t n = hmap_count (mgv->values);
1023   if (!n)
1024     {
1025       /* All missing (or no data). */
1026       return MEASURE_NOMINAL;
1027     }
1028
1029   const struct mg_value *mgvalue;
1030   HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1031                  mgv->values)
1032     if (mgvalue->value < 10)
1033       return MEASURE_NOMINAL;
1034   return MEASURE_SCALE;
1035 }
1036
1037 static enum measure
1038 mg_var_add_value (struct mg_var *mgv, double value)
1039 {
1040   if (var_is_num_missing (mgv->var, value))
1041     return MEASURE_UNKNOWN;
1042   else if (value < 0 || value != floor (value))
1043     return MEASURE_SCALE;
1044
1045   size_t hash = hash_double (value, 0);
1046   struct mg_value *mgvalue;
1047   HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1048                            hash, mgv->values)
1049     if (mgvalue->value == value)
1050       return MEASURE_UNKNOWN;
1051
1052   mgvalue = xmalloc (sizeof *mgvalue);
1053   mgvalue->value = value;
1054   hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1055   if (hmap_count (mgv->values) >= settings_get_scalemin ())
1056     return MEASURE_SCALE;
1057
1058   return MEASURE_UNKNOWN;
1059 }
1060
1061 struct measure_guesser
1062   {
1063     struct mg_var *vars;
1064     size_t n_vars;
1065   };
1066
1067 static struct measure_guesser *
1068 measure_guesser_create__ (struct dictionary *dict)
1069 {
1070   struct mg_var *mgvs = NULL;
1071   size_t n_mgvs = 0;
1072   size_t allocated_mgvs = 0;
1073
1074   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1075     {
1076       struct variable *var = dict_get_var (dict, i);
1077       if (var_get_measure (var) != MEASURE_UNKNOWN)
1078         continue;
1079
1080       struct fmt_spec f = var_get_print_format (var);
1081       enum measure m = var_default_measure_for_format (f.type);
1082       if (m != MEASURE_UNKNOWN)
1083         {
1084           var_set_measure (var, m);
1085           continue;
1086         }
1087
1088       if (n_mgvs >= allocated_mgvs)
1089         mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1090
1091       struct mg_var *mgv = &mgvs[n_mgvs++];
1092       *mgv = (struct mg_var) {
1093         .var = var,
1094         .values = xmalloc (sizeof *mgv->values),
1095       };
1096       hmap_init (mgv->values);
1097     }
1098   if (!n_mgvs)
1099     return NULL;
1100
1101   struct measure_guesser *mg = xmalloc (sizeof *mg);
1102   *mg = (struct measure_guesser) {
1103     .vars = mgvs,
1104     .n_vars = n_mgvs,
1105   };
1106   return mg;
1107 }
1108
1109 /* Scans through DS's dictionary for variables that have an unknown measurement
1110    level.  For those, if the measurement level can be guessed based on the
1111    variable's type and format, sets a default.  If that's enough, returns NULL.
1112    If any remain whose levels are unknown and can't be guessed that way,
1113    creates and returns a structure that the caller should pass to
1114    measure_guesser_add_case() or measure_guesser_run() for guessing a
1115    measurement level based on the data.  */
1116 struct measure_guesser *
1117 measure_guesser_create (struct dataset *ds)
1118 {
1119   return measure_guesser_create__ (dataset_dict (ds));
1120 }
1121
1122 /* Adds data from case C to MG. */
1123 static void
1124 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1125 {
1126   for (size_t i = 0; i < mg->n_vars; )
1127     {
1128       struct mg_var *mgv = &mg->vars[i];
1129       double value = case_num (c, mgv->var);
1130       enum measure m = mg_var_add_value (mgv, value);
1131       if (m != MEASURE_UNKNOWN)
1132         {
1133           var_set_measure (mgv->var, m);
1134
1135           mg_var_uninit (mgv);
1136           *mgv = mg->vars[--mg->n_vars];
1137         }
1138       else
1139         i++;
1140     }
1141 }
1142
1143 /* Destroys MG. */
1144 void
1145 measure_guesser_destroy (struct measure_guesser *mg)
1146 {
1147   if (!mg)
1148     return;
1149
1150   for (size_t i = 0; i < mg->n_vars; i++)
1151     {
1152       struct mg_var *mgv = &mg->vars[i];
1153       var_set_measure (mgv->var, mg_var_interpret (mgv));
1154       mg_var_uninit (mgv);
1155     }
1156   free (mg->vars);
1157   free (mg);
1158 }
1159
1160 /* Adds final measurement levels based on MG, after all the cases have been
1161    added. */
1162 static void
1163 measure_guesser_commit (struct measure_guesser *mg)
1164 {
1165   for (size_t i = 0; i < mg->n_vars; i++)
1166     {
1167       struct mg_var *mgv = &mg->vars[i];
1168       var_set_measure (mgv->var, mg_var_interpret (mgv));
1169     }
1170 }
1171
1172 /* Passes the cases in READER through MG and uses the data in the cases to set
1173    measurement levels for the variables where they were still unknown. */
1174 void
1175 measure_guesser_run (struct measure_guesser *mg,
1176                      const struct casereader *reader)
1177 {
1178   struct casereader *r = casereader_clone (reader);
1179   while (mg->n_vars > 0)
1180     {
1181       struct ccase *c = casereader_read (r);
1182       if (!c)
1183         break;
1184       measure_guesser_add_case (mg, c);
1185       case_unref (c);
1186     }
1187   casereader_destroy (r);
1188
1189   measure_guesser_commit (mg);
1190 }
1191 \f
1192 /* A transformation for guessing measurement levels. */
1193
1194 static enum trns_result
1195 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1196 {
1197   struct measure_guesser *mg = mg_;
1198   measure_guesser_add_case (mg, *c);
1199   return TRNS_CONTINUE;
1200 }
1201
1202 static bool
1203 mg_trns_free (void *mg_)
1204 {
1205   struct measure_guesser *mg = mg_;
1206   measure_guesser_commit (mg);
1207   measure_guesser_destroy (mg);
1208   return true;
1209 }
1210
1211 static const struct trns_class mg_trns_class = {
1212   .name = "add measurement level",
1213   .execute = mg_trns_proc,
1214   .destroy = mg_trns_free,
1215 };
1216
1217 static void
1218 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1219 {
1220   struct measure_guesser *mg = measure_guesser_create__ (dict);
1221   if (mg)
1222     add_transformation (ds, &mg_trns_class, mg);
1223 }
1224
1225 static void
1226 cancel_measurement_level_trns (struct trns_chain *chain)
1227 {
1228   if (!chain->n)
1229     return;
1230
1231   struct transformation *trns = &chain->xforms[chain->n - 1];
1232   if (trns->class != &mg_trns_class)
1233     return;
1234
1235   struct measure_guesser *mg = trns->aux;
1236   measure_guesser_destroy (mg);
1237   chain->n--;
1238 }
1239 \f
1240 static void
1241 dataset_changed__ (struct dataset *ds)
1242 {
1243   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1244     ds->callbacks->changed (ds->cb_data);
1245 }
1246
1247 static void
1248 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1249 {
1250   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1251     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1252 }
1253 \f
1254 /* Private interface for use by session code. */
1255
1256 void
1257 dataset_set_session__ (struct dataset *ds, struct session *session)
1258 {
1259   ds->session = session;
1260 }