5b3f0ba5fb3a1b65c241f8757af77d08bfca2de9
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 struct dataset {
48   /* A dataset is usually part of a session.  Within a session its name must
49      unique.  The name must either be a valid PSPP identifier or the empty
50      string.  (It must be unique within the session even if it is the empty
51      string; that is, there may only be a single dataset within a session with
52      the empty string as its name.) */
53   struct session *session;
54   char *name;
55   enum dataset_display display;
56
57   /* Cases are read from source,
58      their transformation variables are initialized,
59      pass through permanent_trns_chain (which transforms them into
60      the format described by permanent_dict),
61      are written to sink,
62      pass through temporary_trns_chain (which transforms them into
63      the format described by dict),
64      and are finally passed to the procedure. */
65   struct casereader *source;
66   struct caseinit *caseinit;
67   struct trns_chain *permanent_trns_chain;
68   struct dictionary *permanent_dict;
69   struct casewriter *sink;
70   struct trns_chain *temporary_trns_chain;
71   struct dictionary *dict;
72
73   /* If true, cases are discarded instead of being written to
74      sink. */
75   bool discard_output;
76
77   /* The transformation chain that the next transformation will be
78      added to. */
79   struct trns_chain *cur_trns_chain;
80
81   /* The case map used to compact a case, if necessary;
82      otherwise a null pointer. */
83   struct case_map *compactor;
84
85   /* Time at which proc was last invoked. */
86   time_t last_proc_invocation;
87
88   /* Cases just before ("lagging") the current one. */
89   int n_lag;                    /* Number of cases to lag. */
90   struct deque lag;             /* Deque of lagged cases. */
91   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
92
93   /* Procedure data. */
94   enum
95     {
96       PROC_COMMITTED,           /* No procedure in progress. */
97       PROC_OPEN,                /* proc_open called, casereader still open. */
98       PROC_CLOSED               /* casereader from proc_open destroyed,
99                                    but proc_commit not yet called. */
100     }
101   proc_state;
102   casenumber cases_written;     /* Cases output so far. */
103   bool ok;                      /* Error status. */
104   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
105
106   const struct dataset_callbacks *callbacks;
107   void *cb_data;
108
109   /* Uniquely distinguishes datasets. */
110   unsigned int seqno;
111 };
112
113 static void dataset_changed__ (struct dataset *);
114 static void dataset_transformations_changed__ (struct dataset *,
115                                                bool non_empty);
116
117 static void add_case_limit_trns (struct dataset *ds);
118 static void add_filter_trns (struct dataset *ds);
119
120 static void update_last_proc_invocation (struct dataset *ds);
121
122 static void
123 dict_callback (struct dictionary *d UNUSED, void *ds_)
124 {
125   struct dataset *ds = ds_;
126   dataset_changed__ (ds);
127 }
128 \f
129 static void
130 dataset_create_finish__ (struct dataset *ds, struct session *session)
131 {
132   static unsigned int seqno;
133
134   dict_set_change_callback (ds->dict, dict_callback, ds);
135   proc_cancel_all_transformations (ds);
136   dataset_set_session (ds, session);
137   ds->seqno = ++seqno;
138 }
139
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
141    SESSION already contains a dataset named NAME, it is deleted and replaced.
142    The dataset initially has an empty dictionary and no data source. */
143 struct dataset *
144 dataset_create (struct session *session, const char *name)
145 {
146   struct dataset *ds;
147
148   ds = xzalloc (sizeof *ds);
149   ds->name = xstrdup (name);
150   ds->display = DATASET_FRONT;
151   ds->dict = dict_create (get_default_encoding ());
152
153   ds->caseinit = caseinit_create ();
154
155   dataset_create_finish__ (ds, session);
156
157   return ds;
158 }
159
160 /* Creates and returns a new dataset that has the same data and dictionary as
161    OLD named NAME, adds it to the same session as OLD, and returns the new
162    dataset.  If SESSION already contains a dataset named NAME, it is deleted
163    and replaced.
164
165    OLD must not have any active transformations or temporary state and must
166    not be in the middle of a procedure.
167
168    Callbacks are not cloned. */
169 struct dataset *
170 dataset_clone (struct dataset *old, const char *name)
171 {
172   struct dataset *new;
173
174   assert (old->proc_state == PROC_COMMITTED);
175   assert (trns_chain_is_empty (old->permanent_trns_chain));
176   assert (old->permanent_dict == NULL);
177   assert (old->sink == NULL);
178   assert (old->temporary_trns_chain == NULL);
179
180   new = xzalloc (sizeof *new);
181   new->name = xstrdup (name);
182   new->display = DATASET_FRONT;
183   new->source = casereader_clone (old->source);
184   new->dict = dict_clone (old->dict);
185   new->caseinit = caseinit_clone (old->caseinit);
186   new->last_proc_invocation = old->last_proc_invocation;
187   new->ok = old->ok;
188
189   dataset_create_finish__ (new, old->session);
190
191   return new;
192 }
193
194 /* Destroys DS. */
195 void
196 dataset_destroy (struct dataset *ds)
197 {
198   if (ds != NULL)
199     {
200       dataset_set_session (ds, NULL);
201       dataset_clear (ds);
202       dict_destroy (ds->dict);
203       caseinit_destroy (ds->caseinit);
204       trns_chain_destroy (ds->permanent_trns_chain);
205       dataset_transformations_changed__ (ds, false);
206       free (ds->name);
207       free (ds);
208     }
209 }
210
211 /* Discards the active dataset's dictionary, data, and transformations. */
212 void
213 dataset_clear (struct dataset *ds)
214 {
215   assert (ds->proc_state == PROC_COMMITTED);
216
217   dict_clear (ds->dict);
218   fh_set_default_handle (NULL);
219
220   ds->n_lag = 0;
221
222   casereader_destroy (ds->source);
223   ds->source = NULL;
224
225   proc_cancel_all_transformations (ds);
226 }
227
228 const char *
229 dataset_name (const struct dataset *ds)
230 {
231   return ds->name;
232 }
233
234 void
235 dataset_set_name (struct dataset *ds, const char *name)
236 {
237   struct session *session = ds->session;
238   bool active = false;
239
240   if (session != NULL)
241     {
242       active = session_active_dataset (session) == ds;
243       if (active)
244         session_set_active_dataset (session, NULL);
245       dataset_set_session (ds, NULL);
246     }
247
248   free (ds->name);
249   ds->name = xstrdup (name);
250
251   if (session != NULL)
252     {
253       dataset_set_session (ds, session);
254       if (active)
255         session_set_active_dataset (session, ds);
256     }
257 }
258
259 struct session *
260 dataset_session (const struct dataset *ds)
261 {
262   return ds->session;
263 }
264
265 void
266 dataset_set_session (struct dataset *ds, struct session *session)
267 {
268   if (session != ds->session)
269     {
270       if (ds->session != NULL)
271         session_remove_dataset (ds->session, ds);
272       if (session != NULL)
273         session_add_dataset (session, ds);
274     }
275 }
276
277 /* Returns the dictionary within DS.  This is always nonnull, although it
278    might not contain any variables. */
279 struct dictionary *
280 dataset_dict (const struct dataset *ds)
281 {
282   return ds->dict;
283 }
284
285 /* Replaces DS's dictionary by DICT, discarding any source and
286    transformations. */
287 void
288 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
289 {
290   assert (ds->proc_state == PROC_COMMITTED);
291   assert (ds->dict != dict);
292
293   dataset_clear (ds);
294
295   dict_destroy (ds->dict);
296   ds->dict = dict;
297   dict_set_change_callback (ds->dict, dict_callback, ds);
298 }
299
300 /* Returns the casereader that will be read when a procedure is executed on
301    DS.  This can be NULL if none has been set up yet. */
302 const struct casereader *
303 dataset_source (const struct dataset *ds)
304 {
305   return ds->source;
306 }
307
308 /* Returns true if DS has a data source, false otherwise. */
309 bool
310 dataset_has_source (const struct dataset *ds)
311 {
312   return dataset_source (ds) != NULL;
313 }
314
315 /* Replaces the active dataset's data by READER.  READER's cases must have an
316    appropriate format for DS's dictionary. */
317 bool
318 dataset_set_source (struct dataset *ds, struct casereader *reader)
319 {
320   casereader_destroy (ds->source);
321   ds->source = reader;
322
323   caseinit_clear (ds->caseinit);
324   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
325
326   return reader == NULL || !casereader_error (reader);
327 }
328
329 /* Returns the data source from DS and removes it from DS.  Returns a null
330    pointer if DS has no data source. */
331 struct casereader *
332 dataset_steal_source (struct dataset *ds)
333 {
334   struct casereader *reader = ds->source;
335   ds->source = NULL;
336
337   return reader;
338 }
339
340 /* Returns a number unique to DS.  It can be used to distinguish one dataset
341    from any other within a given program run, even datasets that do not exist
342    at the same time. */
343 unsigned int
344 dataset_seqno (const struct dataset *ds)
345 {
346   return ds->seqno;
347 }
348
349 void
350 dataset_set_callbacks (struct dataset *ds,
351                        const struct dataset_callbacks *callbacks,
352                        void *cb_data)
353 {
354   ds->callbacks = callbacks;
355   ds->cb_data = cb_data;
356 }
357
358 enum dataset_display
359 dataset_get_display (const struct dataset *ds)
360 {
361   return ds->display;
362 }
363
364 void
365 dataset_set_display (struct dataset *ds, enum dataset_display display)
366 {
367   ds->display = display;
368 }
369 \f
370 /* Returns the last time the data was read. */
371 time_t
372 time_of_last_procedure (struct dataset *ds)
373 {
374   if (ds->last_proc_invocation == 0)
375     update_last_proc_invocation (ds);
376   return ds->last_proc_invocation;
377 }
378 \f
379 /* Regular procedure. */
380
381 /* Executes any pending transformations, if necessary.
382    This is not identical to the EXECUTE command in that it won't
383    always read the source data.  This can be important when the
384    source data is given inline within BEGIN DATA...END FILE. */
385 bool
386 proc_execute (struct dataset *ds)
387 {
388   bool ok;
389
390   if ((ds->temporary_trns_chain == NULL
391        || trns_chain_is_empty (ds->temporary_trns_chain))
392       && trns_chain_is_empty (ds->permanent_trns_chain))
393     {
394       ds->n_lag = 0;
395       ds->discard_output = false;
396       dict_set_case_limit (ds->dict, 0);
397       dict_clear_vectors (ds->dict);
398       return true;
399     }
400
401   ok = casereader_destroy (proc_open (ds));
402   return proc_commit (ds) && ok;
403 }
404
405 static const struct casereader_class proc_casereader_class;
406
407 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
408    cases filtered out with FILTER BY will not be included in the casereader
409    (which is usually desirable).  If FILTER is false, all cases will be
410    included regardless of FILTER BY settings.
411
412    proc_commit must be called when done. */
413 struct casereader *
414 proc_open_filtering (struct dataset *ds, bool filter)
415 {
416   struct casereader *reader;
417
418   assert (ds->source != NULL);
419   assert (ds->proc_state == PROC_COMMITTED);
420
421   update_last_proc_invocation (ds);
422
423   caseinit_mark_for_init (ds->caseinit, ds->dict);
424
425   /* Finish up the collection of transformations. */
426   add_case_limit_trns (ds);
427   if (filter)
428     add_filter_trns (ds);
429   trns_chain_finalize (ds->cur_trns_chain);
430
431   /* Make permanent_dict refer to the dictionary right before
432      data reaches the sink. */
433   if (ds->permanent_dict == NULL)
434     ds->permanent_dict = ds->dict;
435
436   /* Prepare sink. */
437   if (!ds->discard_output)
438     {
439       struct dictionary *pd = ds->permanent_dict;
440       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
441       if (compacted_value_cnt < dict_get_next_value_idx (pd))
442         {
443           struct caseproto *compacted_proto;
444           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
445           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
446           ds->sink = autopaging_writer_create (compacted_proto);
447           caseproto_unref (compacted_proto);
448         }
449       else
450         {
451           ds->compactor = NULL;
452           ds->sink = autopaging_writer_create (dict_get_proto (pd));
453         }
454     }
455   else
456     {
457       ds->compactor = NULL;
458       ds->sink = NULL;
459     }
460
461   /* Allocate memory for lagged cases. */
462   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
463
464   ds->proc_state = PROC_OPEN;
465   ds->cases_written = 0;
466   ds->ok = true;
467
468   /* FIXME: use taint in dataset in place of `ok'? */
469   /* FIXME: for trivial cases we can just return a clone of
470      ds->source? */
471
472   /* Create casereader and insert a shim on top.  The shim allows us to
473      arbitrarily extend the casereader's lifetime, by slurping the cases into
474      the shim's buffer in proc_commit().  That is especially useful when output
475      table_items are generated directly from the procedure casereader (e.g. by
476      the LIST procedure) when we are using an output driver that keeps a
477      reference to the output items passed to it (e.g. the GUI output driver in
478      PSPPIRE). */
479   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
480                                          CASENUMBER_MAX,
481                                          &proc_casereader_class, ds);
482   ds->shim = casereader_shim_insert (reader);
483   return reader;
484 }
485
486 /* Opens dataset DS for reading cases with proc_read.
487    proc_commit must be called when done. */
488 struct casereader *
489 proc_open (struct dataset *ds)
490 {
491   return proc_open_filtering (ds, true);
492 }
493
494 /* Returns true if a procedure is in progress, that is, if
495    proc_open has been called but proc_commit has not. */
496 bool
497 proc_is_open (const struct dataset *ds)
498 {
499   return ds->proc_state != PROC_COMMITTED;
500 }
501
502 /* "read" function for procedure casereader. */
503 static struct ccase *
504 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
505 {
506   struct dataset *ds = ds_;
507   enum trns_result retval = TRNS_DROP_CASE;
508   struct ccase *c;
509
510   assert (ds->proc_state == PROC_OPEN);
511   for (; ; case_unref (c))
512     {
513       casenumber case_nr;
514
515       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
516       if (retval == TRNS_ERROR)
517         ds->ok = false;
518       if (!ds->ok)
519         return NULL;
520
521       /* Read a case from source. */
522       c = casereader_read (ds->source);
523       if (c == NULL)
524         return NULL;
525       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
526       caseinit_init_vars (ds->caseinit, c);
527
528       /* Execute permanent transformations.  */
529       case_nr = ds->cases_written + 1;
530       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
531                                    &c, case_nr);
532       caseinit_update_left_vars (ds->caseinit, c);
533       if (retval != TRNS_CONTINUE)
534         continue;
535
536       /* Write case to collection of lagged cases. */
537       if (ds->n_lag > 0)
538         {
539           while (deque_count (&ds->lag) >= ds->n_lag)
540             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
541           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
542         }
543
544       /* Write case to replacement dataset. */
545       ds->cases_written++;
546       if (ds->sink != NULL)
547         casewriter_write (ds->sink,
548                           case_map_execute (ds->compactor, case_ref (c)));
549
550       /* Execute temporary transformations. */
551       if (ds->temporary_trns_chain != NULL)
552         {
553           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
554                                        &c, ds->cases_written);
555           if (retval != TRNS_CONTINUE)
556             continue;
557         }
558
559       return c;
560     }
561 }
562
563 /* "destroy" function for procedure casereader. */
564 static void
565 proc_casereader_destroy (struct casereader *reader, void *ds_)
566 {
567   struct dataset *ds = ds_;
568   struct ccase *c;
569
570   /* We are always the subreader for a casereader_buffer, so if we're being
571      destroyed then it's because the casereader_buffer has read all the cases
572      that it ever will. */
573   ds->shim = NULL;
574
575   /* Make sure transformations happen for every input case, in
576      case they have side effects, and ensure that the replacement
577      active dataset gets all the cases it should. */
578   while ((c = casereader_read (reader)) != NULL)
579     case_unref (c);
580
581   ds->proc_state = PROC_CLOSED;
582   ds->ok = casereader_destroy (ds->source) && ds->ok;
583   ds->source = NULL;
584   dataset_set_source (ds, NULL);
585 }
586
587 /* Must return false if the source casereader, a transformation,
588    or the sink casewriter signaled an error.  (If a temporary
589    transformation signals an error, then the return value is
590    false, but the replacement active dataset may still be
591    untainted.) */
592 bool
593 proc_commit (struct dataset *ds)
594 {
595   if (ds->shim != NULL)
596     casereader_shim_slurp (ds->shim);
597
598   assert (ds->proc_state == PROC_CLOSED);
599   ds->proc_state = PROC_COMMITTED;
600
601   dataset_changed__ (ds);
602
603   /* Free memory for lagged cases. */
604   while (!deque_is_empty (&ds->lag))
605     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
606   free (ds->lag_cases);
607
608   /* Dictionary from before TEMPORARY becomes permanent. */
609   proc_cancel_temporary_transformations (ds);
610
611   if (!ds->discard_output)
612     {
613       /* Finish compacting. */
614       if (ds->compactor != NULL)
615         {
616           case_map_destroy (ds->compactor);
617           ds->compactor = NULL;
618
619           dict_delete_scratch_vars (ds->dict);
620           dict_compact_values (ds->dict);
621         }
622
623       /* Old data sink becomes new data source. */
624       if (ds->sink != NULL)
625         ds->source = casewriter_make_reader (ds->sink);
626     }
627   else
628     {
629       ds->source = NULL;
630       ds->discard_output = false;
631     }
632   ds->sink = NULL;
633
634   caseinit_clear (ds->caseinit);
635   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
636
637   dict_clear_vectors (ds->dict);
638   ds->permanent_dict = NULL;
639   return proc_cancel_all_transformations (ds) && ds->ok;
640 }
641
642 /* Casereader class for procedure execution. */
643 static const struct casereader_class proc_casereader_class =
644   {
645     proc_casereader_read,
646     proc_casereader_destroy,
647     NULL,
648     NULL,
649   };
650
651 /* Updates last_proc_invocation. */
652 static void
653 update_last_proc_invocation (struct dataset *ds)
654 {
655   ds->last_proc_invocation = time (NULL);
656 }
657 \f
658 /* Returns a pointer to the lagged case from N_BEFORE cases before the
659    current one, or NULL if there haven't been that many cases yet. */
660 const struct ccase *
661 lagged_case (const struct dataset *ds, int n_before)
662 {
663   assert (n_before >= 1);
664   assert (n_before <= ds->n_lag);
665
666   if (n_before <= deque_count (&ds->lag))
667     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
668   else
669     return NULL;
670 }
671 \f
672 /* Returns the current set of permanent transformations,
673    and clears the permanent transformations.
674    For use by INPUT PROGRAM. */
675 struct trns_chain *
676 proc_capture_transformations (struct dataset *ds)
677 {
678   struct trns_chain *chain;
679
680   assert (ds->temporary_trns_chain == NULL);
681   chain = ds->permanent_trns_chain;
682   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
683   dataset_transformations_changed__ (ds, false);
684
685   return chain;
686 }
687
688 /* Adds a transformation that processes a case with PROC and
689    frees itself with FREE to the current set of transformations.
690    The functions are passed AUX as auxiliary data. */
691 void
692 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
693 {
694   trns_chain_append (ds->cur_trns_chain, proc, free, aux);
695   dataset_transformations_changed__ (ds, true);
696 }
697
698 /* Returns the index of the next transformation.
699    This value can be returned by a transformation procedure
700    function to indicate a "jump" to that transformation. */
701 size_t
702 next_transformation (const struct dataset *ds)
703 {
704   return trns_chain_next (ds->cur_trns_chain);
705 }
706
707 /* Returns true if the next call to add_transformation() will add
708    a temporary transformation, false if it will add a permanent
709    transformation. */
710 bool
711 proc_in_temporary_transformations (const struct dataset *ds)
712 {
713   return ds->temporary_trns_chain != NULL;
714 }
715
716 /* Marks the start of temporary transformations.
717    Further calls to add_transformation() will add temporary
718    transformations. */
719 void
720 proc_start_temporary_transformations (struct dataset *ds)
721 {
722   if (!proc_in_temporary_transformations (ds))
723     {
724       add_case_limit_trns (ds);
725
726       ds->permanent_dict = dict_clone (ds->dict);
727
728       trns_chain_finalize (ds->permanent_trns_chain);
729       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
730       dataset_transformations_changed__ (ds, true);
731     }
732 }
733
734 /* Converts all the temporary transformations, if any, to permanent
735    transformations.  Further transformations will be permanent.
736
737    The FILTER command is implemented as a temporary transformation, so a
738    procedure that uses this function should usually use proc_open_filtering()
739    with FILTER false, instead of plain proc_open().
740
741    Returns true if anything changed, false otherwise. */
742 bool
743 proc_make_temporary_transformations_permanent (struct dataset *ds)
744 {
745   if (proc_in_temporary_transformations (ds))
746     {
747       trns_chain_finalize (ds->temporary_trns_chain);
748       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
749       ds->temporary_trns_chain = NULL;
750
751       ds->cur_trns_chain = ds->permanent_trns_chain;
752
753       dict_destroy (ds->permanent_dict);
754       ds->permanent_dict = NULL;
755
756       return true;
757     }
758   else
759     return false;
760 }
761
762 /* Cancels all temporary transformations, if any.  Further
763    transformations will be permanent.
764    Returns true if anything changed, false otherwise. */
765 bool
766 proc_cancel_temporary_transformations (struct dataset *ds)
767 {
768   if (proc_in_temporary_transformations (ds))
769     {
770       dict_destroy (ds->dict);
771       ds->dict = ds->permanent_dict;
772       ds->permanent_dict = NULL;
773
774       trns_chain_destroy (ds->temporary_trns_chain);
775       ds->temporary_trns_chain = NULL;
776       dataset_transformations_changed__ (
777         ds, !trns_chain_is_empty (ds->permanent_trns_chain));
778       return true;
779     }
780   else
781     return false;
782 }
783
784 /* Cancels all transformations, if any.
785    Returns true if successful, false on I/O error. */
786 bool
787 proc_cancel_all_transformations (struct dataset *ds)
788 {
789   bool ok;
790   assert (ds->proc_state == PROC_COMMITTED);
791   ok = trns_chain_destroy (ds->permanent_trns_chain);
792   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
793   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
794   ds->temporary_trns_chain = NULL;
795   dataset_transformations_changed__ (ds, false);
796
797   return ok;
798 }
799
800 static int
801 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
802 {
803   struct variable *var = var_;
804
805   *cc = case_unshare (*cc);
806   case_data_rw (*cc, var)->f = case_num;
807
808   return TRNS_CONTINUE;
809 }
810
811 /* Add a variable which we can sort by to get back the original order. */
812 struct variable *
813 add_permanent_ordering_transformation (struct dataset *ds)
814 {
815   struct variable *temp_var;
816
817   temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
818   if (proc_in_temporary_transformations (ds))
819     {
820       struct variable *perm_var;
821
822       perm_var = dict_clone_var_in_place_assert (ds->permanent_dict, temp_var);
823       trns_chain_append (ds->permanent_trns_chain, NULL, store_case_num,
824                          NULL, perm_var);
825       trns_chain_finalize (ds->permanent_trns_chain);
826     }
827   else
828     add_transformation (ds, store_case_num, NULL, temp_var);
829
830   return temp_var;
831 }
832 \f
833 /* Causes output from the next procedure to be discarded, instead
834    of being preserved for use as input for the next procedure. */
835 void
836 proc_discard_output (struct dataset *ds)
837 {
838   ds->discard_output = true;
839 }
840
841
842 /* Checks whether DS has a corrupted active dataset.  If so,
843    discards it and returns false.  If not, returns true without
844    doing anything. */
845 bool
846 dataset_end_of_command (struct dataset *ds)
847 {
848   if (ds->source != NULL)
849     {
850       if (casereader_error (ds->source))
851         {
852           dataset_clear (ds);
853           return false;
854         }
855       else
856         {
857           const struct taint *taint = casereader_get_taint (ds->source);
858           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
859           assert (!taint_has_tainted_successor (taint));
860         }
861     }
862   return true;
863 }
864 \f
865 static trns_proc_func case_limit_trns_proc;
866 static trns_free_func case_limit_trns_free;
867
868 /* Adds a transformation that limits the number of cases that may
869    pass through, if DS->DICT has a case limit. */
870 static void
871 add_case_limit_trns (struct dataset *ds)
872 {
873   casenumber case_limit = dict_get_case_limit (ds->dict);
874   if (case_limit != 0)
875     {
876       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
877       *cases_remaining = case_limit;
878       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
879                           cases_remaining);
880       dict_set_case_limit (ds->dict, 0);
881     }
882 }
883
884 /* Limits the maximum number of cases processed to
885    *CASES_REMAINING. */
886 static int
887 case_limit_trns_proc (void *cases_remaining_,
888                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
889 {
890   size_t *cases_remaining = cases_remaining_;
891   if (*cases_remaining > 0)
892     {
893       (*cases_remaining)--;
894       return TRNS_CONTINUE;
895     }
896   else
897     return TRNS_DROP_CASE;
898 }
899
900 /* Frees the data associated with a case limit transformation. */
901 static bool
902 case_limit_trns_free (void *cases_remaining_)
903 {
904   size_t *cases_remaining = cases_remaining_;
905   free (cases_remaining);
906   return true;
907 }
908 \f
909 static trns_proc_func filter_trns_proc;
910
911 /* Adds a temporary transformation to filter data according to
912    the variable specified on FILTER, if any. */
913 static void
914 add_filter_trns (struct dataset *ds)
915 {
916   struct variable *filter_var = dict_get_filter (ds->dict);
917   if (filter_var != NULL)
918     {
919       proc_start_temporary_transformations (ds);
920       add_transformation (ds, filter_trns_proc, NULL, filter_var);
921     }
922 }
923
924 /* FILTER transformation. */
925 static int
926 filter_trns_proc (void *filter_var_,
927                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
928
929 {
930   struct variable *filter_var = filter_var_;
931   double f = case_num (*c, filter_var);
932   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
933           ? TRNS_CONTINUE : TRNS_DROP_CASE);
934 }
935
936
937 void
938 dataset_need_lag (struct dataset *ds, int n_before)
939 {
940   ds->n_lag = MAX (ds->n_lag, n_before);
941 }
942 \f
943 static void
944 dataset_changed__ (struct dataset *ds)
945 {
946   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
947     ds->callbacks->changed (ds->cb_data);
948 }
949
950 static void
951 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
952 {
953   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
954     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
955 }
956 \f
957 /* Private interface for use by session code. */
958
959 void
960 dataset_set_session__ (struct dataset *ds, struct session *session)
961 {
962   ds->session = session;
963 }