Removed unreachable code section
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 struct dataset {
48   /* A dataset is usually part of a session.  Within a session its name must
49      unique.  The name must either be a valid PSPP identifier or the empty
50      string.  (It must be unique within the session even if it is the empty
51      string; that is, there may only be a single dataset within a session with
52      the empty string as its name.) */
53   struct session *session;
54   char *name;
55   enum dataset_display display;
56
57   /* Cases are read from source,
58      their transformation variables are initialized,
59      pass through permanent_trns_chain (which transforms them into
60      the format described by permanent_dict),
61      are written to sink,
62      pass through temporary_trns_chain (which transforms them into
63      the format described by dict),
64      and are finally passed to the procedure. */
65   struct casereader *source;
66   struct caseinit *caseinit;
67   struct trns_chain *permanent_trns_chain;
68   struct dictionary *permanent_dict;
69   struct casewriter *sink;
70   struct trns_chain *temporary_trns_chain;
71   struct dictionary *dict;
72
73   /* If true, cases are discarded instead of being written to
74      sink. */
75   bool discard_output;
76
77   /* The transformation chain that the next transformation will be
78      added to. */
79   struct trns_chain *cur_trns_chain;
80
81   /* The case map used to compact a case, if necessary;
82      otherwise a null pointer. */
83   struct case_map *compactor;
84
85   /* Time at which proc was last invoked. */
86   time_t last_proc_invocation;
87
88   /* Cases just before ("lagging") the current one. */
89   int n_lag;                    /* Number of cases to lag. */
90   struct deque lag;             /* Deque of lagged cases. */
91   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
92
93   /* Procedure data. */
94   enum
95     {
96       PROC_COMMITTED,           /* No procedure in progress. */
97       PROC_OPEN,                /* proc_open called, casereader still open. */
98       PROC_CLOSED               /* casereader from proc_open destroyed,
99                                    but proc_commit not yet called. */
100     }
101   proc_state;
102   casenumber cases_written;     /* Cases output so far. */
103   bool ok;                      /* Error status. */
104   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
105
106   const struct dataset_callbacks *callbacks;
107   void *cb_data;
108
109   /* Uniquely distinguishes datasets. */
110   unsigned int seqno;
111 };
112
113 static void dataset_changed__ (struct dataset *);
114 static void dataset_transformations_changed__ (struct dataset *,
115                                                bool non_empty);
116
117 static void add_case_limit_trns (struct dataset *ds);
118 static void add_filter_trns (struct dataset *ds);
119
120 static void update_last_proc_invocation (struct dataset *ds);
121
122 static void
123 dict_callback (struct dictionary *d UNUSED, void *ds_)
124 {
125   struct dataset *ds = ds_;
126   dataset_changed__ (ds);
127 }
128 \f
129 static void
130 dataset_create_finish__ (struct dataset *ds, struct session *session)
131 {
132   static unsigned int seqno;
133
134   dict_set_change_callback (ds->dict, dict_callback, ds);
135   proc_cancel_all_transformations (ds);
136   dataset_set_session (ds, session);
137   ds->seqno = ++seqno;
138 }
139
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
141    SESSION already contains a dataset named NAME, it is deleted and replaced.
142    The dataset initially has an empty dictionary and no data source. */
143 struct dataset *
144 dataset_create (struct session *session, const char *name)
145 {
146   struct dataset *ds;
147
148   ds = xzalloc (sizeof *ds);
149   ds->name = xstrdup (name);
150   ds->display = DATASET_FRONT;
151   ds->dict = dict_create (get_default_encoding ());
152
153   ds->caseinit = caseinit_create ();
154
155   dataset_create_finish__ (ds, session);
156
157   return ds;
158 }
159
160 /* Creates and returns a new dataset that has the same data and dictionary as
161    OLD named NAME, adds it to the same session as OLD, and returns the new
162    dataset.  If SESSION already contains a dataset named NAME, it is deleted
163    and replaced.
164
165    OLD must not have any active transformations or temporary state and must
166    not be in the middle of a procedure.
167
168    Callbacks are not cloned. */
169 struct dataset *
170 dataset_clone (struct dataset *old, const char *name)
171 {
172   struct dataset *new;
173
174   assert (old->proc_state == PROC_COMMITTED);
175   assert (trns_chain_is_empty (old->permanent_trns_chain));
176   assert (old->permanent_dict == NULL);
177   assert (old->sink == NULL);
178   assert (old->temporary_trns_chain == NULL);
179
180   new = xzalloc (sizeof *new);
181   new->name = xstrdup (name);
182   new->display = DATASET_FRONT;
183   new->source = casereader_clone (old->source);
184   new->dict = dict_clone (old->dict);
185   new->caseinit = caseinit_clone (old->caseinit);
186   new->last_proc_invocation = old->last_proc_invocation;
187   new->ok = old->ok;
188
189   dataset_create_finish__ (new, old->session);
190
191   return new;
192 }
193
194 /* Destroys DS. */
195 void
196 dataset_destroy (struct dataset *ds)
197 {
198   if (ds != NULL)
199     {
200       dataset_set_session (ds, NULL);
201       dataset_clear (ds);
202       dict_destroy (ds->dict);
203       caseinit_destroy (ds->caseinit);
204       trns_chain_destroy (ds->permanent_trns_chain);
205       dataset_transformations_changed__ (ds, false);
206       free (ds->name);
207       free (ds);
208     }
209 }
210
211 /* Discards the active dataset's dictionary, data, and transformations. */
212 void
213 dataset_clear (struct dataset *ds)
214 {
215   assert (ds->proc_state == PROC_COMMITTED);
216
217   dict_clear (ds->dict);
218   fh_set_default_handle (NULL);
219
220   ds->n_lag = 0;
221
222   casereader_destroy (ds->source);
223   ds->source = NULL;
224
225   proc_cancel_all_transformations (ds);
226 }
227
228 const char *
229 dataset_name (const struct dataset *ds)
230 {
231   return ds->name;
232 }
233
234 void
235 dataset_set_name (struct dataset *ds, const char *name)
236 {
237   struct session *session = ds->session;
238   bool active = false;
239
240   if (session != NULL)
241     {
242       active = session_active_dataset (session) == ds;
243       if (active)
244         session_set_active_dataset (session, NULL);
245       dataset_set_session (ds, NULL);
246     }
247
248   free (ds->name);
249   ds->name = xstrdup (name);
250
251   if (session != NULL)
252     {
253       dataset_set_session (ds, session);
254       if (active)
255         session_set_active_dataset (session, ds);
256     }
257 }
258
259 struct session *
260 dataset_session (const struct dataset *ds)
261 {
262   return ds->session;
263 }
264
265 void
266 dataset_set_session (struct dataset *ds, struct session *session)
267 {
268   if (session != ds->session)
269     {
270       if (ds->session != NULL)
271         session_remove_dataset (ds->session, ds);
272       if (session != NULL)
273         session_add_dataset (session, ds);
274     }
275 }
276
277 /* Returns the dictionary within DS.  This is always nonnull, although it
278    might not contain any variables. */
279 struct dictionary *
280 dataset_dict (const struct dataset *ds)
281 {
282   return ds->dict;
283 }
284
285 /* Replaces DS's dictionary by DICT, discarding any source and
286    transformations. */
287 void
288 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
289 {
290   assert (ds->proc_state == PROC_COMMITTED);
291   assert (ds->dict != dict);
292
293   dataset_clear (ds);
294
295   dict_destroy (ds->dict);
296   ds->dict = dict;
297   dict_set_change_callback (ds->dict, dict_callback, ds);
298 }
299
300 /* Returns the casereader that will be read when a procedure is executed on
301    DS.  This can be NULL if none has been set up yet. */
302 const struct casereader *
303 dataset_source (const struct dataset *ds)
304 {
305   return ds->source;
306 }
307
308 /* Returns true if DS has a data source, false otherwise. */
309 bool
310 dataset_has_source (const struct dataset *ds)
311 {
312   return dataset_source (ds) != NULL;
313 }
314
315 /* Replaces the active dataset's data by READER.  READER's cases must have an
316    appropriate format for DS's dictionary. */
317 bool
318 dataset_set_source (struct dataset *ds, struct casereader *reader)
319 {
320   casereader_destroy (ds->source);
321   ds->source = reader;
322
323   caseinit_clear (ds->caseinit);
324   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
325
326   return reader == NULL || !casereader_error (reader);
327 }
328
329 /* Returns the data source from DS and removes it from DS.  Returns a null
330    pointer if DS has no data source. */
331 struct casereader *
332 dataset_steal_source (struct dataset *ds)
333 {
334   struct casereader *reader = ds->source;
335   ds->source = NULL;
336
337   return reader;
338 }
339
340 /* Returns a number unique to DS.  It can be used to distinguish one dataset
341    from any other within a given program run, even datasets that do not exist
342    at the same time. */
343 unsigned int
344 dataset_seqno (const struct dataset *ds)
345 {
346   return ds->seqno;
347 }
348
349 void
350 dataset_set_callbacks (struct dataset *ds,
351                        const struct dataset_callbacks *callbacks,
352                        void *cb_data)
353 {
354   ds->callbacks = callbacks;
355   ds->cb_data = cb_data;
356 }
357
358 enum dataset_display
359 dataset_get_display (const struct dataset *ds)
360 {
361   return ds->display;
362 }
363
364 void
365 dataset_set_display (struct dataset *ds, enum dataset_display display)
366 {
367   ds->display = display;
368 }
369 \f
370 /* Returns the last time the data was read. */
371 time_t
372 time_of_last_procedure (struct dataset *ds)
373 {
374   if (ds->last_proc_invocation == 0)
375     update_last_proc_invocation (ds);
376   return ds->last_proc_invocation;
377 }
378 \f
379 /* Regular procedure. */
380
381 /* Executes any pending transformations, if necessary.
382    This is not identical to the EXECUTE command in that it won't
383    always read the source data.  This can be important when the
384    source data is given inline within BEGIN DATA...END FILE. */
385 bool
386 proc_execute (struct dataset *ds)
387 {
388   bool ok;
389
390   if ((ds->temporary_trns_chain == NULL
391        || trns_chain_is_empty (ds->temporary_trns_chain))
392       && trns_chain_is_empty (ds->permanent_trns_chain))
393     {
394       ds->n_lag = 0;
395       ds->discard_output = false;
396       dict_set_case_limit (ds->dict, 0);
397       dict_clear_vectors (ds->dict);
398       return true;
399     }
400
401   ok = casereader_destroy (proc_open (ds));
402   return proc_commit (ds) && ok;
403 }
404
405 static const struct casereader_class proc_casereader_class;
406
407 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
408    cases filtered out with FILTER BY will not be included in the casereader
409    (which is usually desirable).  If FILTER is false, all cases will be
410    included regardless of FILTER BY settings.
411
412    proc_commit must be called when done. */
413 struct casereader *
414 proc_open_filtering (struct dataset *ds, bool filter)
415 {
416   struct casereader *reader;
417
418   assert (ds->source != NULL);
419   assert (ds->proc_state == PROC_COMMITTED);
420
421   update_last_proc_invocation (ds);
422
423   caseinit_mark_for_init (ds->caseinit, ds->dict);
424
425   /* Finish up the collection of transformations. */
426   add_case_limit_trns (ds);
427   if (filter)
428     add_filter_trns (ds);
429   trns_chain_finalize (ds->cur_trns_chain);
430
431   /* Make permanent_dict refer to the dictionary right before
432      data reaches the sink. */
433   if (ds->permanent_dict == NULL)
434     ds->permanent_dict = ds->dict;
435
436   /* Prepare sink. */
437   if (!ds->discard_output)
438     {
439       struct dictionary *pd = ds->permanent_dict;
440       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
441       if (compacted_value_cnt < dict_get_next_value_idx (pd))
442         {
443           struct caseproto *compacted_proto;
444           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
445           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
446           ds->sink = autopaging_writer_create (compacted_proto);
447           caseproto_unref (compacted_proto);
448         }
449       else
450         {
451           ds->compactor = NULL;
452           ds->sink = autopaging_writer_create (dict_get_proto (pd));
453         }
454     }
455   else
456     {
457       ds->compactor = NULL;
458       ds->sink = NULL;
459     }
460
461   /* Allocate memory for lagged cases. */
462   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
463
464   ds->proc_state = PROC_OPEN;
465   ds->cases_written = 0;
466   ds->ok = true;
467
468   /* FIXME: use taint in dataset in place of `ok'? */
469   /* FIXME: for trivial cases we can just return a clone of
470      ds->source? */
471
472   /* Create casereader and insert a shim on top.  The shim allows us to
473      arbitrarily extend the casereader's lifetime, by slurping the cases into
474      the shim's buffer in proc_commit().  That is especially useful when output
475      table_items are generated directly from the procedure casereader (e.g. by
476      the LIST procedure) when we are using an output driver that keeps a
477      reference to the output items passed to it (e.g. the GUI output driver in
478      PSPPIRE). */
479   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
480                                          CASENUMBER_MAX,
481                                          &proc_casereader_class, ds);
482   ds->shim = casereader_shim_insert (reader);
483   return reader;
484 }
485
486 /* Opens dataset DS for reading cases with proc_read.
487    proc_commit must be called when done. */
488 struct casereader *
489 proc_open (struct dataset *ds)
490 {
491   return proc_open_filtering (ds, true);
492 }
493
494 /* Returns true if a procedure is in progress, that is, if
495    proc_open has been called but proc_commit has not. */
496 bool
497 proc_is_open (const struct dataset *ds)
498 {
499   return ds->proc_state != PROC_COMMITTED;
500 }
501
502 /* "read" function for procedure casereader. */
503 static struct ccase *
504 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
505 {
506   struct dataset *ds = ds_;
507   enum trns_result retval = TRNS_DROP_CASE;
508   struct ccase *c;
509
510   assert (ds->proc_state == PROC_OPEN);
511   for (; ; case_unref (c))
512     {
513       casenumber case_nr;
514
515       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
516       if (retval == TRNS_ERROR)
517         ds->ok = false;
518       if (!ds->ok)
519         return NULL;
520
521       /* Read a case from source. */
522       c = casereader_read (ds->source);
523       if (c == NULL)
524         return NULL;
525       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
526       caseinit_init_vars (ds->caseinit, c);
527
528       /* Execute permanent transformations.  */
529       case_nr = ds->cases_written + 1;
530       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
531                                    &c, case_nr);
532       caseinit_update_left_vars (ds->caseinit, c);
533       if (retval != TRNS_CONTINUE)
534         continue;
535
536       /* Write case to collection of lagged cases. */
537       if (ds->n_lag > 0)
538         {
539           while (deque_count (&ds->lag) >= ds->n_lag)
540             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
541           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
542         }
543
544       /* Write case to replacement dataset. */
545       ds->cases_written++;
546       if (ds->sink != NULL)
547         casewriter_write (ds->sink,
548                           case_map_execute (ds->compactor, case_ref (c)));
549
550       /* Execute temporary transformations. */
551       if (ds->temporary_trns_chain != NULL)
552         {
553           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
554                                        &c, ds->cases_written);
555           if (retval != TRNS_CONTINUE)
556             continue;
557         }
558
559       return c;
560     }
561 }
562
563 /* "destroy" function for procedure casereader. */
564 static void
565 proc_casereader_destroy (struct casereader *reader, void *ds_)
566 {
567   struct dataset *ds = ds_;
568   struct ccase *c;
569
570   /* We are always the subreader for a casereader_buffer, so if we're being
571      destroyed then it's because the casereader_buffer has read all the cases
572      that it ever will. */
573   ds->shim = NULL;
574
575   /* Make sure transformations happen for every input case, in
576      case they have side effects, and ensure that the replacement
577      active dataset gets all the cases it should. */
578   while ((c = casereader_read (reader)) != NULL)
579     case_unref (c);
580
581   ds->proc_state = PROC_CLOSED;
582   ds->ok = casereader_destroy (ds->source) && ds->ok;
583   ds->source = NULL;
584   dataset_set_source (ds, NULL);
585 }
586
587 /* Must return false if the source casereader, a transformation,
588    or the sink casewriter signaled an error.  (If a temporary
589    transformation signals an error, then the return value is
590    false, but the replacement active dataset may still be
591    untainted.) */
592 bool
593 proc_commit (struct dataset *ds)
594 {
595   if (ds->shim != NULL)
596     casereader_shim_slurp (ds->shim);
597
598   assert (ds->proc_state == PROC_CLOSED);
599   ds->proc_state = PROC_COMMITTED;
600
601   dataset_changed__ (ds);
602
603   /* Free memory for lagged cases. */
604   while (!deque_is_empty (&ds->lag))
605     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
606   free (ds->lag_cases);
607
608   /* Dictionary from before TEMPORARY becomes permanent. */
609   proc_cancel_temporary_transformations (ds);
610
611   if (!ds->discard_output)
612     {
613       /* Finish compacting. */
614       if (ds->compactor != NULL)
615         {
616           case_map_destroy (ds->compactor);
617           ds->compactor = NULL;
618
619           dict_delete_scratch_vars (ds->dict);
620           dict_compact_values (ds->dict);
621         }
622
623       /* Old data sink becomes new data source. */
624       if (ds->sink != NULL)
625         ds->source = casewriter_make_reader (ds->sink);
626     }
627   else
628     {
629       ds->source = NULL;
630       ds->discard_output = false;
631     }
632   ds->sink = NULL;
633
634   caseinit_clear (ds->caseinit);
635   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
636
637   dict_clear_vectors (ds->dict);
638   ds->permanent_dict = NULL;
639   return proc_cancel_all_transformations (ds) && ds->ok;
640 }
641
642 /* Casereader class for procedure execution. */
643 static const struct casereader_class proc_casereader_class =
644   {
645     proc_casereader_read,
646     proc_casereader_destroy,
647     NULL,
648     NULL,
649   };
650
651 /* Updates last_proc_invocation. */
652 static void
653 update_last_proc_invocation (struct dataset *ds)
654 {
655   ds->last_proc_invocation = time (NULL);
656 }
657 \f
658 /* Returns a pointer to the lagged case from N_BEFORE cases before the
659    current one, or NULL if there haven't been that many cases yet. */
660 const struct ccase *
661 lagged_case (const struct dataset *ds, int n_before)
662 {
663   assert (n_before >= 1);
664   assert (n_before <= ds->n_lag);
665
666   if (n_before <= deque_count (&ds->lag))
667     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
668   else
669     return NULL;
670 }
671 \f
672 /* Returns the current set of permanent transformations,
673    and clears the permanent transformations.
674    For use by INPUT PROGRAM. */
675 struct trns_chain *
676 proc_capture_transformations (struct dataset *ds)
677 {
678   struct trns_chain *chain;
679
680   assert (ds->temporary_trns_chain == NULL);
681   chain = ds->permanent_trns_chain;
682   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
683   dataset_transformations_changed__ (ds, false);
684
685   return chain;
686 }
687
688 /* Adds a transformation that processes a case with PROC and
689    frees itself with FREE to the current set of transformations.
690    The functions are passed AUX as auxiliary data. */
691 void
692 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
693 {
694   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
695   dataset_transformations_changed__ (ds, true);
696 }
697
698 /* Adds a transformation that processes a case with PROC and
699    frees itself with FREE to the current set of transformations.
700    When parsing of the block of transformations is complete,
701    FINALIZE will be called.
702    The functions are passed AUX as auxiliary data. */
703 void
704 add_transformation_with_finalizer (struct dataset *ds,
705                                    trns_finalize_func *finalize,
706                                    trns_proc_func *proc,
707                                    trns_free_func *free, void *aux)
708 {
709   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
710   dataset_transformations_changed__ (ds, true);
711 }
712
713 /* Returns the index of the next transformation.
714    This value can be returned by a transformation procedure
715    function to indicate a "jump" to that transformation. */
716 size_t
717 next_transformation (const struct dataset *ds)
718 {
719   return trns_chain_next (ds->cur_trns_chain);
720 }
721
722 /* Returns true if the next call to add_transformation() will add
723    a temporary transformation, false if it will add a permanent
724    transformation. */
725 bool
726 proc_in_temporary_transformations (const struct dataset *ds)
727 {
728   return ds->temporary_trns_chain != NULL;
729 }
730
731 /* Marks the start of temporary transformations.
732    Further calls to add_transformation() will add temporary
733    transformations. */
734 void
735 proc_start_temporary_transformations (struct dataset *ds)
736 {
737   if (!proc_in_temporary_transformations (ds))
738     {
739       add_case_limit_trns (ds);
740
741       ds->permanent_dict = dict_clone (ds->dict);
742
743       trns_chain_finalize (ds->permanent_trns_chain);
744       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
745       dataset_transformations_changed__ (ds, true);
746     }
747 }
748
749 /* Converts all the temporary transformations, if any, to permanent
750    transformations.  Further transformations will be permanent.
751
752    The FILTER command is implemented as a temporary transformation, so a
753    procedure that uses this function should usually use proc_open_filtering()
754    with FILTER false, instead of plain proc_open().
755
756    Returns true if anything changed, false otherwise. */
757 bool
758 proc_make_temporary_transformations_permanent (struct dataset *ds)
759 {
760   if (proc_in_temporary_transformations (ds))
761     {
762       trns_chain_finalize (ds->temporary_trns_chain);
763       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
764       ds->temporary_trns_chain = NULL;
765
766       ds->cur_trns_chain = ds->permanent_trns_chain;
767
768       dict_destroy (ds->permanent_dict);
769       ds->permanent_dict = NULL;
770
771       return true;
772     }
773   else
774     return false;
775 }
776
777 /* Cancels all temporary transformations, if any.  Further
778    transformations will be permanent.
779    Returns true if anything changed, false otherwise. */
780 bool
781 proc_cancel_temporary_transformations (struct dataset *ds)
782 {
783   if (proc_in_temporary_transformations (ds))
784     {
785       dict_destroy (ds->dict);
786       ds->dict = ds->permanent_dict;
787       ds->permanent_dict = NULL;
788
789       trns_chain_destroy (ds->temporary_trns_chain);
790       ds->temporary_trns_chain = NULL;
791       dataset_transformations_changed__ (
792         ds, !trns_chain_is_empty (ds->permanent_trns_chain));
793       return true;
794     }
795   else
796     return false;
797 }
798
799 /* Cancels all transformations, if any.
800    Returns true if successful, false on I/O error. */
801 bool
802 proc_cancel_all_transformations (struct dataset *ds)
803 {
804   bool ok;
805   assert (ds->proc_state == PROC_COMMITTED);
806   ok = trns_chain_destroy (ds->permanent_trns_chain);
807   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
808   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
809   ds->temporary_trns_chain = NULL;
810   dataset_transformations_changed__ (ds, false);
811
812   return ok;
813 }
814
815 static int
816 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
817 {
818   struct variable *var = var_;
819
820   *cc = case_unshare (*cc);
821   case_data_rw (*cc, var)->f = case_num;
822
823   return TRNS_CONTINUE;
824 }
825
826 /* Add a variable which we can sort by to get back the original order. */
827 struct variable *
828 add_permanent_ordering_transformation (struct dataset *ds)
829 {
830   struct variable *temp_var;
831
832   temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
833   if (proc_in_temporary_transformations (ds))
834     {
835       struct variable *perm_var;
836
837       perm_var = dict_clone_var_in_place_assert (ds->permanent_dict, temp_var);
838       trns_chain_append (ds->permanent_trns_chain, NULL, store_case_num,
839                          NULL, perm_var);
840       trns_chain_finalize (ds->permanent_trns_chain);
841     }
842   else
843     add_transformation (ds, store_case_num, NULL, temp_var);
844
845   return temp_var;
846 }
847 \f
848 /* Causes output from the next procedure to be discarded, instead
849    of being preserved for use as input for the next procedure. */
850 void
851 proc_discard_output (struct dataset *ds)
852 {
853   ds->discard_output = true;
854 }
855
856
857 /* Checks whether DS has a corrupted active dataset.  If so,
858    discards it and returns false.  If not, returns true without
859    doing anything. */
860 bool
861 dataset_end_of_command (struct dataset *ds)
862 {
863   if (ds->source != NULL)
864     {
865       if (casereader_error (ds->source))
866         {
867           dataset_clear (ds);
868           return false;
869         }
870       else
871         {
872           const struct taint *taint = casereader_get_taint (ds->source);
873           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
874           assert (!taint_has_tainted_successor (taint));
875         }
876     }
877   return true;
878 }
879 \f
880 static trns_proc_func case_limit_trns_proc;
881 static trns_free_func case_limit_trns_free;
882
883 /* Adds a transformation that limits the number of cases that may
884    pass through, if DS->DICT has a case limit. */
885 static void
886 add_case_limit_trns (struct dataset *ds)
887 {
888   casenumber case_limit = dict_get_case_limit (ds->dict);
889   if (case_limit != 0)
890     {
891       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
892       *cases_remaining = case_limit;
893       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
894                           cases_remaining);
895       dict_set_case_limit (ds->dict, 0);
896     }
897 }
898
899 /* Limits the maximum number of cases processed to
900    *CASES_REMAINING. */
901 static int
902 case_limit_trns_proc (void *cases_remaining_,
903                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
904 {
905   size_t *cases_remaining = cases_remaining_;
906   if (*cases_remaining > 0)
907     {
908       (*cases_remaining)--;
909       return TRNS_CONTINUE;
910     }
911   else
912     return TRNS_DROP_CASE;
913 }
914
915 /* Frees the data associated with a case limit transformation. */
916 static bool
917 case_limit_trns_free (void *cases_remaining_)
918 {
919   size_t *cases_remaining = cases_remaining_;
920   free (cases_remaining);
921   return true;
922 }
923 \f
924 static trns_proc_func filter_trns_proc;
925
926 /* Adds a temporary transformation to filter data according to
927    the variable specified on FILTER, if any. */
928 static void
929 add_filter_trns (struct dataset *ds)
930 {
931   struct variable *filter_var = dict_get_filter (ds->dict);
932   if (filter_var != NULL)
933     {
934       proc_start_temporary_transformations (ds);
935       add_transformation (ds, filter_trns_proc, NULL, filter_var);
936     }
937 }
938
939 /* FILTER transformation. */
940 static int
941 filter_trns_proc (void *filter_var_,
942                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
943
944 {
945   struct variable *filter_var = filter_var_;
946   double f = case_num (*c, filter_var);
947   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
948           ? TRNS_CONTINUE : TRNS_DROP_CASE);
949 }
950
951
952 void
953 dataset_need_lag (struct dataset *ds, int n_before)
954 {
955   ds->n_lag = MAX (ds->n_lag, n_before);
956 }
957 \f
958 static void
959 dataset_changed__ (struct dataset *ds)
960 {
961   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
962     ds->callbacks->changed (ds->cb_data);
963 }
964
965 static void
966 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
967 {
968   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
969     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
970 }
971 \f
972 /* Private interface for use by session code. */
973
974 void
975 dataset_set_session__ (struct dataset *ds, struct session *session)
976 {
977   ds->session = session;
978 }