10005e0aca67968a72c12a45d4042104fc9ff327
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 struct dataset {
48   /* A dataset is usually part of a session.  Within a session its name must
49      unique.  The name must either be a valid PSPP identifier or the empty
50      string.  (It must be unique within the session even if it is the empty
51      string; that is, there may only be a single dataset within a session with
52      the empty string as its name.) */
53   struct session *session;
54   char *name;
55   enum dataset_display display;
56
57   /* Cases are read from source,
58      their transformation variables are initialized,
59      pass through permanent_trns_chain (which transforms them into
60      the format described by permanent_dict),
61      are written to sink,
62      pass through temporary_trns_chain (which transforms them into
63      the format described by dict),
64      and are finally passed to the procedure. */
65   struct casereader *source;
66   struct caseinit *caseinit;
67   struct trns_chain *permanent_trns_chain;
68   struct dictionary *permanent_dict;
69   struct casewriter *sink;
70   struct trns_chain *temporary_trns_chain;
71   struct dictionary *dict;
72
73   /* If true, cases are discarded instead of being written to
74      sink. */
75   bool discard_output;
76
77   /* The transformation chain that the next transformation will be
78      added to. */
79   struct trns_chain *cur_trns_chain;
80
81   /* The case map used to compact a case, if necessary;
82      otherwise a null pointer. */
83   struct case_map *compactor;
84
85   /* Time at which proc was last invoked. */
86   time_t last_proc_invocation;
87
88   /* Cases just before ("lagging") the current one. */
89   int n_lag;                    /* Number of cases to lag. */
90   struct deque lag;             /* Deque of lagged cases. */
91   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
92
93   /* Procedure data. */
94   enum
95     {
96       PROC_COMMITTED,           /* No procedure in progress. */
97       PROC_OPEN,                /* proc_open called, casereader still open. */
98       PROC_CLOSED               /* casereader from proc_open destroyed,
99                                    but proc_commit not yet called. */
100     }
101   proc_state;
102   casenumber cases_written;     /* Cases output so far. */
103   bool ok;                      /* Error status. */
104   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
105
106   const struct dataset_callbacks *callbacks;
107   void *cb_data;
108
109   /* Uniquely distinguishes datasets. */
110   unsigned int seqno;
111 };
112
113 static void dataset_changed__ (struct dataset *);
114 static void dataset_transformations_changed__ (struct dataset *,
115                                                bool non_empty);
116
117 static void add_case_limit_trns (struct dataset *ds);
118 static void add_filter_trns (struct dataset *ds);
119
120 static void update_last_proc_invocation (struct dataset *ds);
121
122 static void
123 dict_callback (struct dictionary *d UNUSED, void *ds_)
124 {
125   struct dataset *ds = ds_;
126   dataset_changed__ (ds);
127 }
128 \f
129 static void
130 dataset_create_finish__ (struct dataset *ds, struct session *session)
131 {
132   static unsigned int seqno;
133
134   dict_set_change_callback (ds->dict, dict_callback, ds);
135   proc_cancel_all_transformations (ds);
136   dataset_set_session (ds, session);
137   ds->seqno = ++seqno;
138 }
139
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
141    SESSION already contains a dataset named NAME, it is deleted and replaced.
142    The dataset initially has an empty dictionary and no data source. */
143 struct dataset *
144 dataset_create (struct session *session, const char *name)
145 {
146   struct dataset *ds = XZALLOC (struct dataset);
147   ds->name = xstrdup (name);
148   ds->display = DATASET_FRONT;
149   ds->dict = dict_create (get_default_encoding ());
150
151   ds->caseinit = caseinit_create ();
152
153   dataset_create_finish__ (ds, session);
154
155   return ds;
156 }
157
158 /* Creates and returns a new dataset that has the same data and dictionary as
159    OLD named NAME, adds it to the same session as OLD, and returns the new
160    dataset.  If SESSION already contains a dataset named NAME, it is deleted
161    and replaced.
162
163    OLD must not have any active transformations or temporary state and must
164    not be in the middle of a procedure.
165
166    Callbacks are not cloned. */
167 struct dataset *
168 dataset_clone (struct dataset *old, const char *name)
169 {
170   struct dataset *new;
171
172   assert (old->proc_state == PROC_COMMITTED);
173   assert (trns_chain_is_empty (old->permanent_trns_chain));
174   assert (old->permanent_dict == NULL);
175   assert (old->sink == NULL);
176   assert (old->temporary_trns_chain == NULL);
177
178   new = xzalloc (sizeof *new);
179   new->name = xstrdup (name);
180   new->display = DATASET_FRONT;
181   new->source = casereader_clone (old->source);
182   new->dict = dict_clone (old->dict);
183   new->caseinit = caseinit_clone (old->caseinit);
184   new->last_proc_invocation = old->last_proc_invocation;
185   new->ok = old->ok;
186
187   dataset_create_finish__ (new, old->session);
188
189   return new;
190 }
191
192 /* Destroys DS. */
193 void
194 dataset_destroy (struct dataset *ds)
195 {
196   if (ds != NULL)
197     {
198       dataset_set_session (ds, NULL);
199       dataset_clear (ds);
200       dict_unref (ds->dict);
201       caseinit_destroy (ds->caseinit);
202       trns_chain_destroy (ds->permanent_trns_chain);
203       dataset_transformations_changed__ (ds, false);
204       free (ds->name);
205       free (ds);
206     }
207 }
208
209 /* Discards the active dataset's dictionary, data, and transformations. */
210 void
211 dataset_clear (struct dataset *ds)
212 {
213   assert (ds->proc_state == PROC_COMMITTED);
214
215   dict_clear (ds->dict);
216   fh_set_default_handle (NULL);
217
218   ds->n_lag = 0;
219
220   casereader_destroy (ds->source);
221   ds->source = NULL;
222
223   proc_cancel_all_transformations (ds);
224 }
225
226 const char *
227 dataset_name (const struct dataset *ds)
228 {
229   return ds->name;
230 }
231
232 void
233 dataset_set_name (struct dataset *ds, const char *name)
234 {
235   struct session *session = ds->session;
236   bool active = false;
237
238   if (session != NULL)
239     {
240       active = session_active_dataset (session) == ds;
241       if (active)
242         session_set_active_dataset (session, NULL);
243       dataset_set_session (ds, NULL);
244     }
245
246   free (ds->name);
247   ds->name = xstrdup (name);
248
249   if (session != NULL)
250     {
251       dataset_set_session (ds, session);
252       if (active)
253         session_set_active_dataset (session, ds);
254     }
255 }
256
257 struct session *
258 dataset_session (const struct dataset *ds)
259 {
260   return ds->session;
261 }
262
263 void
264 dataset_set_session (struct dataset *ds, struct session *session)
265 {
266   if (session != ds->session)
267     {
268       if (ds->session != NULL)
269         session_remove_dataset (ds->session, ds);
270       if (session != NULL)
271         session_add_dataset (session, ds);
272     }
273 }
274
275 /* Returns the dictionary within DS.  This is always nonnull, although it
276    might not contain any variables. */
277 struct dictionary *
278 dataset_dict (const struct dataset *ds)
279 {
280   return ds->dict;
281 }
282
283 /* Replaces DS's dictionary by DICT, discarding any source and
284    transformations. */
285 void
286 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
287 {
288   assert (ds->proc_state == PROC_COMMITTED);
289   assert (ds->dict != dict);
290
291   dataset_clear (ds);
292
293   dict_unref (ds->dict);
294   ds->dict = dict;
295   dict_set_change_callback (ds->dict, dict_callback, ds);
296 }
297
298 /* Returns the casereader that will be read when a procedure is executed on
299    DS.  This can be NULL if none has been set up yet. */
300 const struct casereader *
301 dataset_source (const struct dataset *ds)
302 {
303   return ds->source;
304 }
305
306 /* Returns true if DS has a data source, false otherwise. */
307 bool
308 dataset_has_source (const struct dataset *ds)
309 {
310   return dataset_source (ds) != NULL;
311 }
312
313 /* Replaces the active dataset's data by READER.  READER's cases must have an
314    appropriate format for DS's dictionary. */
315 bool
316 dataset_set_source (struct dataset *ds, struct casereader *reader)
317 {
318   casereader_destroy (ds->source);
319   ds->source = reader;
320
321   caseinit_clear (ds->caseinit);
322   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
323
324   return reader == NULL || !casereader_error (reader);
325 }
326
327 /* Returns the data source from DS and removes it from DS.  Returns a null
328    pointer if DS has no data source. */
329 struct casereader *
330 dataset_steal_source (struct dataset *ds)
331 {
332   struct casereader *reader = ds->source;
333   ds->source = NULL;
334
335   return reader;
336 }
337
338 /* Returns a number unique to DS.  It can be used to distinguish one dataset
339    from any other within a given program run, even datasets that do not exist
340    at the same time. */
341 unsigned int
342 dataset_seqno (const struct dataset *ds)
343 {
344   return ds->seqno;
345 }
346
347 void
348 dataset_set_callbacks (struct dataset *ds,
349                        const struct dataset_callbacks *callbacks,
350                        void *cb_data)
351 {
352   ds->callbacks = callbacks;
353   ds->cb_data = cb_data;
354 }
355
356 enum dataset_display
357 dataset_get_display (const struct dataset *ds)
358 {
359   return ds->display;
360 }
361
362 void
363 dataset_set_display (struct dataset *ds, enum dataset_display display)
364 {
365   ds->display = display;
366 }
367 \f
368 /* Returns the last time the data was read. */
369 time_t
370 time_of_last_procedure (struct dataset *ds)
371 {
372   if (ds->last_proc_invocation == 0)
373     update_last_proc_invocation (ds);
374   return ds->last_proc_invocation;
375 }
376 \f
377 /* Regular procedure. */
378
379 /* Executes any pending transformations, if necessary.
380    This is not identical to the EXECUTE command in that it won't
381    always read the source data.  This can be important when the
382    source data is given inline within BEGIN DATA...END FILE. */
383 bool
384 proc_execute (struct dataset *ds)
385 {
386   bool ok;
387
388   if ((ds->temporary_trns_chain == NULL
389        || trns_chain_is_empty (ds->temporary_trns_chain))
390       && trns_chain_is_empty (ds->permanent_trns_chain))
391     {
392       ds->n_lag = 0;
393       ds->discard_output = false;
394       dict_set_case_limit (ds->dict, 0);
395       dict_clear_vectors (ds->dict);
396       return true;
397     }
398
399   ok = casereader_destroy (proc_open (ds));
400   return proc_commit (ds) && ok;
401 }
402
403 static const struct casereader_class proc_casereader_class;
404
405 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
406    cases filtered out with FILTER BY will not be included in the casereader
407    (which is usually desirable).  If FILTER is false, all cases will be
408    included regardless of FILTER BY settings.
409
410    proc_commit must be called when done. */
411 struct casereader *
412 proc_open_filtering (struct dataset *ds, bool filter)
413 {
414   struct casereader *reader;
415
416   assert (ds->source != NULL);
417   assert (ds->proc_state == PROC_COMMITTED);
418
419   update_last_proc_invocation (ds);
420
421   caseinit_mark_for_init (ds->caseinit, ds->dict);
422
423   /* Finish up the collection of transformations. */
424   add_case_limit_trns (ds);
425   if (filter)
426     add_filter_trns (ds);
427   trns_chain_finalize (ds->cur_trns_chain);
428
429   /* Make permanent_dict refer to the dictionary right before
430      data reaches the sink. */
431   if (ds->permanent_dict == NULL)
432     ds->permanent_dict = ds->dict;
433
434   /* Prepare sink. */
435   if (!ds->discard_output)
436     {
437       struct dictionary *pd = ds->permanent_dict;
438       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
439       if (compacted_value_cnt < dict_get_next_value_idx (pd))
440         {
441           struct caseproto *compacted_proto;
442           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
443           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
444           ds->sink = autopaging_writer_create (compacted_proto);
445           caseproto_unref (compacted_proto);
446         }
447       else
448         {
449           ds->compactor = NULL;
450           ds->sink = autopaging_writer_create (dict_get_proto (pd));
451         }
452     }
453   else
454     {
455       ds->compactor = NULL;
456       ds->sink = NULL;
457     }
458
459   /* Allocate memory for lagged cases. */
460   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
461
462   ds->proc_state = PROC_OPEN;
463   ds->cases_written = 0;
464   ds->ok = true;
465
466   /* FIXME: use taint in dataset in place of `ok'? */
467   /* FIXME: for trivial cases we can just return a clone of
468      ds->source? */
469
470   /* Create casereader and insert a shim on top.  The shim allows us to
471      arbitrarily extend the casereader's lifetime, by slurping the cases into
472      the shim's buffer in proc_commit().  That is especially useful when output
473      table_items are generated directly from the procedure casereader (e.g. by
474      the LIST procedure) when we are using an output driver that keeps a
475      reference to the output items passed to it (e.g. the GUI output driver in
476      PSPPIRE). */
477   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
478                                          CASENUMBER_MAX,
479                                          &proc_casereader_class, ds);
480   ds->shim = casereader_shim_insert (reader);
481   return reader;
482 }
483
484 /* Opens dataset DS for reading cases with proc_read.
485    proc_commit must be called when done. */
486 struct casereader *
487 proc_open (struct dataset *ds)
488 {
489   return proc_open_filtering (ds, true);
490 }
491
492 /* Returns true if a procedure is in progress, that is, if
493    proc_open has been called but proc_commit has not. */
494 bool
495 proc_is_open (const struct dataset *ds)
496 {
497   return ds->proc_state != PROC_COMMITTED;
498 }
499
500 /* "read" function for procedure casereader. */
501 static struct ccase *
502 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
503 {
504   struct dataset *ds = ds_;
505   enum trns_result retval = TRNS_DROP_CASE;
506   struct ccase *c;
507
508   assert (ds->proc_state == PROC_OPEN);
509   for (; ; case_unref (c))
510     {
511       casenumber case_nr;
512
513       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
514       if (retval == TRNS_ERROR)
515         ds->ok = false;
516       if (!ds->ok)
517         return NULL;
518
519       /* Read a case from source. */
520       c = casereader_read (ds->source);
521       if (c == NULL)
522         return NULL;
523       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
524       caseinit_init_vars (ds->caseinit, c);
525
526       /* Execute permanent transformations.  */
527       case_nr = ds->cases_written + 1;
528       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
529                                    &c, case_nr);
530       caseinit_update_left_vars (ds->caseinit, c);
531       if (retval != TRNS_CONTINUE)
532         continue;
533
534       /* Write case to collection of lagged cases. */
535       if (ds->n_lag > 0)
536         {
537           while (deque_count (&ds->lag) >= ds->n_lag)
538             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
539           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
540         }
541
542       /* Write case to replacement dataset. */
543       ds->cases_written++;
544       if (ds->sink != NULL)
545         casewriter_write (ds->sink,
546                           case_map_execute (ds->compactor, case_ref (c)));
547
548       /* Execute temporary transformations. */
549       if (ds->temporary_trns_chain != NULL)
550         {
551           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
552                                        &c, ds->cases_written);
553           if (retval != TRNS_CONTINUE)
554             continue;
555         }
556
557       return c;
558     }
559 }
560
561 /* "destroy" function for procedure casereader. */
562 static void
563 proc_casereader_destroy (struct casereader *reader, void *ds_)
564 {
565   struct dataset *ds = ds_;
566   struct ccase *c;
567
568   /* We are always the subreader for a casereader_buffer, so if we're being
569      destroyed then it's because the casereader_buffer has read all the cases
570      that it ever will. */
571   ds->shim = NULL;
572
573   /* Make sure transformations happen for every input case, in
574      case they have side effects, and ensure that the replacement
575      active dataset gets all the cases it should. */
576   while ((c = casereader_read (reader)) != NULL)
577     case_unref (c);
578
579   ds->proc_state = PROC_CLOSED;
580   ds->ok = casereader_destroy (ds->source) && ds->ok;
581   ds->source = NULL;
582   dataset_set_source (ds, NULL);
583 }
584
585 /* Must return false if the source casereader, a transformation,
586    or the sink casewriter signaled an error.  (If a temporary
587    transformation signals an error, then the return value is
588    false, but the replacement active dataset may still be
589    untainted.) */
590 bool
591 proc_commit (struct dataset *ds)
592 {
593   if (ds->shim != NULL)
594     casereader_shim_slurp (ds->shim);
595
596   assert (ds->proc_state == PROC_CLOSED);
597   ds->proc_state = PROC_COMMITTED;
598
599   dataset_changed__ (ds);
600
601   /* Free memory for lagged cases. */
602   while (!deque_is_empty (&ds->lag))
603     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
604   free (ds->lag_cases);
605
606   /* Dictionary from before TEMPORARY becomes permanent. */
607   proc_cancel_temporary_transformations (ds);
608
609   if (!ds->discard_output)
610     {
611       /* Finish compacting. */
612       if (ds->compactor != NULL)
613         {
614           case_map_destroy (ds->compactor);
615           ds->compactor = NULL;
616
617           dict_delete_scratch_vars (ds->dict);
618           dict_compact_values (ds->dict);
619         }
620
621       /* Old data sink becomes new data source. */
622       if (ds->sink != NULL)
623         ds->source = casewriter_make_reader (ds->sink);
624     }
625   else
626     {
627       ds->source = NULL;
628       ds->discard_output = false;
629     }
630   ds->sink = NULL;
631
632   caseinit_clear (ds->caseinit);
633   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
634
635   dict_clear_vectors (ds->dict);
636   ds->permanent_dict = NULL;
637   return proc_cancel_all_transformations (ds) && ds->ok;
638 }
639
640 /* Casereader class for procedure execution. */
641 static const struct casereader_class proc_casereader_class =
642   {
643     proc_casereader_read,
644     proc_casereader_destroy,
645     NULL,
646     NULL,
647   };
648
649 /* Updates last_proc_invocation. */
650 static void
651 update_last_proc_invocation (struct dataset *ds)
652 {
653   ds->last_proc_invocation = time (NULL);
654 }
655 \f
656 /* Returns a pointer to the lagged case from N_BEFORE cases before the
657    current one, or NULL if there haven't been that many cases yet. */
658 const struct ccase *
659 lagged_case (const struct dataset *ds, int n_before)
660 {
661   assert (n_before >= 1);
662   assert (n_before <= ds->n_lag);
663
664   if (n_before <= deque_count (&ds->lag))
665     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
666   else
667     return NULL;
668 }
669 \f
670 /* Returns the current set of permanent transformations,
671    and clears the permanent transformations.
672    For use by INPUT PROGRAM. */
673 struct trns_chain *
674 proc_capture_transformations (struct dataset *ds)
675 {
676   struct trns_chain *chain;
677
678   assert (ds->temporary_trns_chain == NULL);
679   chain = ds->permanent_trns_chain;
680   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
681   dataset_transformations_changed__ (ds, false);
682
683   return chain;
684 }
685
686 /* Adds a transformation that processes a case with PROC and
687    frees itself with FREE to the current set of transformations.
688    The functions are passed AUX as auxiliary data. */
689 void
690 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
691 {
692   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
693   dataset_transformations_changed__ (ds, true);
694 }
695
696 /* Adds a transformation that processes a case with PROC and
697    frees itself with FREE to the current set of transformations.
698    When parsing of the block of transformations is complete,
699    FINALIZE will be called.
700    The functions are passed AUX as auxiliary data. */
701 void
702 add_transformation_with_finalizer (struct dataset *ds,
703                                    trns_finalize_func *finalize,
704                                    trns_proc_func *proc,
705                                    trns_free_func *free, void *aux)
706 {
707   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
708   dataset_transformations_changed__ (ds, true);
709 }
710
711 /* Returns the index of the next transformation.
712    This value can be returned by a transformation procedure
713    function to indicate a "jump" to that transformation. */
714 size_t
715 next_transformation (const struct dataset *ds)
716 {
717   return trns_chain_next (ds->cur_trns_chain);
718 }
719
720 /* Returns true if the next call to add_transformation() will add
721    a temporary transformation, false if it will add a permanent
722    transformation. */
723 bool
724 proc_in_temporary_transformations (const struct dataset *ds)
725 {
726   return ds->temporary_trns_chain != NULL;
727 }
728
729 /* Marks the start of temporary transformations.
730    Further calls to add_transformation() will add temporary
731    transformations. */
732 void
733 proc_start_temporary_transformations (struct dataset *ds)
734 {
735   if (!proc_in_temporary_transformations (ds))
736     {
737       add_case_limit_trns (ds);
738
739       ds->permanent_dict = dict_clone (ds->dict);
740
741       trns_chain_finalize (ds->permanent_trns_chain);
742       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
743       dataset_transformations_changed__ (ds, true);
744     }
745 }
746
747 /* Converts all the temporary transformations, if any, to permanent
748    transformations.  Further transformations will be permanent.
749
750    The FILTER command is implemented as a temporary transformation, so a
751    procedure that uses this function should usually use proc_open_filtering()
752    with FILTER false, instead of plain proc_open().
753
754    Returns true if anything changed, false otherwise. */
755 bool
756 proc_make_temporary_transformations_permanent (struct dataset *ds)
757 {
758   if (proc_in_temporary_transformations (ds))
759     {
760       trns_chain_finalize (ds->temporary_trns_chain);
761       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
762       ds->temporary_trns_chain = NULL;
763
764       ds->cur_trns_chain = ds->permanent_trns_chain;
765
766       dict_unref (ds->permanent_dict);
767       ds->permanent_dict = NULL;
768
769       return true;
770     }
771   else
772     return false;
773 }
774
775 /* Cancels all temporary transformations, if any.  Further
776    transformations will be permanent.
777    Returns true if anything changed, false otherwise. */
778 bool
779 proc_cancel_temporary_transformations (struct dataset *ds)
780 {
781   if (proc_in_temporary_transformations (ds))
782     {
783       dict_unref (ds->dict);
784       ds->dict = ds->permanent_dict;
785       ds->permanent_dict = NULL;
786
787       trns_chain_destroy (ds->temporary_trns_chain);
788       ds->temporary_trns_chain = NULL;
789       dataset_transformations_changed__ (
790         ds, !trns_chain_is_empty (ds->permanent_trns_chain));
791       return true;
792     }
793   else
794     return false;
795 }
796
797 /* Cancels all transformations, if any.
798    Returns true if successful, false on I/O error. */
799 bool
800 proc_cancel_all_transformations (struct dataset *ds)
801 {
802   bool ok;
803   assert (ds->proc_state == PROC_COMMITTED);
804   ok = trns_chain_destroy (ds->permanent_trns_chain);
805   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
806   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
807   ds->temporary_trns_chain = NULL;
808   dataset_transformations_changed__ (ds, false);
809
810   return ok;
811 }
812
813 static int
814 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
815 {
816   struct variable *var = var_;
817
818   *cc = case_unshare (*cc);
819   *case_num_rw (*cc, var) = case_num;
820
821   return TRNS_CONTINUE;
822 }
823
824 /* Add a variable which we can sort by to get back the original order. */
825 struct variable *
826 add_permanent_ordering_transformation (struct dataset *ds)
827 {
828   struct variable *temp_var;
829
830   temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
831   if (proc_in_temporary_transformations (ds))
832     {
833       struct variable *perm_var;
834
835       perm_var = dict_clone_var_in_place_assert (ds->permanent_dict, temp_var);
836       trns_chain_append (ds->permanent_trns_chain, NULL, store_case_num,
837                          NULL, perm_var);
838       trns_chain_finalize (ds->permanent_trns_chain);
839     }
840   else
841     add_transformation (ds, store_case_num, NULL, temp_var);
842
843   return temp_var;
844 }
845 \f
846 /* Causes output from the next procedure to be discarded, instead
847    of being preserved for use as input for the next procedure. */
848 void
849 proc_discard_output (struct dataset *ds)
850 {
851   ds->discard_output = true;
852 }
853
854
855 /* Checks whether DS has a corrupted active dataset.  If so,
856    discards it and returns false.  If not, returns true without
857    doing anything. */
858 bool
859 dataset_end_of_command (struct dataset *ds)
860 {
861   if (ds->source != NULL)
862     {
863       if (casereader_error (ds->source))
864         {
865           dataset_clear (ds);
866           return false;
867         }
868       else
869         {
870           const struct taint *taint = casereader_get_taint (ds->source);
871           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
872           assert (!taint_has_tainted_successor (taint));
873         }
874     }
875   return true;
876 }
877 \f
878 static trns_proc_func case_limit_trns_proc;
879 static trns_free_func case_limit_trns_free;
880
881 /* Adds a transformation that limits the number of cases that may
882    pass through, if DS->DICT has a case limit. */
883 static void
884 add_case_limit_trns (struct dataset *ds)
885 {
886   casenumber case_limit = dict_get_case_limit (ds->dict);
887   if (case_limit != 0)
888     {
889       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
890       *cases_remaining = case_limit;
891       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
892                           cases_remaining);
893       dict_set_case_limit (ds->dict, 0);
894     }
895 }
896
897 /* Limits the maximum number of cases processed to
898    *CASES_REMAINING. */
899 static int
900 case_limit_trns_proc (void *cases_remaining_,
901                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
902 {
903   size_t *cases_remaining = cases_remaining_;
904   if (*cases_remaining > 0)
905     {
906       (*cases_remaining)--;
907       return TRNS_CONTINUE;
908     }
909   else
910     return TRNS_DROP_CASE;
911 }
912
913 /* Frees the data associated with a case limit transformation. */
914 static bool
915 case_limit_trns_free (void *cases_remaining_)
916 {
917   size_t *cases_remaining = cases_remaining_;
918   free (cases_remaining);
919   return true;
920 }
921 \f
922 static trns_proc_func filter_trns_proc;
923
924 /* Adds a temporary transformation to filter data according to
925    the variable specified on FILTER, if any. */
926 static void
927 add_filter_trns (struct dataset *ds)
928 {
929   struct variable *filter_var = dict_get_filter (ds->dict);
930   if (filter_var != NULL)
931     {
932       proc_start_temporary_transformations (ds);
933       add_transformation (ds, filter_trns_proc, NULL, filter_var);
934     }
935 }
936
937 /* FILTER transformation. */
938 static int
939 filter_trns_proc (void *filter_var_,
940                   struct ccase **c, casenumber case_nr UNUSED)
941
942 {
943   struct variable *filter_var = filter_var_;
944   double f = case_num (*c, filter_var);
945   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
946           ? TRNS_CONTINUE : TRNS_DROP_CASE);
947 }
948
949
950 void
951 dataset_need_lag (struct dataset *ds, int n_before)
952 {
953   ds->n_lag = MAX (ds->n_lag, n_before);
954 }
955 \f
956 static void
957 dataset_changed__ (struct dataset *ds)
958 {
959   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
960     ds->callbacks->changed (ds->cb_data);
961 }
962
963 static void
964 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
965 {
966   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
967     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
968 }
969 \f
970 /* Private interface for use by session code. */
971
972 void
973 dataset_set_session__ (struct dataset *ds, struct session *session)
974 {
975   ds->session = session;
976 }