RANK: Add support for temporary transformations.
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 struct dataset {
48   /* A dataset is usually part of a session.  Within a session its name must
49      unique.  The name must either be a valid PSPP identifier or the empty
50      string.  (It must be unique within the session even if it is the empty
51      string; that is, there may only be a single dataset within a session with
52      the empty string as its name.) */
53   struct session *session;
54   char *name;
55   enum dataset_display display;
56
57   /* Cases are read from source,
58      their transformation variables are initialized,
59      pass through permanent_trns_chain (which transforms them into
60      the format described by permanent_dict),
61      are written to sink,
62      pass through temporary_trns_chain (which transforms them into
63      the format described by dict),
64      and are finally passed to the procedure. */
65   struct casereader *source;
66   struct caseinit *caseinit;
67   struct trns_chain *permanent_trns_chain;
68   struct dictionary *permanent_dict;
69   struct casewriter *sink;
70   struct trns_chain *temporary_trns_chain;
71   struct dictionary *dict;
72
73   /* If true, cases are discarded instead of being written to
74      sink. */
75   bool discard_output;
76
77   /* The transformation chain that the next transformation will be
78      added to. */
79   struct trns_chain *cur_trns_chain;
80
81   /* The case map used to compact a case, if necessary;
82      otherwise a null pointer. */
83   struct case_map *compactor;
84
85   /* Time at which proc was last invoked. */
86   time_t last_proc_invocation;
87
88   /* Cases just before ("lagging") the current one. */
89   int n_lag;                    /* Number of cases to lag. */
90   struct deque lag;             /* Deque of lagged cases. */
91   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
92
93   /* Procedure data. */
94   enum
95     {
96       PROC_COMMITTED,           /* No procedure in progress. */
97       PROC_OPEN,                /* proc_open called, casereader still open. */
98       PROC_CLOSED               /* casereader from proc_open destroyed,
99                                    but proc_commit not yet called. */
100     }
101   proc_state;
102   casenumber cases_written;     /* Cases output so far. */
103   bool ok;                      /* Error status. */
104   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
105
106   const struct dataset_callbacks *callbacks;
107   void *cb_data;
108
109   /* Uniquely distinguishes datasets. */
110   unsigned int seqno;
111 };
112
113 static void dataset_changed__ (struct dataset *);
114 static void dataset_transformations_changed__ (struct dataset *,
115                                                bool non_empty);
116
117 static void add_case_limit_trns (struct dataset *ds);
118 static void add_filter_trns (struct dataset *ds);
119
120 static void update_last_proc_invocation (struct dataset *ds);
121
122 static void
123 dict_callback (struct dictionary *d UNUSED, void *ds_)
124 {
125   struct dataset *ds = ds_;
126   dataset_changed__ (ds);
127 }
128 \f
129 static void
130 dataset_create_finish__ (struct dataset *ds, struct session *session)
131 {
132   static unsigned int seqno;
133
134   dict_set_change_callback (ds->dict, dict_callback, ds);
135   proc_cancel_all_transformations (ds);
136   dataset_set_session (ds, session);
137   ds->seqno = ++seqno;
138 }
139
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
141    SESSION already contains a dataset named NAME, it is deleted and replaced.
142    The dataset initially has an empty dictionary and no data source. */
143 struct dataset *
144 dataset_create (struct session *session, const char *name)
145 {
146   struct dataset *ds;
147
148   ds = xzalloc (sizeof *ds);
149   ds->name = xstrdup (name);
150   ds->display = DATASET_FRONT;
151   ds->dict = dict_create (get_default_encoding ());
152
153   ds->caseinit = caseinit_create ();
154
155   dataset_create_finish__ (ds, session);
156
157   return ds;
158 }
159
160 /* Creates and returns a new dataset that has the same data and dictionary as
161    OLD named NAME, adds it to the same session as OLD, and returns the new
162    dataset.  If SESSION already contains a dataset named NAME, it is deleted
163    and replaced.
164
165    OLD must not have any active transformations or temporary state and must
166    not be in the middle of a procedure.
167
168    Callbacks are not cloned. */
169 struct dataset *
170 dataset_clone (struct dataset *old, const char *name)
171 {
172   struct dataset *new;
173
174   assert (old->proc_state == PROC_COMMITTED);
175   assert (trns_chain_is_empty (old->permanent_trns_chain));
176   assert (old->permanent_dict == NULL);
177   assert (old->sink == NULL);
178   assert (old->temporary_trns_chain == NULL);
179
180   new = xzalloc (sizeof *new);
181   new->name = xstrdup (name);
182   new->display = DATASET_FRONT;
183   new->source = casereader_clone (old->source);
184   new->dict = dict_clone (old->dict);
185   new->caseinit = caseinit_clone (old->caseinit);
186   new->last_proc_invocation = old->last_proc_invocation;
187   new->ok = old->ok;
188
189   dataset_create_finish__ (new, old->session);
190
191   return new;
192 }
193
194 /* Destroys DS. */
195 void
196 dataset_destroy (struct dataset *ds)
197 {
198   if (ds != NULL)
199     {
200       dataset_set_session (ds, NULL);
201       dataset_clear (ds);
202       dict_destroy (ds->dict);
203       caseinit_destroy (ds->caseinit);
204       trns_chain_destroy (ds->permanent_trns_chain);
205       dataset_transformations_changed__ (ds, false);
206       free (ds->name);
207       free (ds);
208     }
209 }
210
211 /* Discards the active dataset's dictionary, data, and transformations. */
212 void
213 dataset_clear (struct dataset *ds)
214 {
215   assert (ds->proc_state == PROC_COMMITTED);
216
217   dict_clear (ds->dict);
218   fh_set_default_handle (NULL);
219
220   ds->n_lag = 0;
221
222   casereader_destroy (ds->source);
223   ds->source = NULL;
224
225   proc_cancel_all_transformations (ds);
226 }
227
228 const char *
229 dataset_name (const struct dataset *ds)
230 {
231   return ds->name;
232 }
233
234 void
235 dataset_set_name (struct dataset *ds, const char *name)
236 {
237   struct session *session = ds->session;
238   bool active = false;
239
240   if (session != NULL)
241     {
242       active = session_active_dataset (session) == ds;
243       if (active)
244         session_set_active_dataset (session, NULL);
245       dataset_set_session (ds, NULL);
246     }
247
248   free (ds->name);
249   ds->name = xstrdup (name);
250
251   if (session != NULL)
252     {
253       dataset_set_session (ds, session);
254       if (active)
255         session_set_active_dataset (session, ds);
256     }
257 }
258
259 struct session *
260 dataset_session (const struct dataset *ds)
261 {
262   return ds->session;
263 }
264
265 void
266 dataset_set_session (struct dataset *ds, struct session *session)
267 {
268   if (session != ds->session)
269     {
270       if (ds->session != NULL)
271         session_remove_dataset (ds->session, ds);
272       if (session != NULL)
273         session_add_dataset (session, ds);
274     }
275 }
276
277 /* Returns the dictionary within DS.  This is always nonnull, although it
278    might not contain any variables. */
279 struct dictionary *
280 dataset_dict (const struct dataset *ds)
281 {
282   return ds->dict;
283 }
284
285 /* Replaces DS's dictionary by DICT, discarding any source and
286    transformations. */
287 void
288 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
289 {
290   assert (ds->proc_state == PROC_COMMITTED);
291   assert (ds->dict != dict);
292
293   dataset_clear (ds);
294
295   dict_destroy (ds->dict);
296   ds->dict = dict;
297   dict_set_change_callback (ds->dict, dict_callback, ds);
298 }
299
300 /* Returns the casereader that will be read when a procedure is executed on
301    DS.  This can be NULL if none has been set up yet. */
302 const struct casereader *
303 dataset_source (const struct dataset *ds)
304 {
305   return ds->source;
306 }
307
308 /* Returns true if DS has a data source, false otherwise. */
309 bool
310 dataset_has_source (const struct dataset *ds)
311 {
312   return dataset_source (ds) != NULL;
313 }
314
315 /* Replaces the active dataset's data by READER.  READER's cases must have an
316    appropriate format for DS's dictionary. */
317 bool
318 dataset_set_source (struct dataset *ds, struct casereader *reader)
319 {
320   casereader_destroy (ds->source);
321   ds->source = reader;
322
323   caseinit_clear (ds->caseinit);
324   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
325
326   return reader == NULL || !casereader_error (reader);
327 }
328
329 /* Returns the data source from DS and removes it from DS.  Returns a null
330    pointer if DS has no data source. */
331 struct casereader *
332 dataset_steal_source (struct dataset *ds)
333 {
334   struct casereader *reader = ds->source;
335   ds->source = NULL;
336
337   return reader;
338 }
339
340 /* Returns a number unique to DS.  It can be used to distinguish one dataset
341    from any other within a given program run, even datasets that do not exist
342    at the same time. */
343 unsigned int
344 dataset_seqno (const struct dataset *ds)
345 {
346   return ds->seqno;
347 }
348
349 void
350 dataset_set_callbacks (struct dataset *ds,
351                        const struct dataset_callbacks *callbacks,
352                        void *cb_data)
353 {
354   ds->callbacks = callbacks;
355   ds->cb_data = cb_data;
356 }
357
358 enum dataset_display
359 dataset_get_display (const struct dataset *ds)
360 {
361   return ds->display;
362 }
363
364 void
365 dataset_set_display (struct dataset *ds, enum dataset_display display)
366 {
367   ds->display = display;
368 }
369 \f
370 /* Returns the last time the data was read. */
371 time_t
372 time_of_last_procedure (struct dataset *ds)
373 {
374   if (ds->last_proc_invocation == 0)
375     update_last_proc_invocation (ds);
376   return ds->last_proc_invocation;
377 }
378 \f
379 /* Regular procedure. */
380
381 /* Executes any pending transformations, if necessary.
382    This is not identical to the EXECUTE command in that it won't
383    always read the source data.  This can be important when the
384    source data is given inline within BEGIN DATA...END FILE. */
385 bool
386 proc_execute (struct dataset *ds)
387 {
388   bool ok;
389
390   if ((ds->temporary_trns_chain == NULL
391        || trns_chain_is_empty (ds->temporary_trns_chain))
392       && trns_chain_is_empty (ds->permanent_trns_chain))
393     {
394       ds->n_lag = 0;
395       ds->discard_output = false;
396       dict_set_case_limit (ds->dict, 0);
397       dict_clear_vectors (ds->dict);
398       return true;
399     }
400
401   ok = casereader_destroy (proc_open (ds));
402   return proc_commit (ds) && ok;
403 }
404
405 static const struct casereader_class proc_casereader_class;
406
407 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
408    cases filtered out with FILTER BY will not be included in the casereader
409    (which is usually desirable).  If FILTER is false, all cases will be
410    included regardless of FILTER BY settings.
411
412    proc_commit must be called when done. */
413 struct casereader *
414 proc_open_filtering (struct dataset *ds, bool filter)
415 {
416   struct casereader *reader;
417
418   assert (ds->source != NULL);
419   assert (ds->proc_state == PROC_COMMITTED);
420
421   update_last_proc_invocation (ds);
422
423   caseinit_mark_for_init (ds->caseinit, ds->dict);
424
425   /* Finish up the collection of transformations. */
426   add_case_limit_trns (ds);
427   if (filter)
428     add_filter_trns (ds);
429   trns_chain_finalize (ds->cur_trns_chain);
430
431   /* Make permanent_dict refer to the dictionary right before
432      data reaches the sink. */
433   if (ds->permanent_dict == NULL)
434     ds->permanent_dict = ds->dict;
435
436   /* Prepare sink. */
437   if (!ds->discard_output)
438     {
439       struct dictionary *pd = ds->permanent_dict;
440       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
441       if (compacted_value_cnt < dict_get_next_value_idx (pd))
442         {
443           struct caseproto *compacted_proto;
444           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
445           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
446           ds->sink = autopaging_writer_create (compacted_proto);
447           caseproto_unref (compacted_proto);
448         }
449       else
450         {
451           ds->compactor = NULL;
452           ds->sink = autopaging_writer_create (dict_get_proto (pd));
453         }
454     }
455   else
456     {
457       ds->compactor = NULL;
458       ds->sink = NULL;
459     }
460
461   /* Allocate memory for lagged cases. */
462   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
463
464   ds->proc_state = PROC_OPEN;
465   ds->cases_written = 0;
466   ds->ok = true;
467
468   /* FIXME: use taint in dataset in place of `ok'? */
469   /* FIXME: for trivial cases we can just return a clone of
470      ds->source? */
471
472   /* Create casereader and insert a shim on top.  The shim allows us to
473      arbitrarily extend the casereader's lifetime, by slurping the cases into
474      the shim's buffer in proc_commit().  That is especially useful when output
475      table_items are generated directly from the procedure casereader (e.g. by
476      the LIST procedure) when we are using an output driver that keeps a
477      reference to the output items passed to it (e.g. the GUI output driver in
478      PSPPIRE). */
479   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
480                                          CASENUMBER_MAX,
481                                          &proc_casereader_class, ds);
482   ds->shim = casereader_shim_insert (reader);
483   return reader;
484 }
485
486 /* Opens dataset DS for reading cases with proc_read.
487    proc_commit must be called when done. */
488 struct casereader *
489 proc_open (struct dataset *ds)
490 {
491   return proc_open_filtering (ds, true);
492 }
493
494 /* Returns true if a procedure is in progress, that is, if
495    proc_open has been called but proc_commit has not. */
496 bool
497 proc_is_open (const struct dataset *ds)
498 {
499   return ds->proc_state != PROC_COMMITTED;
500 }
501
502 /* "read" function for procedure casereader. */
503 static struct ccase *
504 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
505 {
506   struct dataset *ds = ds_;
507   enum trns_result retval = TRNS_DROP_CASE;
508   struct ccase *c;
509
510   assert (ds->proc_state == PROC_OPEN);
511   for (; ; case_unref (c))
512     {
513       casenumber case_nr;
514
515       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
516       if (retval == TRNS_ERROR)
517         ds->ok = false;
518       if (!ds->ok)
519         return NULL;
520
521       /* Read a case from source. */
522       c = casereader_read (ds->source);
523       if (c == NULL)
524         return NULL;
525       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
526       caseinit_init_vars (ds->caseinit, c);
527
528       /* Execute permanent transformations.  */
529       case_nr = ds->cases_written + 1;
530       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
531                                    &c, case_nr);
532       caseinit_update_left_vars (ds->caseinit, c);
533       if (retval != TRNS_CONTINUE)
534         continue;
535
536       /* Write case to collection of lagged cases. */
537       if (ds->n_lag > 0)
538         {
539           while (deque_count (&ds->lag) >= ds->n_lag)
540             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
541           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
542         }
543
544       /* Write case to replacement dataset. */
545       ds->cases_written++;
546       if (ds->sink != NULL)
547         casewriter_write (ds->sink,
548                           case_map_execute (ds->compactor, case_ref (c)));
549
550       /* Execute temporary transformations. */
551       if (ds->temporary_trns_chain != NULL)
552         {
553           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
554                                        &c, ds->cases_written);
555           if (retval != TRNS_CONTINUE)
556             continue;
557         }
558
559       return c;
560     }
561 }
562
563 /* "destroy" function for procedure casereader. */
564 static void
565 proc_casereader_destroy (struct casereader *reader, void *ds_)
566 {
567   struct dataset *ds = ds_;
568   struct ccase *c;
569
570   /* We are always the subreader for a casereader_buffer, so if we're being
571      destroyed then it's because the casereader_buffer has read all the cases
572      that it ever will. */
573   ds->shim = NULL;
574
575   /* Make sure transformations happen for every input case, in
576      case they have side effects, and ensure that the replacement
577      active dataset gets all the cases it should. */
578   while ((c = casereader_read (reader)) != NULL)
579     case_unref (c);
580
581   ds->proc_state = PROC_CLOSED;
582   ds->ok = casereader_destroy (ds->source) && ds->ok;
583   ds->source = NULL;
584   dataset_set_source (ds, NULL);
585 }
586
587 /* Must return false if the source casereader, a transformation,
588    or the sink casewriter signaled an error.  (If a temporary
589    transformation signals an error, then the return value is
590    false, but the replacement active dataset may still be
591    untainted.) */
592 bool
593 proc_commit (struct dataset *ds)
594 {
595   if (ds->shim != NULL)
596     casereader_shim_slurp (ds->shim);
597
598   assert (ds->proc_state == PROC_CLOSED);
599   ds->proc_state = PROC_COMMITTED;
600
601   dataset_changed__ (ds);
602
603   /* Free memory for lagged cases. */
604   while (!deque_is_empty (&ds->lag))
605     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
606   free (ds->lag_cases);
607
608   /* Dictionary from before TEMPORARY becomes permanent. */
609   proc_cancel_temporary_transformations (ds);
610
611   if (!ds->discard_output)
612     {
613       /* Finish compacting. */
614       if (ds->compactor != NULL)
615         {
616           case_map_destroy (ds->compactor);
617           ds->compactor = NULL;
618
619           dict_delete_scratch_vars (ds->dict);
620           dict_compact_values (ds->dict);
621         }
622
623       /* Old data sink becomes new data source. */
624       if (ds->sink != NULL)
625         ds->source = casewriter_make_reader (ds->sink);
626     }
627   else
628     {
629       ds->source = NULL;
630       ds->discard_output = false;
631     }
632   ds->sink = NULL;
633
634   caseinit_clear (ds->caseinit);
635   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
636
637   dict_clear_vectors (ds->dict);
638   ds->permanent_dict = NULL;
639   return proc_cancel_all_transformations (ds) && ds->ok;
640 }
641
642 /* Casereader class for procedure execution. */
643 static const struct casereader_class proc_casereader_class =
644   {
645     proc_casereader_read,
646     proc_casereader_destroy,
647     NULL,
648     NULL,
649   };
650
651 /* Updates last_proc_invocation. */
652 static void
653 update_last_proc_invocation (struct dataset *ds)
654 {
655   ds->last_proc_invocation = time (NULL);
656 }
657 \f
658 /* Returns a pointer to the lagged case from N_BEFORE cases before the
659    current one, or NULL if there haven't been that many cases yet. */
660 const struct ccase *
661 lagged_case (const struct dataset *ds, int n_before)
662 {
663   assert (n_before >= 1);
664   assert (n_before <= ds->n_lag);
665
666   if (n_before <= deque_count (&ds->lag))
667     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
668   else
669     return NULL;
670 }
671 \f
672 /* Returns the current set of permanent transformations,
673    and clears the permanent transformations.
674    For use by INPUT PROGRAM. */
675 struct trns_chain *
676 proc_capture_transformations (struct dataset *ds)
677 {
678   struct trns_chain *chain;
679
680   assert (ds->temporary_trns_chain == NULL);
681   chain = ds->permanent_trns_chain;
682   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
683   dataset_transformations_changed__ (ds, false);
684
685   return chain;
686 }
687
688 /* Adds a transformation that processes a case with PROC and
689    frees itself with FREE to the current set of transformations.
690    The functions are passed AUX as auxiliary data. */
691 void
692 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
693 {
694   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
695   dataset_transformations_changed__ (ds, true);
696 }
697
698 /* Adds a transformation that processes a case with PROC and
699    frees itself with FREE to the current set of transformations.
700    When parsing of the block of transformations is complete,
701    FINALIZE will be called.
702    The functions are passed AUX as auxiliary data. */
703 void
704 add_transformation_with_finalizer (struct dataset *ds,
705                                    trns_finalize_func *finalize,
706                                    trns_proc_func *proc,
707                                    trns_free_func *free, void *aux)
708 {
709   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
710   dataset_transformations_changed__ (ds, true);
711 }
712
713 /* Returns the index of the next transformation.
714    This value can be returned by a transformation procedure
715    function to indicate a "jump" to that transformation. */
716 size_t
717 next_transformation (const struct dataset *ds)
718 {
719   return trns_chain_next (ds->cur_trns_chain);
720 }
721
722 /* Returns true if the next call to add_transformation() will add
723    a temporary transformation, false if it will add a permanent
724    transformation. */
725 bool
726 proc_in_temporary_transformations (const struct dataset *ds)
727 {
728   return ds->temporary_trns_chain != NULL;
729 }
730
731 /* Marks the start of temporary transformations.
732    Further calls to add_transformation() will add temporary
733    transformations. */
734 void
735 proc_start_temporary_transformations (struct dataset *ds)
736 {
737   if (!proc_in_temporary_transformations (ds))
738     {
739       add_case_limit_trns (ds);
740
741       ds->permanent_dict = dict_clone (ds->dict);
742
743       trns_chain_finalize (ds->permanent_trns_chain);
744       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
745       dataset_transformations_changed__ (ds, true);
746     }
747 }
748
749 /* Converts all the temporary transformations, if any, to
750    permanent transformations.  Further transformations will be
751    permanent.
752    Returns true if anything changed, false otherwise. */
753 bool
754 proc_make_temporary_transformations_permanent (struct dataset *ds)
755 {
756   if (proc_in_temporary_transformations (ds))
757     {
758       trns_chain_finalize (ds->temporary_trns_chain);
759       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
760       ds->temporary_trns_chain = NULL;
761
762       dict_destroy (ds->permanent_dict);
763       ds->permanent_dict = NULL;
764
765       return true;
766     }
767   else
768     return false;
769 }
770
771 /* Cancels all temporary transformations, if any.  Further
772    transformations will be permanent.
773    Returns true if anything changed, false otherwise. */
774 bool
775 proc_cancel_temporary_transformations (struct dataset *ds)
776 {
777   if (proc_in_temporary_transformations (ds))
778     {
779       dict_destroy (ds->dict);
780       ds->dict = ds->permanent_dict;
781       ds->permanent_dict = NULL;
782
783       trns_chain_destroy (ds->temporary_trns_chain);
784       ds->temporary_trns_chain = NULL;
785       dataset_transformations_changed__ (
786         ds, !trns_chain_is_empty (ds->permanent_trns_chain));
787       return true;
788     }
789   else
790     return false;
791 }
792
793 /* Cancels all transformations, if any.
794    Returns true if successful, false on I/O error. */
795 bool
796 proc_cancel_all_transformations (struct dataset *ds)
797 {
798   bool ok;
799   assert (ds->proc_state == PROC_COMMITTED);
800   ok = trns_chain_destroy (ds->permanent_trns_chain);
801   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
802   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
803   ds->temporary_trns_chain = NULL;
804   dataset_transformations_changed__ (ds, false);
805
806   return ok;
807 }
808
809 static int
810 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
811 {
812   struct variable *var = var_;
813
814   *cc = case_unshare (*cc);
815   case_data_rw (*cc, var)->f = case_num;
816
817   return TRNS_CONTINUE;
818 }
819
820 /* Add a variable which we can sort by to get back the original order. */
821 struct variable *
822 add_permanent_ordering_transformation (struct dataset *ds)
823 {
824   struct variable *temp_var;
825
826   temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
827   if (proc_in_temporary_transformations (ds))
828     {
829       struct variable *perm_var;
830
831       perm_var = dict_clone_var_in_place_assert (ds->permanent_dict, temp_var);
832       trns_chain_append (ds->permanent_trns_chain, NULL, store_case_num,
833                          NULL, perm_var);
834       trns_chain_finalize (ds->permanent_trns_chain);
835     }
836   else
837     add_transformation (ds, store_case_num, NULL, temp_var);
838
839   return temp_var;
840 }
841 \f
842 /* Causes output from the next procedure to be discarded, instead
843    of being preserved for use as input for the next procedure. */
844 void
845 proc_discard_output (struct dataset *ds)
846 {
847   ds->discard_output = true;
848 }
849
850
851 /* Checks whether DS has a corrupted active dataset.  If so,
852    discards it and returns false.  If not, returns true without
853    doing anything. */
854 bool
855 dataset_end_of_command (struct dataset *ds)
856 {
857   if (ds->source != NULL)
858     {
859       if (casereader_error (ds->source))
860         {
861           dataset_clear (ds);
862           return false;
863         }
864       else
865         {
866           const struct taint *taint = casereader_get_taint (ds->source);
867           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
868           assert (!taint_has_tainted_successor (taint));
869         }
870     }
871   return true;
872 }
873 \f
874 static trns_proc_func case_limit_trns_proc;
875 static trns_free_func case_limit_trns_free;
876
877 /* Adds a transformation that limits the number of cases that may
878    pass through, if DS->DICT has a case limit. */
879 static void
880 add_case_limit_trns (struct dataset *ds)
881 {
882   casenumber case_limit = dict_get_case_limit (ds->dict);
883   if (case_limit != 0)
884     {
885       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
886       *cases_remaining = case_limit;
887       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
888                           cases_remaining);
889       dict_set_case_limit (ds->dict, 0);
890     }
891 }
892
893 /* Limits the maximum number of cases processed to
894    *CASES_REMAINING. */
895 static int
896 case_limit_trns_proc (void *cases_remaining_,
897                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
898 {
899   size_t *cases_remaining = cases_remaining_;
900   if (*cases_remaining > 0)
901     {
902       (*cases_remaining)--;
903       return TRNS_CONTINUE;
904     }
905   else
906     return TRNS_DROP_CASE;
907 }
908
909 /* Frees the data associated with a case limit transformation. */
910 static bool
911 case_limit_trns_free (void *cases_remaining_)
912 {
913   size_t *cases_remaining = cases_remaining_;
914   free (cases_remaining);
915   return true;
916 }
917 \f
918 static trns_proc_func filter_trns_proc;
919
920 /* Adds a temporary transformation to filter data according to
921    the variable specified on FILTER, if any. */
922 static void
923 add_filter_trns (struct dataset *ds)
924 {
925   struct variable *filter_var = dict_get_filter (ds->dict);
926   if (filter_var != NULL)
927     {
928       proc_start_temporary_transformations (ds);
929       add_transformation (ds, filter_trns_proc, NULL, filter_var);
930     }
931 }
932
933 /* FILTER transformation. */
934 static int
935 filter_trns_proc (void *filter_var_,
936                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
937
938 {
939   struct variable *filter_var = filter_var_;
940   double f = case_num (*c, filter_var);
941   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
942           ? TRNS_CONTINUE : TRNS_DROP_CASE);
943 }
944
945
946 void
947 dataset_need_lag (struct dataset *ds, int n_before)
948 {
949   ds->n_lag = MAX (ds->n_lag, n_before);
950 }
951 \f
952 static void
953 dataset_changed__ (struct dataset *ds)
954 {
955   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
956     ds->callbacks->changed (ds->cb_data);
957 }
958
959 static void
960 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
961 {
962   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
963     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
964 }
965 \f
966 /* Private interface for use by session code. */
967
968 void
969 dataset_set_session__ (struct dataset *ds, struct session *session)
970 {
971   ds->session = session;
972 }