more work on datasets
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013, 2016 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 struct dataset {
48   /* A dataset is usually part of a session.  Within a session its name must
49      unique.  The name must either be a valid PSPP identifier or the empty
50      string.  (It must be unique within the session even if it is the empty
51      string; that is, there may only be a single dataset within a session with
52      the empty string as its name.) */
53   struct session *session;
54   char *name;
55   enum dataset_display display;
56
57   /* Cases are read from source,
58      their transformation variables are initialized,
59      pass through permanent_trns_chain (which transforms them into
60      the format described by permanent_dict),
61      are written to sink,
62      pass through temporary_trns_chain (which transforms them into
63      the format described by dict),
64      and are finally passed to the procedure. */
65   struct casereader *source;
66   struct caseinit *caseinit;
67   struct trns_chain *permanent_trns_chain;
68   struct dictionary *permanent_dict;
69   struct casewriter *sink;
70   struct trns_chain *temporary_trns_chain;
71   struct dictionary *dict;
72
73   /* If true, cases are discarded instead of being written to
74      sink. */
75   bool discard_output;
76
77   /* The transformation chain that the next transformation will be
78      added to. */
79   struct trns_chain *cur_trns_chain;
80
81   /* The case map used to compact a case, if necessary;
82      otherwise a null pointer. */
83   struct case_map *compactor;
84
85   /* Time at which proc was last invoked. */
86   time_t last_proc_invocation;
87
88   /* Cases just before ("lagging") the current one. */
89   int n_lag;                    /* Number of cases to lag. */
90   struct deque lag;             /* Deque of lagged cases. */
91   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
92
93   /* Procedure data. */
94   enum
95     {
96       PROC_COMMITTED,           /* No procedure in progress. */
97       PROC_OPEN,                /* proc_open called, casereader still open. */
98       PROC_CLOSED               /* casereader from proc_open destroyed,
99                                    but proc_commit not yet called. */
100     }
101   proc_state;
102   casenumber cases_written;     /* Cases output so far. */
103   bool ok;                      /* Error status. */
104   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
105
106   const struct dataset_callbacks *callbacks;
107   void *cb_data;
108
109   /* Uniquely distinguishes datasets. */
110   unsigned int seqno;
111 };
112
113 static void dataset_changed__ (struct dataset *);
114 static void dataset_transformations_changed__ (struct dataset *,
115                                                bool non_empty);
116
117 static void add_case_limit_trns (struct dataset *ds);
118 static void add_filter_trns (struct dataset *ds);
119
120 static void update_last_proc_invocation (struct dataset *ds);
121
122 static void
123 dict_callback (struct dictionary *d UNUSED, void *ds_)
124 {
125   struct dataset *ds = ds_;
126   dataset_changed__ (ds);
127 }
128 \f
129 static void
130 dataset_create_finish__ (struct dataset *ds, struct session *session)
131 {
132   static unsigned int seqno;
133
134   dict_set_change_callback (ds->dict, dict_callback, ds);
135   proc_cancel_all_transformations (ds);
136   dataset_set_session (ds, session);
137   ds->seqno = ++seqno;
138 }
139
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
141    SESSION already contains a dataset named NAME, it is deleted and replaced.
142    The dataset initially has an empty dictionary and no data source. */
143 struct dataset *
144 dataset_create (struct session *session, const char *name)
145 {
146   struct dataset *ds;
147
148   ds = xzalloc (sizeof *ds);
149   ds->name = xstrdup (name);
150   ds->display = DATASET_FRONT;
151   ds->dict = dict_create (get_default_encoding ());
152
153   ds->caseinit = caseinit_create ();
154
155   dataset_create_finish__ (ds, session);
156
157   return ds;
158 }
159
160 /* Creates and returns a new dataset that has the same data and dictionary as
161    OLD named NAME, adds it to the same session as OLD, and returns the new
162    dataset.  If SESSION already contains a dataset named NAME, it is deleted
163    and replaced.
164
165    OLD must not have any active transformations or temporary state and must
166    not be in the middle of a procedure.
167
168    Callbacks are not cloned. */
169 struct dataset *
170 dataset_clone (struct dataset *old, const char *name)
171 {
172   struct dataset *new;
173
174   assert (old->proc_state == PROC_COMMITTED);
175   assert (trns_chain_is_empty (old->permanent_trns_chain));
176   assert (old->permanent_dict == NULL);
177   assert (old->sink == NULL);
178   assert (old->temporary_trns_chain == NULL);
179
180   new = xzalloc (sizeof *new);
181   new->name = xstrdup (name);
182   new->display = DATASET_FRONT;
183   new->source = casereader_clone (old->source);
184   new->dict = dict_clone (old->dict);
185   new->caseinit = caseinit_clone (old->caseinit);
186   new->last_proc_invocation = old->last_proc_invocation;
187   new->ok = old->ok;
188
189   dataset_create_finish__ (new, old->session);
190
191   return new;
192 }
193
194 /* Destroys DS. */
195 void
196 dataset_destroy (struct dataset *ds)
197 {
198   if (ds != NULL)
199     {
200       dataset_set_session (ds, NULL);
201       dataset_clear (ds);
202       dict_destroy (ds->dict);
203       caseinit_destroy (ds->caseinit);
204       trns_chain_destroy (ds->permanent_trns_chain);
205       dataset_transformations_changed__ (ds, false);
206       free (ds->name);
207       free (ds);
208     }
209 }
210
211 /* Discards the active dataset's dictionary, data, and transformations. */
212 void
213 dataset_clear (struct dataset *ds)
214 {
215   assert (ds->proc_state == PROC_COMMITTED);
216
217   dict_clear (ds->dict);
218   fh_set_default_handle (NULL);
219
220   ds->n_lag = 0;
221
222   casereader_destroy (ds->source);
223   ds->source = NULL;
224
225   proc_cancel_all_transformations (ds);
226 }
227
228 const char *
229 dataset_name (const struct dataset *ds)
230 {
231   return ds->name;
232 }
233
234 void
235 dataset_set_name (struct dataset *ds, const char *name)
236 {
237   struct session *session = ds->session;
238   bool active = false;
239
240   if (session != NULL)
241     {
242       active = session_active_dataset (session) == ds;
243       if (active)
244         session_set_active_dataset (session, NULL);
245       dataset_set_session (ds, NULL);
246     }
247
248   free (ds->name);
249   ds->name = xstrdup (name);
250
251   if (session != NULL)
252     {
253       dataset_set_session (ds, session);
254       if (active)
255         session_set_active_dataset (session, ds);
256     }
257 }
258
259 struct session *
260 dataset_session (const struct dataset *ds)
261 {
262   return ds->session;
263 }
264
265 void
266 dataset_set_session (struct dataset *ds, struct session *session)
267 {
268   if (session != ds->session)
269     {
270       dataset_ref (ds);
271       if (ds->session != NULL)
272         session_remove_dataset (ds->session, ds);
273       if (session != NULL)
274         session_add_dataset (session, ds);
275       dataset_unref (ds);
276     }
277 }
278
279 /* Returns the dictionary within DS.  This is always nonnull, although it
280    might not contain any variables. */
281 struct dictionary *
282 dataset_dict (const struct dataset *ds)
283 {
284   return ds->dict;
285 }
286
287 /* Replaces DS's dictionary by DICT, discarding any source and
288    transformations. */
289 void
290 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
291 {
292   assert (ds->proc_state == PROC_COMMITTED);
293   assert (ds->dict != dict);
294
295   dataset_clear (ds);
296
297   dict_destroy (ds->dict);
298   ds->dict = dict;
299   dict_set_change_callback (ds->dict, dict_callback, ds);
300 }
301
302 /* Returns the casereader that will be read when a procedure is executed on
303    DS.  This can be NULL if none has been set up yet. */
304 const struct casereader *
305 dataset_source (const struct dataset *ds)
306 {
307   return ds->source;
308 }
309
310 /* Returns true if DS has a data source, false otherwise. */
311 bool
312 dataset_has_source (const struct dataset *ds)
313 {
314   return dataset_source (ds) != NULL;
315 }
316
317 /* Replaces the active dataset's data by READER.  READER's cases must have an
318    appropriate format for DS's dictionary. */
319 bool
320 dataset_set_source (struct dataset *ds, struct casereader *reader)
321 {
322   casereader_destroy (ds->source);
323   ds->source = reader;
324
325   caseinit_clear (ds->caseinit);
326   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
327
328   return reader == NULL || !casereader_error (reader);
329 }
330
331 /* Returns the data source from DS and removes it from DS.  Returns a null
332    pointer if DS has no data source. */
333 struct casereader *
334 dataset_steal_source (struct dataset *ds)
335 {
336   struct casereader *reader = ds->source;
337   ds->source = NULL;
338
339   return reader;
340 }
341
342 /* Returns a number unique to DS.  It can be used to distinguish one dataset
343    from any other within a given program run, even datasets that do not exist
344    at the same time. */
345 unsigned int
346 dataset_seqno (const struct dataset *ds)
347 {
348   return ds->seqno;
349 }
350
351 void
352 dataset_set_callbacks (struct dataset *ds,
353                        const struct dataset_callbacks *callbacks,
354                        void *cb_data)
355 {
356   ds->callbacks = callbacks;
357   ds->cb_data = cb_data;
358 }
359
360 enum dataset_display
361 dataset_get_display (const struct dataset *ds)
362 {
363   return ds->display;
364 }
365
366 void
367 dataset_set_display (struct dataset *ds, enum dataset_display display)
368 {
369   ds->display = display;
370 }
371 \f
372 /* Returns the last time the data was read. */
373 time_t
374 time_of_last_procedure (struct dataset *ds)
375 {
376   if (ds->last_proc_invocation == 0)
377     update_last_proc_invocation (ds);
378   return ds->last_proc_invocation;
379 }
380 \f
381 /* Regular procedure. */
382
383 /* Executes any pending transformations, if necessary.
384    This is not identical to the EXECUTE command in that it won't
385    always read the source data.  This can be important when the
386    source data is given inline within BEGIN DATA...END FILE. */
387 bool
388 proc_execute (struct dataset *ds)
389 {
390   bool ok;
391
392   if ((ds->temporary_trns_chain == NULL
393        || trns_chain_is_empty (ds->temporary_trns_chain))
394       && trns_chain_is_empty (ds->permanent_trns_chain))
395     {
396       ds->n_lag = 0;
397       ds->discard_output = false;
398       dict_set_case_limit (ds->dict, 0);
399       dict_clear_vectors (ds->dict);
400       return true;
401     }
402
403   ok = casereader_destroy (proc_open (ds));
404   return proc_commit (ds) && ok;
405 }
406
407 static const struct casereader_class proc_casereader_class;
408
409 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
410    cases filtered out with FILTER BY will not be included in the casereader
411    (which is usually desirable).  If FILTER is false, all cases will be
412    included regardless of FILTER BY settings.
413
414    proc_commit must be called when done. */
415 struct casereader *
416 proc_open_filtering (struct dataset *ds, bool filter)
417 {
418   struct casereader *reader;
419
420   assert (ds->source != NULL);
421   assert (ds->proc_state == PROC_COMMITTED);
422
423   update_last_proc_invocation (ds);
424
425   caseinit_mark_for_init (ds->caseinit, ds->dict);
426
427   /* Finish up the collection of transformations. */
428   add_case_limit_trns (ds);
429   if (filter)
430     add_filter_trns (ds);
431   trns_chain_finalize (ds->cur_trns_chain);
432
433   /* Make permanent_dict refer to the dictionary right before
434      data reaches the sink. */
435   if (ds->permanent_dict == NULL)
436     ds->permanent_dict = ds->dict;
437
438   /* Prepare sink. */
439   if (!ds->discard_output)
440     {
441       struct dictionary *pd = ds->permanent_dict;
442       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
443       if (compacted_value_cnt < dict_get_next_value_idx (pd))
444         {
445           struct caseproto *compacted_proto;
446           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
447           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
448           ds->sink = autopaging_writer_create (compacted_proto);
449           caseproto_unref (compacted_proto);
450         }
451       else
452         {
453           ds->compactor = NULL;
454           ds->sink = autopaging_writer_create (dict_get_proto (pd));
455         }
456     }
457   else
458     {
459       ds->compactor = NULL;
460       ds->sink = NULL;
461     }
462
463   /* Allocate memory for lagged cases. */
464   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
465
466   ds->proc_state = PROC_OPEN;
467   ds->cases_written = 0;
468   ds->ok = true;
469
470   /* FIXME: use taint in dataset in place of `ok'? */
471   /* FIXME: for trivial cases we can just return a clone of
472      ds->source? */
473
474   /* Create casereader and insert a shim on top.  The shim allows us to
475      arbitrarily extend the casereader's lifetime, by slurping the cases into
476      the shim's buffer in proc_commit().  That is especially useful when output
477      table_items are generated directly from the procedure casereader (e.g. by
478      the LIST procedure) when we are using an output driver that keeps a
479      reference to the output items passed to it (e.g. the GUI output driver in
480      PSPPIRE). */
481   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
482                                          CASENUMBER_MAX,
483                                          &proc_casereader_class, ds);
484   ds->shim = casereader_shim_insert (reader);
485   return reader;
486 }
487
488 /* Opens dataset DS for reading cases with proc_read.
489    proc_commit must be called when done. */
490 struct casereader *
491 proc_open (struct dataset *ds)
492 {
493   return proc_open_filtering (ds, true);
494 }
495
496 /* Returns true if a procedure is in progress, that is, if
497    proc_open has been called but proc_commit has not. */
498 bool
499 proc_is_open (const struct dataset *ds)
500 {
501   return ds->proc_state != PROC_COMMITTED;
502 }
503
504 /* "read" function for procedure casereader. */
505 static struct ccase *
506 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
507 {
508   struct dataset *ds = ds_;
509   enum trns_result retval = TRNS_DROP_CASE;
510   struct ccase *c;
511
512   assert (ds->proc_state == PROC_OPEN);
513   for (; ; case_unref (c))
514     {
515       casenumber case_nr;
516
517       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
518       if (retval == TRNS_ERROR)
519         ds->ok = false;
520       if (!ds->ok)
521         return NULL;
522
523       /* Read a case from source. */
524       c = casereader_read (ds->source);
525       if (c == NULL)
526         return NULL;
527       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
528       caseinit_init_vars (ds->caseinit, c);
529
530       /* Execute permanent transformations.  */
531       case_nr = ds->cases_written + 1;
532       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
533                                    &c, case_nr);
534       caseinit_update_left_vars (ds->caseinit, c);
535       if (retval != TRNS_CONTINUE)
536         continue;
537
538       /* Write case to collection of lagged cases. */
539       if (ds->n_lag > 0)
540         {
541           while (deque_count (&ds->lag) >= ds->n_lag)
542             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
543           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
544         }
545
546       /* Write case to replacement dataset. */
547       ds->cases_written++;
548       if (ds->sink != NULL)
549         casewriter_write (ds->sink,
550                           case_map_execute (ds->compactor, case_ref (c)));
551
552       /* Execute temporary transformations. */
553       if (ds->temporary_trns_chain != NULL)
554         {
555           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
556                                        &c, ds->cases_written);
557           if (retval != TRNS_CONTINUE)
558             continue;
559         }
560
561       return c;
562     }
563 }
564
565 /* "destroy" function for procedure casereader. */
566 static void
567 proc_casereader_destroy (struct casereader *reader, void *ds_)
568 {
569   struct dataset *ds = ds_;
570   struct ccase *c;
571
572   /* We are always the subreader for a casereader_buffer, so if we're being
573      destroyed then it's because the casereader_buffer has read all the cases
574      that it ever will. */
575   ds->shim = NULL;
576
577   /* Make sure transformations happen for every input case, in
578      case they have side effects, and ensure that the replacement
579      active dataset gets all the cases it should. */
580   while ((c = casereader_read (reader)) != NULL)
581     case_unref (c);
582
583   ds->proc_state = PROC_CLOSED;
584   ds->ok = casereader_destroy (ds->source) && ds->ok;
585   ds->source = NULL;
586   dataset_set_source (ds, NULL);
587 }
588
589 /* Must return false if the source casereader, a transformation,
590    or the sink casewriter signaled an error.  (If a temporary
591    transformation signals an error, then the return value is
592    false, but the replacement active dataset may still be
593    untainted.) */
594 bool
595 proc_commit (struct dataset *ds)
596 {
597   if (ds->shim != NULL)
598     casereader_shim_slurp (ds->shim);
599
600   assert (ds->proc_state == PROC_CLOSED);
601   ds->proc_state = PROC_COMMITTED;
602
603   dataset_changed__ (ds);
604
605   /* Free memory for lagged cases. */
606   while (!deque_is_empty (&ds->lag))
607     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
608   free (ds->lag_cases);
609
610   /* Dictionary from before TEMPORARY becomes permanent. */
611   proc_cancel_temporary_transformations (ds);
612
613   if (!ds->discard_output)
614     {
615       /* Finish compacting. */
616       if (ds->compactor != NULL)
617         {
618           case_map_destroy (ds->compactor);
619           ds->compactor = NULL;
620
621           dict_delete_scratch_vars (ds->dict);
622           dict_compact_values (ds->dict);
623         }
624
625       /* Old data sink becomes new data source. */
626       if (ds->sink != NULL)
627         ds->source = casewriter_make_reader (ds->sink);
628     }
629   else
630     {
631       ds->source = NULL;
632       ds->discard_output = false;
633     }
634   ds->sink = NULL;
635
636   caseinit_clear (ds->caseinit);
637   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
638
639   dict_clear_vectors (ds->dict);
640   ds->permanent_dict = NULL;
641   return proc_cancel_all_transformations (ds) && ds->ok;
642 }
643
644 /* Casereader class for procedure execution. */
645 static const struct casereader_class proc_casereader_class =
646   {
647     proc_casereader_read,
648     proc_casereader_destroy,
649     NULL,
650     NULL,
651   };
652
653 /* Updates last_proc_invocation. */
654 static void
655 update_last_proc_invocation (struct dataset *ds)
656 {
657   ds->last_proc_invocation = time (NULL);
658 }
659 \f
660 /* Returns a pointer to the lagged case from N_BEFORE cases before the
661    current one, or NULL if there haven't been that many cases yet. */
662 const struct ccase *
663 lagged_case (const struct dataset *ds, int n_before)
664 {
665   assert (n_before >= 1);
666   assert (n_before <= ds->n_lag);
667
668   if (n_before <= deque_count (&ds->lag))
669     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
670   else
671     return NULL;
672 }
673 \f
674 /* Returns the current set of permanent transformations,
675    and clears the permanent transformations.
676    For use by INPUT PROGRAM. */
677 struct trns_chain *
678 proc_capture_transformations (struct dataset *ds)
679 {
680   struct trns_chain *chain;
681
682   assert (ds->temporary_trns_chain == NULL);
683   chain = ds->permanent_trns_chain;
684   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
685   dataset_transformations_changed__ (ds, false);
686
687   return chain;
688 }
689
690 /* Adds a transformation that processes a case with PROC and
691    frees itself with FREE to the current set of transformations.
692    The functions are passed AUX as auxiliary data. */
693 void
694 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
695 {
696   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
697   dataset_transformations_changed__ (ds, true);
698 }
699
700 /* Adds a transformation that processes a case with PROC and
701    frees itself with FREE to the current set of transformations.
702    When parsing of the block of transformations is complete,
703    FINALIZE will be called.
704    The functions are passed AUX as auxiliary data. */
705 void
706 add_transformation_with_finalizer (struct dataset *ds,
707                                    trns_finalize_func *finalize,
708                                    trns_proc_func *proc,
709                                    trns_free_func *free, void *aux)
710 {
711   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
712   dataset_transformations_changed__ (ds, true);
713 }
714
715 /* Returns the index of the next transformation.
716    This value can be returned by a transformation procedure
717    function to indicate a "jump" to that transformation. */
718 size_t
719 next_transformation (const struct dataset *ds)
720 {
721   return trns_chain_next (ds->cur_trns_chain);
722 }
723
724 /* Returns true if the next call to add_transformation() will add
725    a temporary transformation, false if it will add a permanent
726    transformation. */
727 bool
728 proc_in_temporary_transformations (const struct dataset *ds)
729 {
730   return ds->temporary_trns_chain != NULL;
731 }
732
733 /* Marks the start of temporary transformations.
734    Further calls to add_transformation() will add temporary
735    transformations. */
736 void
737 proc_start_temporary_transformations (struct dataset *ds)
738 {
739   if (!proc_in_temporary_transformations (ds))
740     {
741       add_case_limit_trns (ds);
742
743       ds->permanent_dict = dict_clone (ds->dict);
744
745       trns_chain_finalize (ds->permanent_trns_chain);
746       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
747       dataset_transformations_changed__ (ds, true);
748     }
749 }
750
751 /* Converts all the temporary transformations, if any, to permanent
752    transformations.  Further transformations will be permanent.
753
754    The FILTER command is implemented as a temporary transformation, so a
755    procedure that uses this function should usually use proc_open_filtering()
756    with FILTER false, instead of plain proc_open().
757
758    Returns true if anything changed, false otherwise. */
759 bool
760 proc_make_temporary_transformations_permanent (struct dataset *ds)
761 {
762   if (proc_in_temporary_transformations (ds))
763     {
764       trns_chain_finalize (ds->temporary_trns_chain);
765       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
766       ds->temporary_trns_chain = NULL;
767
768       ds->cur_trns_chain = ds->permanent_trns_chain;
769
770       dict_destroy (ds->permanent_dict);
771       ds->permanent_dict = NULL;
772
773       return true;
774     }
775   else
776     return false;
777 }
778
779 /* Cancels all temporary transformations, if any.  Further
780    transformations will be permanent.
781    Returns true if anything changed, false otherwise. */
782 bool
783 proc_cancel_temporary_transformations (struct dataset *ds)
784 {
785   if (proc_in_temporary_transformations (ds))
786     {
787       dict_destroy (ds->dict);
788       ds->dict = ds->permanent_dict;
789       ds->permanent_dict = NULL;
790
791       trns_chain_destroy (ds->temporary_trns_chain);
792       ds->temporary_trns_chain = NULL;
793       dataset_transformations_changed__ (
794         ds, !trns_chain_is_empty (ds->permanent_trns_chain));
795       return true;
796     }
797   else
798     return false;
799 }
800
801 /* Cancels all transformations, if any.
802    Returns true if successful, false on I/O error. */
803 bool
804 proc_cancel_all_transformations (struct dataset *ds)
805 {
806   bool ok;
807   assert (ds->proc_state == PROC_COMMITTED);
808   ok = trns_chain_destroy (ds->permanent_trns_chain);
809   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
810   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
811   ds->temporary_trns_chain = NULL;
812   dataset_transformations_changed__ (ds, false);
813
814   return ok;
815 }
816
817 static int
818 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
819 {
820   struct variable *var = var_;
821
822   *cc = case_unshare (*cc);
823   case_data_rw (*cc, var)->f = case_num;
824
825   return TRNS_CONTINUE;
826 }
827
828 /* Add a variable which we can sort by to get back the original order. */
829 struct variable *
830 add_permanent_ordering_transformation (struct dataset *ds)
831 {
832   struct variable *temp_var;
833
834   temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
835   if (proc_in_temporary_transformations (ds))
836     {
837       struct variable *perm_var;
838
839       perm_var = dict_clone_var_in_place_assert (ds->permanent_dict, temp_var);
840       trns_chain_append (ds->permanent_trns_chain, NULL, store_case_num,
841                          NULL, perm_var);
842       trns_chain_finalize (ds->permanent_trns_chain);
843     }
844   else
845     add_transformation (ds, store_case_num, NULL, temp_var);
846
847   return temp_var;
848 }
849 \f
850 /* Causes output from the next procedure to be discarded, instead
851    of being preserved for use as input for the next procedure. */
852 void
853 proc_discard_output (struct dataset *ds)
854 {
855   ds->discard_output = true;
856 }
857
858
859 /* Checks whether DS has a corrupted active dataset.  If so,
860    discards it and returns false.  If not, returns true without
861    doing anything. */
862 bool
863 dataset_end_of_command (struct dataset *ds)
864 {
865   if (ds->source != NULL)
866     {
867       if (casereader_error (ds->source))
868         {
869           dataset_clear (ds);
870           return false;
871         }
872       else
873         {
874           const struct taint *taint = casereader_get_taint (ds->source);
875           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
876           assert (!taint_has_tainted_successor (taint));
877         }
878     }
879   return true;
880 }
881 \f
882 static trns_proc_func case_limit_trns_proc;
883 static trns_free_func case_limit_trns_free;
884
885 /* Adds a transformation that limits the number of cases that may
886    pass through, if DS->DICT has a case limit. */
887 static void
888 add_case_limit_trns (struct dataset *ds)
889 {
890   casenumber case_limit = dict_get_case_limit (ds->dict);
891   if (case_limit != 0)
892     {
893       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
894       *cases_remaining = case_limit;
895       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
896                           cases_remaining);
897       dict_set_case_limit (ds->dict, 0);
898     }
899 }
900
901 /* Limits the maximum number of cases processed to
902    *CASES_REMAINING. */
903 static int
904 case_limit_trns_proc (void *cases_remaining_,
905                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
906 {
907   size_t *cases_remaining = cases_remaining_;
908   if (*cases_remaining > 0)
909     {
910       (*cases_remaining)--;
911       return TRNS_CONTINUE;
912     }
913   else
914     return TRNS_DROP_CASE;
915 }
916
917 /* Frees the data associated with a case limit transformation. */
918 static bool
919 case_limit_trns_free (void *cases_remaining_)
920 {
921   size_t *cases_remaining = cases_remaining_;
922   free (cases_remaining);
923   return true;
924 }
925 \f
926 static trns_proc_func filter_trns_proc;
927
928 /* Adds a temporary transformation to filter data according to
929    the variable specified on FILTER, if any. */
930 static void
931 add_filter_trns (struct dataset *ds)
932 {
933   struct variable *filter_var = dict_get_filter (ds->dict);
934   if (filter_var != NULL)
935     {
936       proc_start_temporary_transformations (ds);
937       add_transformation (ds, filter_trns_proc, NULL, filter_var);
938     }
939 }
940
941 /* FILTER transformation. */
942 static int
943 filter_trns_proc (void *filter_var_,
944                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
945
946 {
947   struct variable *filter_var = filter_var_;
948   double f = case_num (*c, filter_var);
949   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
950           ? TRNS_CONTINUE : TRNS_DROP_CASE);
951 }
952
953
954 void
955 dataset_need_lag (struct dataset *ds, int n_before)
956 {
957   ds->n_lag = MAX (ds->n_lag, n_before);
958 }
959 \f
960 static void
961 dataset_changed__ (struct dataset *ds)
962 {
963   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
964     ds->callbacks->changed (ds->cb_data);
965 }
966
967 static void
968 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
969 {
970   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
971     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
972 }
973 \f
974 /* Private interface for use by session code. */
975
976 void
977 dataset_set_session__ (struct dataset *ds, struct session *session)
978 {
979   if (ds->session != session)
980     {
981       if (ds->session)
982         dataset_unref (ds->session);
983       ds->session = session;
984       if (session)
985         dataset_ref (session);
986     }
987 }