063a4a425184ff1c6c35c06717c1552d83177e58
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct casewriter *sink;
72   struct trns_chain temporary_trns_chain;
73   bool temporary;
74   struct dictionary *dict;
75
76   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77   struct trns_chain *stack;
78   size_t n_stack;
79   size_t allocated_stack;
80
81   /* If true, cases are discarded instead of being written to
82      sink. */
83   bool discard_output;
84
85   /* The case map used to compact a case, if necessary;
86      otherwise a null pointer. */
87   struct case_map *compactor;
88
89   /* Time at which proc was last invoked. */
90   time_t last_proc_invocation;
91
92   /* Cases just before ("lagging") the current one. */
93   int n_lag;                    /* Number of cases to lag. */
94   struct deque lag;             /* Deque of lagged cases. */
95   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
96
97   /* Procedure data. */
98   enum
99     {
100       PROC_COMMITTED,           /* No procedure in progress. */
101       PROC_OPEN,                /* proc_open called, casereader still open. */
102       PROC_CLOSED               /* casereader from proc_open destroyed,
103                                    but proc_commit not yet called. */
104     }
105   proc_state;
106   casenumber cases_written;     /* Cases output so far. */
107   bool ok;                      /* Error status. */
108   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
109
110   const struct dataset_callbacks *callbacks;
111   void *cb_data;
112
113   /* Uniquely distinguishes datasets. */
114   unsigned int seqno;
115 };
116
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
119                                                bool non_empty);
120
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
125
126 static void update_last_proc_invocation (struct dataset *ds);
127
128 static void
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
130 {
131   struct dataset *ds = ds_;
132   dataset_changed__ (ds);
133 }
134 \f
135 static void
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
137 {
138   static unsigned int seqno;
139
140   dict_set_change_callback (ds->dict, dict_callback, ds);
141   proc_cancel_all_transformations (ds);
142   dataset_set_session (ds, session);
143   ds->seqno = ++seqno;
144 }
145
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
147    SESSION already contains a dataset named NAME, it is deleted and replaced.
148    The dataset initially has an empty dictionary and no data source. */
149 struct dataset *
150 dataset_create (struct session *session, const char *name)
151 {
152   struct dataset *ds = XMALLOC (struct dataset);
153   *ds = (struct dataset) {
154     .name = xstrdup (name),
155     .display = DATASET_FRONT,
156     .dict = dict_create (get_default_encoding ()),
157     .caseinit = caseinit_create (),
158   };
159   dataset_create_finish__ (ds, session);
160
161   return ds;
162 }
163
164 /* Creates and returns a new dataset that has the same data and dictionary as
165    OLD named NAME, adds it to the same session as OLD, and returns the new
166    dataset.  If SESSION already contains a dataset named NAME, it is deleted
167    and replaced.
168
169    OLD must not have any active transformations or temporary state and must
170    not be in the middle of a procedure.
171
172    Callbacks are not cloned. */
173 struct dataset *
174 dataset_clone (struct dataset *old, const char *name)
175 {
176   struct dataset *new;
177
178   assert (old->proc_state == PROC_COMMITTED);
179   assert (!old->permanent_trns_chain.n);
180   assert (old->permanent_dict == NULL);
181   assert (old->sink == NULL);
182   assert (!old->temporary);
183   assert (!old->temporary_trns_chain.n);
184   assert (!old->n_stack);
185
186   new = xzalloc (sizeof *new);
187   new->name = xstrdup (name);
188   new->display = DATASET_FRONT;
189   new->source = casereader_clone (old->source);
190   new->dict = dict_clone (old->dict);
191   new->caseinit = caseinit_clone (old->caseinit);
192   new->last_proc_invocation = old->last_proc_invocation;
193   new->ok = old->ok;
194
195   dataset_create_finish__ (new, old->session);
196
197   return new;
198 }
199
200 /* Destroys DS. */
201 void
202 dataset_destroy (struct dataset *ds)
203 {
204   if (ds != NULL)
205     {
206       dataset_set_session (ds, NULL);
207       dataset_clear (ds);
208       dict_unref (ds->dict);
209       dict_unref (ds->permanent_dict);
210       caseinit_destroy (ds->caseinit);
211       trns_chain_uninit (&ds->permanent_trns_chain);
212       for (size_t i = 0; i < ds->n_stack; i++)
213         trns_chain_uninit (&ds->stack[i]);
214       free (ds->stack);
215       dataset_transformations_changed__ (ds, false);
216       free (ds->name);
217       free (ds);
218     }
219 }
220
221 /* Discards the active dataset's dictionary, data, and transformations. */
222 void
223 dataset_clear (struct dataset *ds)
224 {
225   assert (ds->proc_state == PROC_COMMITTED);
226
227   dict_clear (ds->dict);
228   fh_set_default_handle (NULL);
229
230   ds->n_lag = 0;
231
232   casereader_destroy (ds->source);
233   ds->source = NULL;
234
235   proc_cancel_all_transformations (ds);
236 }
237
238 const char *
239 dataset_name (const struct dataset *ds)
240 {
241   return ds->name;
242 }
243
244 void
245 dataset_set_name (struct dataset *ds, const char *name)
246 {
247   struct session *session = ds->session;
248   bool active = false;
249
250   if (session != NULL)
251     {
252       active = session_active_dataset (session) == ds;
253       if (active)
254         session_set_active_dataset (session, NULL);
255       dataset_set_session (ds, NULL);
256     }
257
258   free (ds->name);
259   ds->name = xstrdup (name);
260
261   if (session != NULL)
262     {
263       dataset_set_session (ds, session);
264       if (active)
265         session_set_active_dataset (session, ds);
266     }
267 }
268
269 struct session *
270 dataset_session (const struct dataset *ds)
271 {
272   return ds->session;
273 }
274
275 void
276 dataset_set_session (struct dataset *ds, struct session *session)
277 {
278   if (session != ds->session)
279     {
280       if (ds->session != NULL)
281         session_remove_dataset (ds->session, ds);
282       if (session != NULL)
283         session_add_dataset (session, ds);
284     }
285 }
286
287 /* Returns the dictionary within DS.  This is always nonnull, although it
288    might not contain any variables. */
289 struct dictionary *
290 dataset_dict (const struct dataset *ds)
291 {
292   return ds->dict;
293 }
294
295 /* Replaces DS's dictionary by DICT, discarding any source and
296    transformations. */
297 void
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
299 {
300   assert (ds->proc_state == PROC_COMMITTED);
301   assert (ds->dict != dict);
302
303   dataset_clear (ds);
304
305   dict_unref (ds->dict);
306   ds->dict = dict;
307   dict_set_change_callback (ds->dict, dict_callback, ds);
308 }
309
310 /* Returns the casereader that will be read when a procedure is executed on
311    DS.  This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
314 {
315   return ds->source;
316 }
317
318 /* Returns true if DS has a data source, false otherwise. */
319 bool
320 dataset_has_source (const struct dataset *ds)
321 {
322   return dataset_source (ds) != NULL;
323 }
324
325 /* Replaces the active dataset's data by READER.  READER's cases must have an
326    appropriate format for DS's dictionary. */
327 bool
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
329 {
330   casereader_destroy (ds->source);
331   ds->source = reader;
332
333   caseinit_clear (ds->caseinit);
334   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
335
336   return reader == NULL || !casereader_error (reader);
337 }
338
339 /* Returns the data source from DS and removes it from DS.  Returns a null
340    pointer if DS has no data source. */
341 struct casereader *
342 dataset_steal_source (struct dataset *ds)
343 {
344   struct casereader *reader = ds->source;
345   ds->source = NULL;
346
347   return reader;
348 }
349
350 void
351 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
352 {
353   assert (!proc_in_temporary_transformations (ds));
354   assert (!proc_has_transformations (ds));
355   assert (n < dict_get_n_vars (ds->dict));
356
357   caseinit_mark_for_init (ds->caseinit, ds->dict);
358   ds->source = caseinit_translate_casereader_to_init_vars (
359     ds->caseinit, dict_get_proto (ds->dict), ds->source);
360   caseinit_clear (ds->caseinit);
361   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
362
363   dict_delete_vars (ds->dict, vars, n);
364   ds->source = case_map_create_input_translator (
365     case_map_to_compact_dict (ds->dict, 0), ds->source);
366   dict_compact_values (ds->dict);
367   caseinit_clear (ds->caseinit);
368   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
369 }
370
371 /* Returns a number unique to DS.  It can be used to distinguish one dataset
372    from any other within a given program run, even datasets that do not exist
373    at the same time. */
374 unsigned int
375 dataset_seqno (const struct dataset *ds)
376 {
377   return ds->seqno;
378 }
379
380 void
381 dataset_set_callbacks (struct dataset *ds,
382                        const struct dataset_callbacks *callbacks,
383                        void *cb_data)
384 {
385   ds->callbacks = callbacks;
386   ds->cb_data = cb_data;
387 }
388
389 enum dataset_display
390 dataset_get_display (const struct dataset *ds)
391 {
392   return ds->display;
393 }
394
395 void
396 dataset_set_display (struct dataset *ds, enum dataset_display display)
397 {
398   ds->display = display;
399 }
400 \f
401 /* Returns the last time the data was read. */
402 time_t
403 time_of_last_procedure (struct dataset *ds)
404 {
405   if (!ds)
406     return time (NULL);
407   if (ds->last_proc_invocation == 0)
408     update_last_proc_invocation (ds);
409   return ds->last_proc_invocation;
410 }
411 \f
412 /* Regular procedure. */
413
414 /* Executes any pending transformations, if necessary.
415    This is not identical to the EXECUTE command in that it won't
416    always read the source data.  This can be important when the
417    source data is given inline within BEGIN DATA...END FILE. */
418 bool
419 proc_execute (struct dataset *ds)
420 {
421   bool ok;
422
423   if ((!ds->temporary || !ds->temporary_trns_chain.n)
424       && !ds->permanent_trns_chain.n)
425     {
426       ds->n_lag = 0;
427       ds->discard_output = false;
428       dict_set_case_limit (ds->dict, 0);
429       dict_clear_vectors (ds->dict);
430       return true;
431     }
432
433   ok = casereader_destroy (proc_open (ds));
434   return proc_commit (ds) && ok;
435 }
436
437 static const struct casereader_class proc_casereader_class;
438
439 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
440    cases filtered out with FILTER BY will not be included in the casereader
441    (which is usually desirable).  If FILTER is false, all cases will be
442    included regardless of FILTER BY settings.
443
444    proc_commit must be called when done. */
445 struct casereader *
446 proc_open_filtering (struct dataset *ds, bool filter)
447 {
448   struct casereader *reader;
449
450   assert (ds->n_stack == 0);
451   assert (ds->source != NULL);
452   assert (ds->proc_state == PROC_COMMITTED);
453
454   update_last_proc_invocation (ds);
455
456   caseinit_mark_for_init (ds->caseinit, ds->dict);
457   ds->source = caseinit_translate_casereader_to_init_vars (
458     ds->caseinit, dict_get_proto (ds->dict), ds->source);
459
460   /* Finish up the collection of transformations. */
461   add_case_limit_trns (ds);
462   if (filter)
463     add_filter_trns (ds);
464   if (!proc_in_temporary_transformations (ds))
465     add_measurement_level_trns (ds, ds->dict);
466
467   /* Make permanent_dict refer to the dictionary right before
468      data reaches the sink. */
469   if (ds->permanent_dict == NULL)
470     ds->permanent_dict = ds->dict;
471
472   /* Prepare sink. */
473   if (!ds->discard_output)
474     {
475       struct dictionary *pd = ds->permanent_dict;
476       size_t compacted_n_values = dict_count_values (pd, DC_SCRATCH);
477       assert (dict_count_values (pd, 0) == dict_get_n_vars (pd));
478       if (compacted_n_values < dict_get_n_vars (pd))
479         {
480           struct caseproto *compacted_proto;
481           compacted_proto = dict_get_compacted_proto (pd, DC_SCRATCH);
482           ds->compactor = case_map_to_compact_dict (pd, DC_SCRATCH);
483           ds->sink = autopaging_writer_create (compacted_proto);
484           caseproto_unref (compacted_proto);
485         }
486       else
487         {
488           ds->compactor = NULL;
489           ds->sink = autopaging_writer_create (dict_get_proto (pd));
490         }
491     }
492   else
493     {
494       ds->compactor = NULL;
495       ds->sink = NULL;
496     }
497
498   /* Allocate memory for lagged cases. */
499   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
500
501   ds->proc_state = PROC_OPEN;
502   ds->cases_written = 0;
503   ds->ok = true;
504
505   /* FIXME: use taint in dataset in place of `ok'? */
506   /* FIXME: for trivial cases we can just return a clone of
507      ds->source? */
508
509   /* Create casereader and insert a shim on top.  The shim allows us to
510      arbitrarily extend the casereader's lifetime, by slurping the cases into
511      the shim's buffer in proc_commit().  That is especially useful when output
512      table_items are generated directly from the procedure casereader (e.g. by
513      the LIST procedure) when we are using an output driver that keeps a
514      reference to the output items passed to it (e.g. the GUI output driver in
515      PSPPIRE). */
516   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
517                                          CASENUMBER_MAX,
518                                          &proc_casereader_class, ds);
519   ds->shim = casereader_shim_insert (reader);
520   return reader;
521 }
522
523 /* Opens dataset DS for reading cases with proc_read.
524    proc_commit must be called when done. */
525 struct casereader *
526 proc_open (struct dataset *ds)
527 {
528   return proc_open_filtering (ds, true);
529 }
530
531 /* Returns true if a procedure is in progress, that is, if
532    proc_open has been called but proc_commit has not. */
533 bool
534 proc_is_open (const struct dataset *ds)
535 {
536   return ds->proc_state != PROC_COMMITTED;
537 }
538
539 /* "read" function for procedure casereader. */
540 static struct ccase *
541 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
542 {
543   struct dataset *ds = ds_;
544   enum trns_result retval = TRNS_DROP_CASE;
545   struct ccase *c;
546
547   assert (ds->proc_state == PROC_OPEN);
548   for (; ; case_unref (c))
549     {
550       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
551       if (retval == TRNS_ERROR)
552         ds->ok = false;
553       if (!ds->ok)
554         return NULL;
555
556       /* Read a case from source. */
557       c = casereader_read (ds->source);
558       if (c == NULL)
559         return NULL;
560       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
561       caseinit_restore_left_vars (ds->caseinit, c);
562
563       /* Execute permanent transformations.  */
564       casenumber case_nr = ds->cases_written + 1;
565       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
566       caseinit_save_left_vars (ds->caseinit, c);
567       if (retval != TRNS_CONTINUE)
568         continue;
569
570       /* Write case to collection of lagged cases. */
571       if (ds->n_lag > 0)
572         {
573           while (deque_count (&ds->lag) >= ds->n_lag)
574             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
575           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
576         }
577
578       /* Write case to replacement dataset. */
579       ds->cases_written++;
580       if (ds->sink != NULL)
581         casewriter_write (ds->sink,
582                           case_map_execute (ds->compactor, case_ref (c)));
583
584       /* Execute temporary transformations. */
585       if (ds->temporary_trns_chain.n)
586         {
587           retval = trns_chain_execute (&ds->temporary_trns_chain,
588                                        ds->cases_written, &c);
589           if (retval != TRNS_CONTINUE)
590             continue;
591         }
592
593       return c;
594     }
595 }
596
597 /* "destroy" function for procedure casereader. */
598 static void
599 proc_casereader_destroy (struct casereader *reader, void *ds_)
600 {
601   struct dataset *ds = ds_;
602   struct ccase *c;
603
604   /* We are always the subreader for a casereader_buffer, so if we're being
605      destroyed then it's because the casereader_buffer has read all the cases
606      that it ever will. */
607   ds->shim = NULL;
608
609   /* Make sure transformations happen for every input case, in
610      case they have side effects, and ensure that the replacement
611      active dataset gets all the cases it should. */
612   while ((c = casereader_read (reader)) != NULL)
613     case_unref (c);
614
615   ds->proc_state = PROC_CLOSED;
616   ds->ok = casereader_destroy (ds->source) && ds->ok;
617   ds->source = NULL;
618   dataset_set_source (ds, NULL);
619 }
620
621 /* Must return false if the source casereader, a transformation,
622    or the sink casewriter signaled an error.  (If a temporary
623    transformation signals an error, then the return value is
624    false, but the replacement active dataset may still be
625    untainted.) */
626 bool
627 proc_commit (struct dataset *ds)
628 {
629   if (ds->shim != NULL)
630     casereader_shim_slurp (ds->shim);
631
632   assert (ds->proc_state == PROC_CLOSED);
633   ds->proc_state = PROC_COMMITTED;
634
635   dataset_changed__ (ds);
636
637   /* Free memory for lagged cases. */
638   while (!deque_is_empty (&ds->lag))
639     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
640   free (ds->lag_cases);
641
642   /* Dictionary from before TEMPORARY becomes permanent. */
643   proc_cancel_temporary_transformations (ds);
644   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
645
646   if (!ds->discard_output)
647     {
648       /* Finish compacting. */
649       if (ds->compactor != NULL)
650         {
651           case_map_destroy (ds->compactor);
652           ds->compactor = NULL;
653
654           dict_delete_scratch_vars (ds->dict);
655           dict_compact_values (ds->dict);
656         }
657
658       /* Old data sink becomes new data source. */
659       if (ds->sink != NULL)
660         ds->source = casewriter_make_reader (ds->sink);
661     }
662   else
663     {
664       ds->source = NULL;
665       ds->discard_output = false;
666     }
667   ds->sink = NULL;
668
669   caseinit_clear (ds->caseinit);
670   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
671
672   dict_clear_vectors (ds->dict);
673   ds->permanent_dict = NULL;
674   return ok;
675 }
676
677 /* Casereader class for procedure execution. */
678 static const struct casereader_class proc_casereader_class =
679   {
680     proc_casereader_read,
681     proc_casereader_destroy,
682     NULL,
683     NULL,
684   };
685
686 /* Updates last_proc_invocation. */
687 static void
688 update_last_proc_invocation (struct dataset *ds)
689 {
690   ds->last_proc_invocation = time (NULL);
691 }
692 \f
693 /* Returns a pointer to the lagged case from N_BEFORE cases before the
694    current one, or NULL if there haven't been that many cases yet. */
695 const struct ccase *
696 lagged_case (const struct dataset *ds, int n_before)
697 {
698   assert (n_before >= 1);
699   assert (n_before <= ds->n_lag);
700
701   if (n_before <= deque_count (&ds->lag))
702     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
703   else
704     return NULL;
705 }
706 \f
707 /* Adds TRNS to the current set of transformations. */
708 void
709 add_transformation (struct dataset *ds,
710                     const struct trns_class *class, void *aux)
711 {
712   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
713                               : ds->temporary ? &ds->temporary_trns_chain
714                               : &ds->permanent_trns_chain);
715   struct transformation t = { .class = class, .aux = aux };
716   trns_chain_append (chain, &t);
717   dataset_transformations_changed__ (ds, true);
718 }
719
720 /* Returns true if the next call to add_transformation() will add
721    a temporary transformation, false if it will add a permanent
722    transformation. */
723 bool
724 proc_in_temporary_transformations (const struct dataset *ds)
725 {
726   return ds->temporary;
727 }
728
729 /* Marks the start of temporary transformations.
730    Further calls to add_transformation() will add temporary
731    transformations. */
732 void
733 proc_start_temporary_transformations (struct dataset *ds)
734 {
735   assert (!ds->n_stack);
736   if (!proc_in_temporary_transformations (ds))
737     {
738       add_case_limit_trns (ds);
739
740       ds->permanent_dict = dict_clone (ds->dict);
741       add_measurement_level_trns (ds, ds->permanent_dict);
742
743       ds->temporary = true;
744       dataset_transformations_changed__ (ds, true);
745     }
746 }
747
748 /* Converts all the temporary transformations, if any, to permanent
749    transformations.  Further transformations will be permanent.
750
751    The FILTER command is implemented as a temporary transformation, so a
752    procedure that uses this function should usually use proc_open_filtering()
753    with FILTER false, instead of plain proc_open().
754
755    Returns true if anything changed, false otherwise. */
756 bool
757 proc_make_temporary_transformations_permanent (struct dataset *ds)
758 {
759   if (proc_in_temporary_transformations (ds))
760     {
761       cancel_measurement_level_trns (&ds->permanent_trns_chain);
762       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
763
764       ds->temporary = false;
765
766       dict_unref (ds->permanent_dict);
767       ds->permanent_dict = NULL;
768
769       return true;
770     }
771   else
772     return false;
773 }
774
775 /* Cancels all temporary transformations, if any.  Further
776    transformations will be permanent.
777    Returns true if anything changed, false otherwise. */
778 bool
779 proc_cancel_temporary_transformations (struct dataset *ds)
780 {
781   if (proc_in_temporary_transformations (ds))
782     {
783       trns_chain_clear (&ds->temporary_trns_chain);
784
785       dict_unref (ds->dict);
786       ds->dict = ds->permanent_dict;
787       ds->permanent_dict = NULL;
788
789       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
790       return true;
791     }
792   else
793     return false;
794 }
795
796 /* Cancels all transformations, if any.
797    Returns true if successful, false on I/O error. */
798 bool
799 proc_cancel_all_transformations (struct dataset *ds)
800 {
801   bool ok;
802   assert (ds->proc_state == PROC_COMMITTED);
803   ok = trns_chain_clear (&ds->permanent_trns_chain);
804   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
805   ds->temporary = false;
806   for (size_t i = 0; i < ds->n_stack; i++)
807     ok = trns_chain_uninit (&ds->stack[i]) && ok;
808   ds->n_stack = 0;
809   dataset_transformations_changed__ (ds, false);
810
811   return ok;
812 }
813
814 void
815 proc_push_transformations (struct dataset *ds)
816 {
817   if (ds->n_stack >= ds->allocated_stack)
818     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
819                             sizeof *ds->stack);
820   trns_chain_init (&ds->stack[ds->n_stack++]);
821 }
822
823 void
824 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
825 {
826   assert (ds->n_stack > 0);
827   *chain = ds->stack[--ds->n_stack];
828 }
829
830 bool
831 proc_has_transformations (const struct dataset *ds)
832 {
833   return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
834 }
835
836 static enum trns_result
837 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
838 {
839   struct variable *var = var_;
840
841   *cc = case_unshare (*cc);
842   *case_num_rw (*cc, var) = case_num;
843
844   return TRNS_CONTINUE;
845 }
846
847 /* Add a variable which we can sort by to get back the original order. */
848 struct variable *
849 add_permanent_ordering_transformation (struct dataset *ds)
850 {
851   struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
852   struct variable *order_var
853     = (proc_in_temporary_transformations (ds)
854        ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
855        : temp_var);
856
857   static const struct trns_class trns_class = {
858     .name = "ordering",
859     .execute = store_case_num
860   };
861   const struct transformation t = { .class = &trns_class, .aux = order_var };
862   trns_chain_append (&ds->permanent_trns_chain, &t);
863
864   return temp_var;
865 }
866 \f
867 /* Causes output from the next procedure to be discarded, instead
868    of being preserved for use as input for the next procedure. */
869 void
870 proc_discard_output (struct dataset *ds)
871 {
872   ds->discard_output = true;
873 }
874
875
876 /* Checks whether DS has a corrupted active dataset.  If so,
877    discards it and returns false.  If not, returns true without
878    doing anything. */
879 bool
880 dataset_end_of_command (struct dataset *ds)
881 {
882   if (ds->source != NULL)
883     {
884       if (casereader_error (ds->source))
885         {
886           dataset_clear (ds);
887           return false;
888         }
889       else
890         {
891           const struct taint *taint = casereader_get_taint (ds->source);
892           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
893           assert (!taint_has_tainted_successor (taint));
894         }
895     }
896   return true;
897 }
898 \f
899 /* Limits the maximum number of cases processed to
900    *CASES_REMAINING. */
901 static enum trns_result
902 case_limit_trns_proc (void *cases_remaining_,
903                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
904 {
905   size_t *cases_remaining = cases_remaining_;
906   if (*cases_remaining > 0)
907     {
908       (*cases_remaining)--;
909       return TRNS_CONTINUE;
910     }
911   else
912     return TRNS_DROP_CASE;
913 }
914
915 /* Frees the data associated with a case limit transformation. */
916 static bool
917 case_limit_trns_free (void *cases_remaining_)
918 {
919   size_t *cases_remaining = cases_remaining_;
920   free (cases_remaining);
921   return true;
922 }
923
924 /* Adds a transformation that limits the number of cases that may
925    pass through, if DS->DICT has a case limit. */
926 static void
927 add_case_limit_trns (struct dataset *ds)
928 {
929   casenumber case_limit = dict_get_case_limit (ds->dict);
930   if (case_limit != 0)
931     {
932       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
933       *cases_remaining = case_limit;
934
935       static const struct trns_class trns_class = {
936         .name = "case limit",
937         .execute = case_limit_trns_proc,
938         .destroy = case_limit_trns_free,
939       };
940       add_transformation (ds, &trns_class, cases_remaining);
941
942       dict_set_case_limit (ds->dict, 0);
943     }
944 }
945
946 \f
947 /* FILTER transformation. */
948 static enum trns_result
949 filter_trns_proc (void *filter_var_,
950                   struct ccase **c, casenumber case_nr UNUSED)
951
952 {
953   struct variable *filter_var = filter_var_;
954   double f = case_num (*c, filter_var);
955   return (f != 0.0 && !var_is_num_missing (filter_var, f)
956           ? TRNS_CONTINUE : TRNS_DROP_CASE);
957 }
958
959 /* Adds a temporary transformation to filter data according to
960    the variable specified on FILTER, if any. */
961 static void
962 add_filter_trns (struct dataset *ds)
963 {
964   struct variable *filter_var = dict_get_filter (ds->dict);
965   if (filter_var != NULL)
966     {
967       proc_start_temporary_transformations (ds);
968
969       static const struct trns_class trns_class = {
970         .name = "FILTER",
971         .execute = filter_trns_proc,
972       };
973       add_transformation (ds, &trns_class, filter_var);
974     }
975 }
976
977 void
978 dataset_need_lag (struct dataset *ds, int n_before)
979 {
980   ds->n_lag = MAX (ds->n_lag, n_before);
981 }
982 \f
983 /* Measurement guesser, for guessing a measurement level from formats and
984    data. */
985
986 struct mg_value
987   {
988     struct hmap_node hmap_node;
989     double value;
990   };
991
992 struct mg_var
993   {
994     struct variable *var;
995     struct hmap *values;
996   };
997
998 static void
999 mg_var_uninit (struct mg_var *mgv)
1000 {
1001   struct mg_value *mgvalue, *next;
1002   HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
1003                       mgv->values)
1004     {
1005       hmap_delete (mgv->values, &mgvalue->hmap_node);
1006       free (mgvalue);
1007     }
1008   hmap_destroy (mgv->values);
1009   free (mgv->values);
1010 }
1011
1012 static enum measure
1013 mg_var_interpret (const struct mg_var *mgv)
1014 {
1015   size_t n = hmap_count (mgv->values);
1016   if (!n)
1017     {
1018       /* All missing (or no data). */
1019       return MEASURE_NOMINAL;
1020     }
1021
1022   const struct mg_value *mgvalue;
1023   HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1024                  mgv->values)
1025     if (mgvalue->value < 10)
1026       return MEASURE_NOMINAL;
1027   return MEASURE_SCALE;
1028 }
1029
1030 static enum measure
1031 mg_var_add_value (struct mg_var *mgv, double value)
1032 {
1033   if (var_is_num_missing (mgv->var, value))
1034     return MEASURE_UNKNOWN;
1035   else if (value < 0 || value != floor (value))
1036     return MEASURE_SCALE;
1037
1038   size_t hash = hash_double (value, 0);
1039   struct mg_value *mgvalue;
1040   HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1041                            hash, mgv->values)
1042     if (mgvalue->value == value)
1043       return MEASURE_UNKNOWN;
1044
1045   mgvalue = xmalloc (sizeof *mgvalue);
1046   mgvalue->value = value;
1047   hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1048   if (hmap_count (mgv->values) >= settings_get_scalemin ())
1049     return MEASURE_SCALE;
1050
1051   return MEASURE_UNKNOWN;
1052 }
1053
1054 struct measure_guesser
1055   {
1056     struct mg_var *vars;
1057     size_t n_vars;
1058   };
1059
1060 static struct measure_guesser *
1061 measure_guesser_create__ (struct dictionary *dict)
1062 {
1063   struct mg_var *mgvs = NULL;
1064   size_t n_mgvs = 0;
1065   size_t allocated_mgvs = 0;
1066
1067   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1068     {
1069       struct variable *var = dict_get_var (dict, i);
1070       if (var_get_measure (var) != MEASURE_UNKNOWN)
1071         continue;
1072
1073       struct fmt_spec f = var_get_print_format (var);
1074       enum measure m = var_default_measure_for_format (f.type);
1075       if (m != MEASURE_UNKNOWN)
1076         {
1077           var_set_measure (var, m);
1078           continue;
1079         }
1080
1081       if (n_mgvs >= allocated_mgvs)
1082         mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1083
1084       struct mg_var *mgv = &mgvs[n_mgvs++];
1085       *mgv = (struct mg_var) {
1086         .var = var,
1087         .values = xmalloc (sizeof *mgv->values),
1088       };
1089       hmap_init (mgv->values);
1090     }
1091   if (!n_mgvs)
1092     return NULL;
1093
1094   struct measure_guesser *mg = xmalloc (sizeof *mg);
1095   *mg = (struct measure_guesser) {
1096     .vars = mgvs,
1097     .n_vars = n_mgvs,
1098   };
1099   return mg;
1100 }
1101
1102 /* Scans through DS's dictionary for variables that have an unknown measurement
1103    level.  For those, if the measurement level can be guessed based on the
1104    variable's type and format, sets a default.  If that's enough, returns NULL.
1105    If any remain whose levels are unknown and can't be guessed that way,
1106    creates and returns a structure that the caller should pass to
1107    measure_guesser_add_case() or measure_guesser_run() for guessing a
1108    measurement level based on the data.  */
1109 struct measure_guesser *
1110 measure_guesser_create (struct dataset *ds)
1111 {
1112   return measure_guesser_create__ (dataset_dict (ds));
1113 }
1114
1115 /* Adds data from case C to MG. */
1116 static void
1117 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1118 {
1119   for (size_t i = 0; i < mg->n_vars; )
1120     {
1121       struct mg_var *mgv = &mg->vars[i];
1122       double value = case_num (c, mgv->var);
1123       enum measure m = mg_var_add_value (mgv, value);
1124       if (m != MEASURE_UNKNOWN)
1125         {
1126           var_set_measure (mgv->var, m);
1127
1128           mg_var_uninit (mgv);
1129           *mgv = mg->vars[--mg->n_vars];
1130         }
1131       else
1132         i++;
1133     }
1134 }
1135
1136 /* Destroys MG. */
1137 void
1138 measure_guesser_destroy (struct measure_guesser *mg)
1139 {
1140   if (!mg)
1141     return;
1142
1143   for (size_t i = 0; i < mg->n_vars; i++)
1144     {
1145       struct mg_var *mgv = &mg->vars[i];
1146       var_set_measure (mgv->var, mg_var_interpret (mgv));
1147       mg_var_uninit (mgv);
1148     }
1149   free (mg->vars);
1150   free (mg);
1151 }
1152
1153 /* Adds final measurement levels based on MG, after all the cases have been
1154    added. */
1155 static void
1156 measure_guesser_commit (struct measure_guesser *mg)
1157 {
1158   for (size_t i = 0; i < mg->n_vars; i++)
1159     {
1160       struct mg_var *mgv = &mg->vars[i];
1161       var_set_measure (mgv->var, mg_var_interpret (mgv));
1162     }
1163 }
1164
1165 /* Passes the cases in READER through MG and uses the data in the cases to set
1166    measurement levels for the variables where they were still unknown. */
1167 void
1168 measure_guesser_run (struct measure_guesser *mg,
1169                      const struct casereader *reader)
1170 {
1171   struct casereader *r = casereader_clone (reader);
1172   while (mg->n_vars > 0)
1173     {
1174       struct ccase *c = casereader_read (r);
1175       if (!c)
1176         break;
1177       measure_guesser_add_case (mg, c);
1178       case_unref (c);
1179     }
1180   casereader_destroy (r);
1181
1182   measure_guesser_commit (mg);
1183 }
1184 \f
1185 /* A transformation for guessing measurement levels. */
1186
1187 static enum trns_result
1188 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1189 {
1190   struct measure_guesser *mg = mg_;
1191   measure_guesser_add_case (mg, *c);
1192   return TRNS_CONTINUE;
1193 }
1194
1195 static bool
1196 mg_trns_free (void *mg_)
1197 {
1198   struct measure_guesser *mg = mg_;
1199   measure_guesser_commit (mg);
1200   measure_guesser_destroy (mg);
1201   return true;
1202 }
1203
1204 static const struct trns_class mg_trns_class = {
1205   .name = "add measurement level",
1206   .execute = mg_trns_proc,
1207   .destroy = mg_trns_free,
1208 };
1209
1210 static void
1211 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1212 {
1213   struct measure_guesser *mg = measure_guesser_create__ (dict);
1214   if (mg)
1215     add_transformation (ds, &mg_trns_class, mg);
1216 }
1217
1218 static void
1219 cancel_measurement_level_trns (struct trns_chain *chain)
1220 {
1221   if (!chain->n)
1222     return;
1223
1224   struct transformation *trns = &chain->xforms[chain->n - 1];
1225   if (trns->class != &mg_trns_class)
1226     return;
1227
1228   struct measure_guesser *mg = trns->aux;
1229   measure_guesser_destroy (mg);
1230   chain->n--;
1231 }
1232 \f
1233 static void
1234 dataset_changed__ (struct dataset *ds)
1235 {
1236   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1237     ds->callbacks->changed (ds->cb_data);
1238 }
1239
1240 static void
1241 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1242 {
1243   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1244     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1245 }
1246 \f
1247 /* Private interface for use by session code. */
1248
1249 void
1250 dataset_set_session__ (struct dataset *ds, struct session *session)
1251 {
1252   ds->session = session;
1253 }