72dfb8b41f54db04f27e790c84b751a3b838856e
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct variable *order_var;
72   struct casewriter *sink;
73   struct trns_chain temporary_trns_chain;
74   bool temporary;
75   struct dictionary *dict;
76
77   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
78   struct trns_chain *stack;
79   size_t n_stack;
80   size_t allocated_stack;
81
82   /* If true, cases are discarded instead of being written to
83      sink. */
84   bool discard_output;
85
86   /* The case map used to compact a case, if necessary;
87      otherwise a null pointer. */
88   struct case_map *compactor;
89
90   /* Time at which proc was last invoked. */
91   time_t last_proc_invocation;
92
93   /* Cases just before ("lagging") the current one. */
94   int n_lag;                    /* Number of cases to lag. */
95   struct deque lag;             /* Deque of lagged cases. */
96   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
97
98   /* Procedure data. */
99   enum
100     {
101       PROC_COMMITTED,           /* No procedure in progress. */
102       PROC_OPEN,                /* proc_open called, casereader still open. */
103       PROC_CLOSED               /* casereader from proc_open destroyed,
104                                    but proc_commit not yet called. */
105     }
106   proc_state;
107   casenumber cases_written;     /* Cases output so far. */
108   bool ok;                      /* Error status. */
109   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
110
111   const struct dataset_callbacks *callbacks;
112   void *cb_data;
113
114   /* Uniquely distinguishes datasets. */
115   unsigned int seqno;
116 };
117
118 static void dataset_changed__ (struct dataset *);
119 static void dataset_transformations_changed__ (struct dataset *,
120                                                bool non_empty);
121
122 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
123 static void cancel_measurement_level_trns (struct trns_chain *);
124 static void add_case_limit_trns (struct dataset *ds);
125 static void add_filter_trns (struct dataset *ds);
126
127 static void update_last_proc_invocation (struct dataset *ds);
128
129 static void
130 dict_callback (struct dictionary *d UNUSED, void *ds_)
131 {
132   struct dataset *ds = ds_;
133   dataset_changed__ (ds);
134 }
135 \f
136 static void
137 dataset_create_finish__ (struct dataset *ds, struct session *session)
138 {
139   static unsigned int seqno;
140
141   dict_set_change_callback (ds->dict, dict_callback, ds);
142   proc_cancel_all_transformations (ds);
143   dataset_set_session (ds, session);
144   ds->seqno = ++seqno;
145 }
146
147 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
148    SESSION already contains a dataset named NAME, it is deleted and replaced.
149    The dataset initially has an empty dictionary and no data source. */
150 struct dataset *
151 dataset_create (struct session *session, const char *name)
152 {
153   struct dataset *ds = XMALLOC (struct dataset);
154   *ds = (struct dataset) {
155     .name = xstrdup (name),
156     .display = DATASET_FRONT,
157     .dict = dict_create (get_default_encoding ()),
158     .caseinit = caseinit_create (),
159   };
160   dataset_create_finish__ (ds, session);
161
162   return ds;
163 }
164
165 /* Creates and returns a new dataset that has the same data and dictionary as
166    OLD named NAME, adds it to the same session as OLD, and returns the new
167    dataset.  If SESSION already contains a dataset named NAME, it is deleted
168    and replaced.
169
170    OLD must not have any active transformations or temporary state and must
171    not be in the middle of a procedure.
172
173    Callbacks are not cloned. */
174 struct dataset *
175 dataset_clone (struct dataset *old, const char *name)
176 {
177   struct dataset *new;
178
179   assert (old->proc_state == PROC_COMMITTED);
180   assert (!old->permanent_trns_chain.n);
181   assert (old->permanent_dict == NULL);
182   assert (old->sink == NULL);
183   assert (!old->temporary);
184   assert (!old->temporary_trns_chain.n);
185   assert (!old->n_stack);
186
187   new = xzalloc (sizeof *new);
188   new->name = xstrdup (name);
189   new->display = DATASET_FRONT;
190   new->source = casereader_clone (old->source);
191   new->dict = dict_clone (old->dict);
192   new->caseinit = caseinit_clone (old->caseinit);
193   new->last_proc_invocation = old->last_proc_invocation;
194   new->ok = old->ok;
195
196   dataset_create_finish__ (new, old->session);
197
198   return new;
199 }
200
201 /* Destroys DS. */
202 void
203 dataset_destroy (struct dataset *ds)
204 {
205   if (ds != NULL)
206     {
207       dataset_set_session (ds, NULL);
208       dataset_clear (ds);
209       dict_unref (ds->dict);
210       dict_unref (ds->permanent_dict);
211       caseinit_destroy (ds->caseinit);
212       trns_chain_uninit (&ds->permanent_trns_chain);
213       for (size_t i = 0; i < ds->n_stack; i++)
214         trns_chain_uninit (&ds->stack[i]);
215       free (ds->stack);
216       dataset_transformations_changed__ (ds, false);
217       free (ds->name);
218       free (ds);
219     }
220 }
221
222 /* Discards the active dataset's dictionary, data, and transformations. */
223 void
224 dataset_clear (struct dataset *ds)
225 {
226   assert (ds->proc_state == PROC_COMMITTED);
227
228   dict_clear (ds->dict);
229   fh_set_default_handle (NULL);
230
231   ds->n_lag = 0;
232
233   casereader_destroy (ds->source);
234   ds->source = NULL;
235
236   proc_cancel_all_transformations (ds);
237 }
238
239 const char *
240 dataset_name (const struct dataset *ds)
241 {
242   return ds->name;
243 }
244
245 void
246 dataset_set_name (struct dataset *ds, const char *name)
247 {
248   struct session *session = ds->session;
249   bool active = false;
250
251   if (session != NULL)
252     {
253       active = session_active_dataset (session) == ds;
254       if (active)
255         session_set_active_dataset (session, NULL);
256       dataset_set_session (ds, NULL);
257     }
258
259   free (ds->name);
260   ds->name = xstrdup (name);
261
262   if (session != NULL)
263     {
264       dataset_set_session (ds, session);
265       if (active)
266         session_set_active_dataset (session, ds);
267     }
268 }
269
270 struct session *
271 dataset_session (const struct dataset *ds)
272 {
273   return ds->session;
274 }
275
276 void
277 dataset_set_session (struct dataset *ds, struct session *session)
278 {
279   if (session != ds->session)
280     {
281       if (ds->session != NULL)
282         session_remove_dataset (ds->session, ds);
283       if (session != NULL)
284         session_add_dataset (session, ds);
285     }
286 }
287
288 /* Returns the dictionary within DS.  This is always nonnull, although it
289    might not contain any variables. */
290 struct dictionary *
291 dataset_dict (const struct dataset *ds)
292 {
293   return ds->dict;
294 }
295
296 /* Replaces DS's dictionary by DICT, discarding any source and
297    transformations. */
298 void
299 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
300 {
301   assert (ds->proc_state == PROC_COMMITTED);
302   assert (ds->dict != dict);
303
304   dataset_clear (ds);
305
306   dict_unref (ds->dict);
307   ds->dict = dict;
308   dict_set_change_callback (ds->dict, dict_callback, ds);
309 }
310
311 /* Returns the casereader that will be read when a procedure is executed on
312    DS.  This can be NULL if none has been set up yet. */
313 const struct casereader *
314 dataset_source (const struct dataset *ds)
315 {
316   return ds->source;
317 }
318
319 /* Returns true if DS has a data source, false otherwise. */
320 bool
321 dataset_has_source (const struct dataset *ds)
322 {
323   return dataset_source (ds) != NULL;
324 }
325
326 /* Replaces the active dataset's data by READER.  READER's cases must have an
327    appropriate format for DS's dictionary. */
328 bool
329 dataset_set_source (struct dataset *ds, struct casereader *reader)
330 {
331   casereader_destroy (ds->source);
332   ds->source = reader;
333
334   caseinit_clear (ds->caseinit);
335   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
336
337   return reader == NULL || !casereader_error (reader);
338 }
339
340 /* Returns the data source from DS and removes it from DS.  Returns a null
341    pointer if DS has no data source. */
342 struct casereader *
343 dataset_steal_source (struct dataset *ds)
344 {
345   struct casereader *reader = ds->source;
346   ds->source = NULL;
347
348   return reader;
349 }
350
351 void
352 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
353 {
354   assert (!proc_in_temporary_transformations (ds));
355   assert (!proc_has_transformations (ds));
356   assert (n < dict_get_n_vars (ds->dict));
357
358   caseinit_mark_for_init (ds->caseinit, ds->dict);
359   ds->source = caseinit_translate_casereader_to_init_vars (
360     ds->caseinit, dict_get_proto (ds->dict), ds->source);
361   caseinit_clear (ds->caseinit);
362   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
363
364   struct case_map_stage *stage = case_map_stage_create (ds->dict);
365   dict_delete_vars (ds->dict, vars, n);
366   ds->source = case_map_create_input_translator (
367     case_map_stage_get_case_map (stage), ds->source);
368   case_map_stage_destroy (stage);
369   caseinit_clear (ds->caseinit);
370   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
371 }
372
373 void
374 dataset_reorder_vars (struct dataset *ds, struct variable **vars, size_t n)
375 {
376   assert (!proc_in_temporary_transformations (ds));
377   assert (!proc_has_transformations (ds));
378   assert (n <= dict_get_n_vars (ds->dict));
379
380   caseinit_mark_for_init (ds->caseinit, ds->dict);
381   ds->source = caseinit_translate_casereader_to_init_vars (
382     ds->caseinit, dict_get_proto (ds->dict), ds->source);
383   caseinit_clear (ds->caseinit);
384   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
385
386   struct case_map_stage *stage = case_map_stage_create (ds->dict);
387   dict_reorder_vars (ds->dict, vars, n);
388   ds->source = case_map_create_input_translator (
389     case_map_stage_get_case_map (stage), ds->source);
390   case_map_stage_destroy (stage);
391   caseinit_clear (ds->caseinit);
392   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
393 }
394
395 /* Returns a number unique to DS.  It can be used to distinguish one dataset
396    from any other within a given program run, even datasets that do not exist
397    at the same time. */
398 unsigned int
399 dataset_seqno (const struct dataset *ds)
400 {
401   return ds->seqno;
402 }
403
404 void
405 dataset_set_callbacks (struct dataset *ds,
406                        const struct dataset_callbacks *callbacks,
407                        void *cb_data)
408 {
409   ds->callbacks = callbacks;
410   ds->cb_data = cb_data;
411 }
412
413 enum dataset_display
414 dataset_get_display (const struct dataset *ds)
415 {
416   return ds->display;
417 }
418
419 void
420 dataset_set_display (struct dataset *ds, enum dataset_display display)
421 {
422   ds->display = display;
423 }
424 \f
425 /* Returns the last time the data was read. */
426 time_t
427 time_of_last_procedure (struct dataset *ds)
428 {
429   if (!ds)
430     return time (NULL);
431   if (ds->last_proc_invocation == 0)
432     update_last_proc_invocation (ds);
433   return ds->last_proc_invocation;
434 }
435 \f
436 /* Regular procedure. */
437
438 /* Executes any pending transformations, if necessary.
439    This is not identical to the EXECUTE command in that it won't
440    always read the source data.  This can be important when the
441    source data is given inline within BEGIN DATA...END FILE. */
442 bool
443 proc_execute (struct dataset *ds)
444 {
445   bool ok;
446
447   if ((!ds->temporary || !ds->temporary_trns_chain.n)
448       && !ds->permanent_trns_chain.n)
449     {
450       ds->n_lag = 0;
451       ds->discard_output = false;
452       dict_set_case_limit (ds->dict, 0);
453       dict_clear_vectors (ds->dict);
454       return true;
455     }
456
457   ok = casereader_destroy (proc_open (ds));
458   return proc_commit (ds) && ok;
459 }
460
461 static const struct casereader_class proc_casereader_class;
462
463 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
464    cases filtered out with FILTER BY will not be included in the casereader
465    (which is usually desirable).  If FILTER is false, all cases will be
466    included regardless of FILTER BY settings.
467
468    proc_commit must be called when done. */
469 struct casereader *
470 proc_open_filtering (struct dataset *ds, bool filter)
471 {
472   struct casereader *reader;
473
474   assert (ds->n_stack == 0);
475   assert (ds->source != NULL);
476   assert (ds->proc_state == PROC_COMMITTED);
477
478   update_last_proc_invocation (ds);
479
480   caseinit_mark_for_init (ds->caseinit, ds->dict);
481   ds->source = caseinit_translate_casereader_to_init_vars (
482     ds->caseinit, dict_get_proto (ds->dict), ds->source);
483
484   /* Finish up the collection of transformations. */
485   add_case_limit_trns (ds);
486   if (filter)
487     add_filter_trns (ds);
488   if (!proc_in_temporary_transformations (ds))
489     add_measurement_level_trns (ds, ds->dict);
490
491   /* Make permanent_dict refer to the dictionary right before
492      data reaches the sink. */
493   if (ds->permanent_dict == NULL)
494     ds->permanent_dict = ds->dict;
495
496   /* Prepare sink. */
497   if (!ds->discard_output)
498     {
499       struct dictionary *pd = dict_clone (ds->permanent_dict);
500       struct case_map_stage *stage = case_map_stage_create (pd);
501       dict_delete_scratch_vars (pd);
502       ds->compactor = case_map_stage_get_case_map (stage);
503       case_map_stage_destroy (stage);
504       ds->sink = autopaging_writer_create (dict_get_proto (pd));
505       dict_unref (pd);
506     }
507   else
508     {
509       ds->compactor = NULL;
510       ds->sink = NULL;
511     }
512
513   /* Allocate memory for lagged cases. */
514   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
515
516   ds->proc_state = PROC_OPEN;
517   ds->cases_written = 0;
518   ds->ok = true;
519
520   /* FIXME: use taint in dataset in place of `ok'? */
521   /* FIXME: for trivial cases we can just return a clone of
522      ds->source? */
523
524   /* Create casereader and insert a shim on top.  The shim allows us to
525      arbitrarily extend the casereader's lifetime, by slurping the cases into
526      the shim's buffer in proc_commit().  That is especially useful when output
527      table_items are generated directly from the procedure casereader (e.g. by
528      the LIST procedure) when we are using an output driver that keeps a
529      reference to the output items passed to it (e.g. the GUI output driver in
530      PSPPIRE). */
531   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
532                                          CASENUMBER_MAX,
533                                          &proc_casereader_class, ds);
534   ds->shim = casereader_shim_insert (reader);
535   return reader;
536 }
537
538 /* Opens dataset DS for reading cases with proc_read.
539    proc_commit must be called when done. */
540 struct casereader *
541 proc_open (struct dataset *ds)
542 {
543   return proc_open_filtering (ds, true);
544 }
545
546 /* Returns true if a procedure is in progress, that is, if
547    proc_open has been called but proc_commit has not. */
548 bool
549 proc_is_open (const struct dataset *ds)
550 {
551   return ds->proc_state != PROC_COMMITTED;
552 }
553
554 /* "read" function for procedure casereader. */
555 static struct ccase *
556 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
557 {
558   struct dataset *ds = ds_;
559   enum trns_result retval = TRNS_DROP_CASE;
560   struct ccase *c;
561
562   assert (ds->proc_state == PROC_OPEN);
563   for (; ; case_unref (c))
564     {
565       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
566       if (retval == TRNS_ERROR)
567         ds->ok = false;
568       if (!ds->ok)
569         return NULL;
570
571       /* Read a case from source. */
572       c = casereader_read (ds->source);
573       if (c == NULL)
574         return NULL;
575       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
576       caseinit_restore_left_vars (ds->caseinit, c);
577
578       /* Execute permanent transformations.  */
579       casenumber case_nr = ds->cases_written + 1;
580       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
581       caseinit_save_left_vars (ds->caseinit, c);
582       if (retval != TRNS_CONTINUE)
583         continue;
584
585       /* Write case to collection of lagged cases. */
586       if (ds->n_lag > 0)
587         {
588           while (deque_count (&ds->lag) >= ds->n_lag)
589             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
590           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
591         }
592
593       /* Write case to replacement dataset. */
594       ds->cases_written++;
595       if (ds->sink != NULL)
596         {
597           if (ds->order_var)
598             *case_num_rw (c, ds->order_var) = case_nr;
599           casewriter_write (ds->sink,
600                             case_map_execute (ds->compactor, case_ref (c)));
601         }
602
603       /* Execute temporary transformations. */
604       if (ds->temporary_trns_chain.n)
605         {
606           retval = trns_chain_execute (&ds->temporary_trns_chain,
607                                        ds->cases_written, &c);
608           if (retval != TRNS_CONTINUE)
609             continue;
610         }
611
612       return c;
613     }
614 }
615
616 /* "destroy" function for procedure casereader. */
617 static void
618 proc_casereader_destroy (struct casereader *reader, void *ds_)
619 {
620   struct dataset *ds = ds_;
621   struct ccase *c;
622
623   /* We are always the subreader for a casereader_buffer, so if we're being
624      destroyed then it's because the casereader_buffer has read all the cases
625      that it ever will. */
626   ds->shim = NULL;
627
628   /* Make sure transformations happen for every input case, in
629      case they have side effects, and ensure that the replacement
630      active dataset gets all the cases it should. */
631   while ((c = casereader_read (reader)) != NULL)
632     case_unref (c);
633
634   ds->proc_state = PROC_CLOSED;
635   ds->ok = casereader_destroy (ds->source) && ds->ok;
636   ds->source = NULL;
637   dataset_set_source (ds, NULL);
638 }
639
640 /* Must return false if the source casereader, a transformation,
641    or the sink casewriter signaled an error.  (If a temporary
642    transformation signals an error, then the return value is
643    false, but the replacement active dataset may still be
644    untainted.) */
645 bool
646 proc_commit (struct dataset *ds)
647 {
648   if (ds->shim != NULL)
649     casereader_shim_slurp (ds->shim);
650
651   assert (ds->proc_state == PROC_CLOSED);
652   ds->proc_state = PROC_COMMITTED;
653
654   dataset_changed__ (ds);
655
656   /* Free memory for lagged cases. */
657   while (!deque_is_empty (&ds->lag))
658     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
659   free (ds->lag_cases);
660
661   /* Dictionary from before TEMPORARY becomes permanent. */
662   proc_cancel_temporary_transformations (ds);
663   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
664
665   if (!ds->discard_output)
666     {
667       /* Finish compacting. */
668       if (ds->compactor != NULL)
669         {
670           case_map_destroy (ds->compactor);
671           ds->compactor = NULL;
672
673           dict_delete_scratch_vars (ds->dict);
674         }
675
676       /* Old data sink becomes new data source. */
677       if (ds->sink != NULL)
678         ds->source = casewriter_make_reader (ds->sink);
679     }
680   else
681     {
682       ds->source = NULL;
683       ds->discard_output = false;
684     }
685   ds->sink = NULL;
686
687   caseinit_clear (ds->caseinit);
688   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
689
690   dict_clear_vectors (ds->dict);
691   ds->permanent_dict = NULL;
692   ds->order_var = NULL;
693   return ok;
694 }
695
696 /* Casereader class for procedure execution. */
697 static const struct casereader_class proc_casereader_class =
698   {
699     proc_casereader_read,
700     proc_casereader_destroy,
701     NULL,
702     NULL,
703   };
704
705 /* Updates last_proc_invocation. */
706 static void
707 update_last_proc_invocation (struct dataset *ds)
708 {
709   ds->last_proc_invocation = time (NULL);
710 }
711 \f
712 /* Returns a pointer to the lagged case from N_BEFORE cases before the
713    current one, or NULL if there haven't been that many cases yet. */
714 const struct ccase *
715 lagged_case (const struct dataset *ds, int n_before)
716 {
717   assert (n_before >= 1);
718   assert (n_before <= ds->n_lag);
719
720   if (n_before <= deque_count (&ds->lag))
721     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
722   else
723     return NULL;
724 }
725 \f
726 /* Adds TRNS to the current set of transformations. */
727 void
728 add_transformation (struct dataset *ds,
729                     const struct trns_class *class, void *aux)
730 {
731   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
732                               : ds->temporary ? &ds->temporary_trns_chain
733                               : &ds->permanent_trns_chain);
734   struct transformation t = { .class = class, .aux = aux };
735   trns_chain_append (chain, &t);
736   dataset_transformations_changed__ (ds, true);
737 }
738
739 /* Returns true if the next call to add_transformation() will add
740    a temporary transformation, false if it will add a permanent
741    transformation. */
742 bool
743 proc_in_temporary_transformations (const struct dataset *ds)
744 {
745   return ds->temporary;
746 }
747
748 /* Marks the start of temporary transformations.
749    Further calls to add_transformation() will add temporary
750    transformations. */
751 void
752 proc_start_temporary_transformations (struct dataset *ds)
753 {
754   assert (!ds->n_stack);
755   if (!proc_in_temporary_transformations (ds))
756     {
757       add_case_limit_trns (ds);
758
759       ds->permanent_dict = dict_clone (ds->dict);
760       add_measurement_level_trns (ds, ds->permanent_dict);
761
762       ds->temporary = true;
763       dataset_transformations_changed__ (ds, true);
764     }
765 }
766
767 /* Converts all the temporary transformations, if any, to permanent
768    transformations.  Further transformations will be permanent.
769
770    The FILTER command is implemented as a temporary transformation, so a
771    procedure that uses this function should usually use proc_open_filtering()
772    with FILTER false, instead of plain proc_open().
773
774    Returns true if anything changed, false otherwise. */
775 bool
776 proc_make_temporary_transformations_permanent (struct dataset *ds)
777 {
778   if (proc_in_temporary_transformations (ds))
779     {
780       cancel_measurement_level_trns (&ds->permanent_trns_chain);
781       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
782
783       ds->temporary = false;
784
785       dict_unref (ds->permanent_dict);
786       ds->permanent_dict = NULL;
787
788       return true;
789     }
790   else
791     return false;
792 }
793
794 /* Cancels all temporary transformations, if any.  Further
795    transformations will be permanent.
796    Returns true if anything changed, false otherwise. */
797 bool
798 proc_cancel_temporary_transformations (struct dataset *ds)
799 {
800   if (proc_in_temporary_transformations (ds))
801     {
802       trns_chain_clear (&ds->temporary_trns_chain);
803
804       dict_unref (ds->dict);
805       ds->dict = ds->permanent_dict;
806       ds->permanent_dict = NULL;
807
808       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
809       return true;
810     }
811   else
812     return false;
813 }
814
815 /* Cancels all transformations, if any.
816    Returns true if successful, false on I/O error. */
817 bool
818 proc_cancel_all_transformations (struct dataset *ds)
819 {
820   bool ok;
821   assert (ds->proc_state == PROC_COMMITTED);
822   ok = trns_chain_clear (&ds->permanent_trns_chain);
823   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
824   ds->temporary = false;
825   for (size_t i = 0; i < ds->n_stack; i++)
826     ok = trns_chain_uninit (&ds->stack[i]) && ok;
827   ds->n_stack = 0;
828   dataset_transformations_changed__ (ds, false);
829
830   return ok;
831 }
832
833 void
834 proc_push_transformations (struct dataset *ds)
835 {
836   if (ds->n_stack >= ds->allocated_stack)
837     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
838                             sizeof *ds->stack);
839   trns_chain_init (&ds->stack[ds->n_stack++]);
840 }
841
842 void
843 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
844 {
845   assert (ds->n_stack > 0);
846   *chain = ds->stack[--ds->n_stack];
847 }
848
849 bool
850 proc_has_transformations (const struct dataset *ds)
851 {
852   return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
853 }
854
855 static enum trns_result
856 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
857 {
858   struct variable *var = var_;
859
860   *cc = case_unshare (*cc);
861   *case_num_rw (*cc, var) = case_num;
862
863   return TRNS_CONTINUE;
864 }
865
866 /* Add a variable $ORDERING which we can sort by to get back the original order. */
867 struct variable *
868 add_permanent_ordering_transformation (struct dataset *ds)
869 {
870   struct dictionary *d = ds->permanent_dict ? ds->permanent_dict : ds->dict;
871   struct variable *order_var = dict_create_var_assert (d, "$ORDER", 0);
872   ds->order_var = order_var;
873
874   if (ds->permanent_dict)
875     {
876       order_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
877       static const struct trns_class trns_class = {
878         .name = "ordering",
879         .execute = store_case_num
880       };
881       const struct transformation t = { .class = &trns_class, .aux = order_var };
882       trns_chain_prepend (&ds->temporary_trns_chain, &t);
883     }
884
885   return order_var;
886 }
887 \f
888 /* Causes output from the next procedure to be discarded, instead
889    of being preserved for use as input for the next procedure. */
890 void
891 proc_discard_output (struct dataset *ds)
892 {
893   ds->discard_output = true;
894 }
895
896
897 /* Checks whether DS has a corrupted active dataset.  If so,
898    discards it and returns false.  If not, returns true without
899    doing anything. */
900 bool
901 dataset_end_of_command (struct dataset *ds)
902 {
903   if (ds->source != NULL)
904     {
905       if (casereader_error (ds->source))
906         {
907           dataset_clear (ds);
908           return false;
909         }
910       else
911         {
912           const struct taint *taint = casereader_get_taint (ds->source);
913           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
914           assert (!taint_has_tainted_successor (taint));
915         }
916     }
917   return true;
918 }
919 \f
920 /* Limits the maximum number of cases processed to
921    *CASES_REMAINING. */
922 static enum trns_result
923 case_limit_trns_proc (void *cases_remaining_,
924                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
925 {
926   size_t *cases_remaining = cases_remaining_;
927   if (*cases_remaining > 0)
928     {
929       (*cases_remaining)--;
930       return TRNS_CONTINUE;
931     }
932   else
933     return TRNS_DROP_CASE;
934 }
935
936 /* Frees the data associated with a case limit transformation. */
937 static bool
938 case_limit_trns_free (void *cases_remaining_)
939 {
940   size_t *cases_remaining = cases_remaining_;
941   free (cases_remaining);
942   return true;
943 }
944
945 /* Adds a transformation that limits the number of cases that may
946    pass through, if DS->DICT has a case limit. */
947 static void
948 add_case_limit_trns (struct dataset *ds)
949 {
950   casenumber case_limit = dict_get_case_limit (ds->dict);
951   if (case_limit != 0)
952     {
953       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
954       *cases_remaining = case_limit;
955
956       static const struct trns_class trns_class = {
957         .name = "case limit",
958         .execute = case_limit_trns_proc,
959         .destroy = case_limit_trns_free,
960       };
961       add_transformation (ds, &trns_class, cases_remaining);
962
963       dict_set_case_limit (ds->dict, 0);
964     }
965 }
966
967 \f
968 /* FILTER transformation. */
969 static enum trns_result
970 filter_trns_proc (void *filter_var_,
971                   struct ccase **c, casenumber case_nr UNUSED)
972
973 {
974   struct variable *filter_var = filter_var_;
975   double f = case_num (*c, filter_var);
976   return (f != 0.0 && !var_is_num_missing (filter_var, f)
977           ? TRNS_CONTINUE : TRNS_DROP_CASE);
978 }
979
980 /* Adds a temporary transformation to filter data according to
981    the variable specified on FILTER, if any. */
982 static void
983 add_filter_trns (struct dataset *ds)
984 {
985   struct variable *filter_var = dict_get_filter (ds->dict);
986   if (filter_var != NULL)
987     {
988       proc_start_temporary_transformations (ds);
989
990       static const struct trns_class trns_class = {
991         .name = "FILTER",
992         .execute = filter_trns_proc,
993       };
994       add_transformation (ds, &trns_class, filter_var);
995     }
996 }
997
998 void
999 dataset_need_lag (struct dataset *ds, int n_before)
1000 {
1001   ds->n_lag = MAX (ds->n_lag, n_before);
1002 }
1003 \f
1004 /* Measurement guesser, for guessing a measurement level from formats and
1005    data. */
1006
1007 struct mg_value
1008   {
1009     struct hmap_node hmap_node;
1010     double value;
1011   };
1012
1013 struct mg_var
1014   {
1015     struct variable *var;
1016     struct hmap *values;
1017   };
1018
1019 static void
1020 mg_var_uninit (struct mg_var *mgv)
1021 {
1022   struct mg_value *mgvalue, *next;
1023   HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
1024                       mgv->values)
1025     {
1026       hmap_delete (mgv->values, &mgvalue->hmap_node);
1027       free (mgvalue);
1028     }
1029   hmap_destroy (mgv->values);
1030   free (mgv->values);
1031 }
1032
1033 static enum measure
1034 mg_var_interpret (const struct mg_var *mgv)
1035 {
1036   size_t n = hmap_count (mgv->values);
1037   if (!n)
1038     {
1039       /* All missing (or no data). */
1040       return MEASURE_NOMINAL;
1041     }
1042
1043   const struct mg_value *mgvalue;
1044   HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1045                  mgv->values)
1046     if (mgvalue->value < 10)
1047       return MEASURE_NOMINAL;
1048   return MEASURE_SCALE;
1049 }
1050
1051 static enum measure
1052 mg_var_add_value (struct mg_var *mgv, double value)
1053 {
1054   if (var_is_num_missing (mgv->var, value))
1055     return MEASURE_UNKNOWN;
1056   else if (value < 0 || value != floor (value))
1057     return MEASURE_SCALE;
1058
1059   size_t hash = hash_double (value, 0);
1060   struct mg_value *mgvalue;
1061   HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1062                            hash, mgv->values)
1063     if (mgvalue->value == value)
1064       return MEASURE_UNKNOWN;
1065
1066   mgvalue = xmalloc (sizeof *mgvalue);
1067   mgvalue->value = value;
1068   hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1069   if (hmap_count (mgv->values) >= settings_get_scalemin ())
1070     return MEASURE_SCALE;
1071
1072   return MEASURE_UNKNOWN;
1073 }
1074
1075 struct measure_guesser
1076   {
1077     struct mg_var *vars;
1078     size_t n_vars;
1079   };
1080
1081 static struct measure_guesser *
1082 measure_guesser_create__ (struct dictionary *dict)
1083 {
1084   struct mg_var *mgvs = NULL;
1085   size_t n_mgvs = 0;
1086   size_t allocated_mgvs = 0;
1087
1088   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1089     {
1090       struct variable *var = dict_get_var (dict, i);
1091       if (var_get_measure (var) != MEASURE_UNKNOWN)
1092         continue;
1093
1094       struct fmt_spec f = var_get_print_format (var);
1095       enum measure m = var_default_measure_for_format (f.type);
1096       if (m != MEASURE_UNKNOWN)
1097         {
1098           var_set_measure (var, m);
1099           continue;
1100         }
1101
1102       if (n_mgvs >= allocated_mgvs)
1103         mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1104
1105       struct mg_var *mgv = &mgvs[n_mgvs++];
1106       *mgv = (struct mg_var) {
1107         .var = var,
1108         .values = xmalloc (sizeof *mgv->values),
1109       };
1110       hmap_init (mgv->values);
1111     }
1112   if (!n_mgvs)
1113     return NULL;
1114
1115   struct measure_guesser *mg = xmalloc (sizeof *mg);
1116   *mg = (struct measure_guesser) {
1117     .vars = mgvs,
1118     .n_vars = n_mgvs,
1119   };
1120   return mg;
1121 }
1122
1123 /* Scans through DS's dictionary for variables that have an unknown measurement
1124    level.  For those, if the measurement level can be guessed based on the
1125    variable's type and format, sets a default.  If that's enough, returns NULL.
1126    If any remain whose levels are unknown and can't be guessed that way,
1127    creates and returns a structure that the caller should pass to
1128    measure_guesser_add_case() or measure_guesser_run() for guessing a
1129    measurement level based on the data.  */
1130 struct measure_guesser *
1131 measure_guesser_create (struct dataset *ds)
1132 {
1133   return measure_guesser_create__ (dataset_dict (ds));
1134 }
1135
1136 /* Adds data from case C to MG. */
1137 static void
1138 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1139 {
1140   for (size_t i = 0; i < mg->n_vars; )
1141     {
1142       struct mg_var *mgv = &mg->vars[i];
1143       double value = case_num (c, mgv->var);
1144       enum measure m = mg_var_add_value (mgv, value);
1145       if (m != MEASURE_UNKNOWN)
1146         {
1147           var_set_measure (mgv->var, m);
1148
1149           mg_var_uninit (mgv);
1150           *mgv = mg->vars[--mg->n_vars];
1151         }
1152       else
1153         i++;
1154     }
1155 }
1156
1157 /* Destroys MG. */
1158 void
1159 measure_guesser_destroy (struct measure_guesser *mg)
1160 {
1161   if (!mg)
1162     return;
1163
1164   for (size_t i = 0; i < mg->n_vars; i++)
1165     {
1166       struct mg_var *mgv = &mg->vars[i];
1167       var_set_measure (mgv->var, mg_var_interpret (mgv));
1168       mg_var_uninit (mgv);
1169     }
1170   free (mg->vars);
1171   free (mg);
1172 }
1173
1174 /* Adds final measurement levels based on MG, after all the cases have been
1175    added. */
1176 static void
1177 measure_guesser_commit (struct measure_guesser *mg)
1178 {
1179   for (size_t i = 0; i < mg->n_vars; i++)
1180     {
1181       struct mg_var *mgv = &mg->vars[i];
1182       var_set_measure (mgv->var, mg_var_interpret (mgv));
1183     }
1184 }
1185
1186 /* Passes the cases in READER through MG and uses the data in the cases to set
1187    measurement levels for the variables where they were still unknown. */
1188 void
1189 measure_guesser_run (struct measure_guesser *mg,
1190                      const struct casereader *reader)
1191 {
1192   struct casereader *r = casereader_clone (reader);
1193   while (mg->n_vars > 0)
1194     {
1195       struct ccase *c = casereader_read (r);
1196       if (!c)
1197         break;
1198       measure_guesser_add_case (mg, c);
1199       case_unref (c);
1200     }
1201   casereader_destroy (r);
1202
1203   measure_guesser_commit (mg);
1204 }
1205 \f
1206 /* A transformation for guessing measurement levels. */
1207
1208 static enum trns_result
1209 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1210 {
1211   struct measure_guesser *mg = mg_;
1212   measure_guesser_add_case (mg, *c);
1213   return TRNS_CONTINUE;
1214 }
1215
1216 static bool
1217 mg_trns_free (void *mg_)
1218 {
1219   struct measure_guesser *mg = mg_;
1220   measure_guesser_commit (mg);
1221   measure_guesser_destroy (mg);
1222   return true;
1223 }
1224
1225 static const struct trns_class mg_trns_class = {
1226   .name = "add measurement level",
1227   .execute = mg_trns_proc,
1228   .destroy = mg_trns_free,
1229 };
1230
1231 static void
1232 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1233 {
1234   struct measure_guesser *mg = measure_guesser_create__ (dict);
1235   if (mg)
1236     add_transformation (ds, &mg_trns_class, mg);
1237 }
1238
1239 static void
1240 cancel_measurement_level_trns (struct trns_chain *chain)
1241 {
1242   if (!chain->n)
1243     return;
1244
1245   struct transformation *trns = &chain->xforms[chain->n - 1];
1246   if (trns->class != &mg_trns_class)
1247     return;
1248
1249   struct measure_guesser *mg = trns->aux;
1250   measure_guesser_destroy (mg);
1251   chain->n--;
1252 }
1253 \f
1254 static void
1255 dataset_changed__ (struct dataset *ds)
1256 {
1257   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1258     ds->callbacks->changed (ds->cb_data);
1259 }
1260
1261 static void
1262 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1263 {
1264   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1265     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1266 }
1267 \f
1268 /* Private interface for use by session code. */
1269
1270 void
1271 dataset_set_session__ (struct dataset *ds, struct session *session)
1272 {
1273   ds->session = session;
1274 }