better tests
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 struct dataset {
48   /* A dataset is usually part of a session.  Within a session its name must
49      unique.  The name must either be a valid PSPP identifier or the empty
50      string.  (It must be unique within the session even if it is the empty
51      string; that is, there may only be a single dataset within a session with
52      the empty string as its name.) */
53   struct session *session;
54   char *name;
55   enum dataset_display display;
56
57   /* Cases are read from source,
58      their transformation variables are initialized,
59      pass through permanent_trns_chain (which transforms them into
60      the format described by permanent_dict),
61      are written to sink,
62      pass through temporary_trns_chain (which transforms them into
63      the format described by dict),
64      and are finally passed to the procedure. */
65   struct casereader *source;
66   struct caseinit *caseinit;
67   struct trns_chain permanent_trns_chain;
68   struct dictionary *permanent_dict;
69   struct casewriter *sink;
70   struct trns_chain temporary_trns_chain;
71   bool temporary;
72   struct dictionary *dict;
73
74   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
75   struct trns_chain *stack;
76   size_t n_stack;
77   size_t allocated_stack;
78
79   /* If true, cases are discarded instead of being written to
80      sink. */
81   bool discard_output;
82
83   /* The case map used to compact a case, if necessary;
84      otherwise a null pointer. */
85   struct case_map *compactor;
86
87   /* Time at which proc was last invoked. */
88   time_t last_proc_invocation;
89
90   /* Cases just before ("lagging") the current one. */
91   int n_lag;                    /* Number of cases to lag. */
92   struct deque lag;             /* Deque of lagged cases. */
93   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
94
95   /* Procedure data. */
96   enum
97     {
98       PROC_COMMITTED,           /* No procedure in progress. */
99       PROC_OPEN,                /* proc_open called, casereader still open. */
100       PROC_CLOSED               /* casereader from proc_open destroyed,
101                                    but proc_commit not yet called. */
102     }
103   proc_state;
104   casenumber cases_written;     /* Cases output so far. */
105   bool ok;                      /* Error status. */
106   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
107
108   const struct dataset_callbacks *callbacks;
109   void *cb_data;
110
111   /* Uniquely distinguishes datasets. */
112   unsigned int seqno;
113 };
114
115 static void dataset_changed__ (struct dataset *);
116 static void dataset_transformations_changed__ (struct dataset *,
117                                                bool non_empty);
118
119 static void add_case_limit_trns (struct dataset *ds);
120 static void add_filter_trns (struct dataset *ds);
121
122 static void update_last_proc_invocation (struct dataset *ds);
123
124 static void
125 dict_callback (struct dictionary *d UNUSED, void *ds_)
126 {
127   struct dataset *ds = ds_;
128   dataset_changed__ (ds);
129 }
130 \f
131 static void
132 dataset_create_finish__ (struct dataset *ds, struct session *session)
133 {
134   static unsigned int seqno;
135
136   dict_set_change_callback (ds->dict, dict_callback, ds);
137   proc_cancel_all_transformations (ds);
138   dataset_set_session (ds, session);
139   ds->seqno = ++seqno;
140 }
141
142 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
143    SESSION already contains a dataset named NAME, it is deleted and replaced.
144    The dataset initially has an empty dictionary and no data source. */
145 struct dataset *
146 dataset_create (struct session *session, const char *name)
147 {
148   struct dataset *ds = XMALLOC (struct dataset);
149   *ds = (struct dataset) {
150     .name = xstrdup (name),
151     .display = DATASET_FRONT,
152     .dict = dict_create (get_default_encoding ()),
153     .caseinit = caseinit_create (),
154   };
155   dataset_create_finish__ (ds, session);
156
157   return ds;
158 }
159
160 /* Creates and returns a new dataset that has the same data and dictionary as
161    OLD named NAME, adds it to the same session as OLD, and returns the new
162    dataset.  If SESSION already contains a dataset named NAME, it is deleted
163    and replaced.
164
165    OLD must not have any active transformations or temporary state and must
166    not be in the middle of a procedure.
167
168    Callbacks are not cloned. */
169 struct dataset *
170 dataset_clone (struct dataset *old, const char *name)
171 {
172   struct dataset *new;
173
174   assert (old->proc_state == PROC_COMMITTED);
175   assert (!old->permanent_trns_chain.n);
176   assert (old->permanent_dict == NULL);
177   assert (old->sink == NULL);
178   assert (!old->temporary);
179   assert (!old->temporary_trns_chain.n);
180
181   new = xzalloc (sizeof *new);
182   new->name = xstrdup (name);
183   new->display = DATASET_FRONT;
184   new->source = casereader_clone (old->source);
185   new->dict = dict_clone (old->dict);
186   new->caseinit = caseinit_clone (old->caseinit);
187   new->last_proc_invocation = old->last_proc_invocation;
188   new->ok = old->ok;
189
190   dataset_create_finish__ (new, old->session);
191
192   return new;
193 }
194
195 /* Destroys DS. */
196 void
197 dataset_destroy (struct dataset *ds)
198 {
199   if (ds != NULL)
200     {
201       dataset_set_session (ds, NULL);
202       dataset_clear (ds);
203       dict_unref (ds->dict);
204       dict_unref (ds->permanent_dict);
205       caseinit_destroy (ds->caseinit);
206       trns_chain_uninit (&ds->permanent_trns_chain);
207       for (size_t i = 0; i < ds->n_stack; i++)
208         trns_chain_uninit (&ds->stack[i]);
209       free (ds->stack);
210       dataset_transformations_changed__ (ds, false);
211       free (ds->name);
212       free (ds);
213     }
214 }
215
216 /* Discards the active dataset's dictionary, data, and transformations. */
217 void
218 dataset_clear (struct dataset *ds)
219 {
220   assert (ds->proc_state == PROC_COMMITTED);
221
222   dict_clear (ds->dict);
223   fh_set_default_handle (NULL);
224
225   ds->n_lag = 0;
226
227   casereader_destroy (ds->source);
228   ds->source = NULL;
229
230   proc_cancel_all_transformations (ds);
231 }
232
233 const char *
234 dataset_name (const struct dataset *ds)
235 {
236   return ds->name;
237 }
238
239 void
240 dataset_set_name (struct dataset *ds, const char *name)
241 {
242   struct session *session = ds->session;
243   bool active = false;
244
245   if (session != NULL)
246     {
247       active = session_active_dataset (session) == ds;
248       if (active)
249         session_set_active_dataset (session, NULL);
250       dataset_set_session (ds, NULL);
251     }
252
253   free (ds->name);
254   ds->name = xstrdup (name);
255
256   if (session != NULL)
257     {
258       dataset_set_session (ds, session);
259       if (active)
260         session_set_active_dataset (session, ds);
261     }
262 }
263
264 struct session *
265 dataset_session (const struct dataset *ds)
266 {
267   return ds->session;
268 }
269
270 void
271 dataset_set_session (struct dataset *ds, struct session *session)
272 {
273   if (session != ds->session)
274     {
275       if (ds->session != NULL)
276         session_remove_dataset (ds->session, ds);
277       if (session != NULL)
278         session_add_dataset (session, ds);
279     }
280 }
281
282 /* Returns the dictionary within DS.  This is always nonnull, although it
283    might not contain any variables. */
284 struct dictionary *
285 dataset_dict (const struct dataset *ds)
286 {
287   return ds->dict;
288 }
289
290 /* Replaces DS's dictionary by DICT, discarding any source and
291    transformations. */
292 void
293 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
294 {
295   assert (ds->proc_state == PROC_COMMITTED);
296   assert (ds->dict != dict);
297
298   dataset_clear (ds);
299
300   dict_unref (ds->dict);
301   ds->dict = dict;
302   dict_set_change_callback (ds->dict, dict_callback, ds);
303 }
304
305 /* Returns the casereader that will be read when a procedure is executed on
306    DS.  This can be NULL if none has been set up yet. */
307 const struct casereader *
308 dataset_source (const struct dataset *ds)
309 {
310   return ds->source;
311 }
312
313 /* Returns true if DS has a data source, false otherwise. */
314 bool
315 dataset_has_source (const struct dataset *ds)
316 {
317   return dataset_source (ds) != NULL;
318 }
319
320 /* Replaces the active dataset's data by READER.  READER's cases must have an
321    appropriate format for DS's dictionary. */
322 bool
323 dataset_set_source (struct dataset *ds, struct casereader *reader)
324 {
325   casereader_destroy (ds->source);
326   ds->source = reader;
327
328   caseinit_clear (ds->caseinit);
329   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
330
331   return reader == NULL || !casereader_error (reader);
332 }
333
334 /* Returns the data source from DS and removes it from DS.  Returns a null
335    pointer if DS has no data source. */
336 struct casereader *
337 dataset_steal_source (struct dataset *ds)
338 {
339   struct casereader *reader = ds->source;
340   ds->source = NULL;
341
342   return reader;
343 }
344
345 /* Returns a number unique to DS.  It can be used to distinguish one dataset
346    from any other within a given program run, even datasets that do not exist
347    at the same time. */
348 unsigned int
349 dataset_seqno (const struct dataset *ds)
350 {
351   return ds->seqno;
352 }
353
354 void
355 dataset_set_callbacks (struct dataset *ds,
356                        const struct dataset_callbacks *callbacks,
357                        void *cb_data)
358 {
359   ds->callbacks = callbacks;
360   ds->cb_data = cb_data;
361 }
362
363 enum dataset_display
364 dataset_get_display (const struct dataset *ds)
365 {
366   return ds->display;
367 }
368
369 void
370 dataset_set_display (struct dataset *ds, enum dataset_display display)
371 {
372   ds->display = display;
373 }
374 \f
375 /* Returns the last time the data was read. */
376 time_t
377 time_of_last_procedure (struct dataset *ds)
378 {
379   if (!ds)
380     return time (NULL);
381   if (ds->last_proc_invocation == 0)
382     update_last_proc_invocation (ds);
383   return ds->last_proc_invocation;
384 }
385 \f
386 /* Regular procedure. */
387
388 /* Executes any pending transformations, if necessary.
389    This is not identical to the EXECUTE command in that it won't
390    always read the source data.  This can be important when the
391    source data is given inline within BEGIN DATA...END FILE. */
392 bool
393 proc_execute (struct dataset *ds)
394 {
395   bool ok;
396
397   if ((!ds->temporary || !ds->temporary_trns_chain.n)
398       && !ds->permanent_trns_chain.n)
399     {
400       ds->n_lag = 0;
401       ds->discard_output = false;
402       dict_set_case_limit (ds->dict, 0);
403       dict_clear_vectors (ds->dict);
404       return true;
405     }
406
407   ok = casereader_destroy (proc_open (ds));
408   return proc_commit (ds) && ok;
409 }
410
411 static const struct casereader_class proc_casereader_class;
412
413 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
414    cases filtered out with FILTER BY will not be included in the casereader
415    (which is usually desirable).  If FILTER is false, all cases will be
416    included regardless of FILTER BY settings.
417
418    proc_commit must be called when done. */
419 struct casereader *
420 proc_open_filtering (struct dataset *ds, bool filter)
421 {
422   struct casereader *reader;
423
424   assert (ds->source != NULL);
425   assert (ds->proc_state == PROC_COMMITTED);
426
427   update_last_proc_invocation (ds);
428
429   caseinit_mark_for_init (ds->caseinit, ds->dict);
430
431   /* Finish up the collection of transformations. */
432   add_case_limit_trns (ds);
433   if (filter)
434     add_filter_trns (ds);
435
436   /* Make permanent_dict refer to the dictionary right before
437      data reaches the sink. */
438   if (ds->permanent_dict == NULL)
439     ds->permanent_dict = ds->dict;
440
441   /* Prepare sink. */
442   if (!ds->discard_output)
443     {
444       struct dictionary *pd = ds->permanent_dict;
445       size_t compacted_n_values = dict_count_values (pd, 1u << DC_SCRATCH);
446       if (compacted_n_values < dict_get_next_value_idx (pd))
447         {
448           struct caseproto *compacted_proto;
449           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
450           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
451           ds->sink = autopaging_writer_create (compacted_proto);
452           caseproto_unref (compacted_proto);
453         }
454       else
455         {
456           ds->compactor = NULL;
457           ds->sink = autopaging_writer_create (dict_get_proto (pd));
458         }
459     }
460   else
461     {
462       ds->compactor = NULL;
463       ds->sink = NULL;
464     }
465
466   /* Allocate memory for lagged cases. */
467   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
468
469   ds->proc_state = PROC_OPEN;
470   ds->cases_written = 0;
471   ds->ok = true;
472
473   /* FIXME: use taint in dataset in place of `ok'? */
474   /* FIXME: for trivial cases we can just return a clone of
475      ds->source? */
476
477   /* Create casereader and insert a shim on top.  The shim allows us to
478      arbitrarily extend the casereader's lifetime, by slurping the cases into
479      the shim's buffer in proc_commit().  That is especially useful when output
480      table_items are generated directly from the procedure casereader (e.g. by
481      the LIST procedure) when we are using an output driver that keeps a
482      reference to the output items passed to it (e.g. the GUI output driver in
483      PSPPIRE). */
484   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
485                                          CASENUMBER_MAX,
486                                          &proc_casereader_class, ds);
487   ds->shim = casereader_shim_insert (reader);
488   return reader;
489 }
490
491 /* Opens dataset DS for reading cases with proc_read.
492    proc_commit must be called when done. */
493 struct casereader *
494 proc_open (struct dataset *ds)
495 {
496   return proc_open_filtering (ds, true);
497 }
498
499 /* Returns true if a procedure is in progress, that is, if
500    proc_open has been called but proc_commit has not. */
501 bool
502 proc_is_open (const struct dataset *ds)
503 {
504   return ds->proc_state != PROC_COMMITTED;
505 }
506
507 /* "read" function for procedure casereader. */
508 static struct ccase *
509 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
510 {
511   struct dataset *ds = ds_;
512   enum trns_result retval = TRNS_DROP_CASE;
513   struct ccase *c;
514
515   assert (ds->proc_state == PROC_OPEN);
516   for (; ; case_unref (c))
517     {
518       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
519       if (retval == TRNS_ERROR)
520         ds->ok = false;
521       if (!ds->ok)
522         return NULL;
523
524       /* Read a case from source. */
525       c = casereader_read (ds->source);
526       if (c == NULL)
527         return NULL;
528       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
529       caseinit_init_vars (ds->caseinit, c);
530
531       /* Execute permanent transformations.  */
532       casenumber case_nr = ds->cases_written + 1;
533       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
534       caseinit_update_left_vars (ds->caseinit, c);
535       if (retval != TRNS_CONTINUE)
536         continue;
537
538       /* Write case to collection of lagged cases. */
539       if (ds->n_lag > 0)
540         {
541           while (deque_count (&ds->lag) >= ds->n_lag)
542             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
543           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
544         }
545
546       /* Write case to replacement dataset. */
547       ds->cases_written++;
548       if (ds->sink != NULL)
549         casewriter_write (ds->sink,
550                           case_map_execute (ds->compactor, case_ref (c)));
551
552       /* Execute temporary transformations. */
553       if (ds->temporary_trns_chain.n)
554         {
555           retval = trns_chain_execute (&ds->temporary_trns_chain,
556                                        ds->cases_written, &c);
557           if (retval != TRNS_CONTINUE)
558             continue;
559         }
560
561       return c;
562     }
563 }
564
565 /* "destroy" function for procedure casereader. */
566 static void
567 proc_casereader_destroy (struct casereader *reader, void *ds_)
568 {
569   struct dataset *ds = ds_;
570   struct ccase *c;
571
572   /* We are always the subreader for a casereader_buffer, so if we're being
573      destroyed then it's because the casereader_buffer has read all the cases
574      that it ever will. */
575   ds->shim = NULL;
576
577   /* Make sure transformations happen for every input case, in
578      case they have side effects, and ensure that the replacement
579      active dataset gets all the cases it should. */
580   while ((c = casereader_read (reader)) != NULL)
581     case_unref (c);
582
583   ds->proc_state = PROC_CLOSED;
584   ds->ok = casereader_destroy (ds->source) && ds->ok;
585   ds->source = NULL;
586   dataset_set_source (ds, NULL);
587 }
588
589 /* Must return false if the source casereader, a transformation,
590    or the sink casewriter signaled an error.  (If a temporary
591    transformation signals an error, then the return value is
592    false, but the replacement active dataset may still be
593    untainted.) */
594 bool
595 proc_commit (struct dataset *ds)
596 {
597   if (ds->shim != NULL)
598     casereader_shim_slurp (ds->shim);
599
600   assert (ds->proc_state == PROC_CLOSED);
601   ds->proc_state = PROC_COMMITTED;
602
603   dataset_changed__ (ds);
604
605   /* Free memory for lagged cases. */
606   while (!deque_is_empty (&ds->lag))
607     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
608   free (ds->lag_cases);
609
610   /* Dictionary from before TEMPORARY becomes permanent. */
611   proc_cancel_temporary_transformations (ds);
612
613   if (!ds->discard_output)
614     {
615       /* Finish compacting. */
616       if (ds->compactor != NULL)
617         {
618           case_map_destroy (ds->compactor);
619           ds->compactor = NULL;
620
621           dict_delete_scratch_vars (ds->dict);
622           dict_compact_values (ds->dict);
623         }
624
625       /* Old data sink becomes new data source. */
626       if (ds->sink != NULL)
627         ds->source = casewriter_make_reader (ds->sink);
628     }
629   else
630     {
631       ds->source = NULL;
632       ds->discard_output = false;
633     }
634   ds->sink = NULL;
635
636   caseinit_clear (ds->caseinit);
637   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
638
639   dict_clear_vectors (ds->dict);
640   ds->permanent_dict = NULL;
641   return proc_cancel_all_transformations (ds) && ds->ok;
642 }
643
644 /* Casereader class for procedure execution. */
645 static const struct casereader_class proc_casereader_class =
646   {
647     proc_casereader_read,
648     proc_casereader_destroy,
649     NULL,
650     NULL,
651   };
652
653 /* Updates last_proc_invocation. */
654 static void
655 update_last_proc_invocation (struct dataset *ds)
656 {
657   ds->last_proc_invocation = time (NULL);
658 }
659 \f
660 /* Returns a pointer to the lagged case from N_BEFORE cases before the
661    current one, or NULL if there haven't been that many cases yet. */
662 const struct ccase *
663 lagged_case (const struct dataset *ds, int n_before)
664 {
665   assert (n_before >= 1);
666   assert (n_before <= ds->n_lag);
667
668   if (n_before <= deque_count (&ds->lag))
669     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
670   else
671     return NULL;
672 }
673 \f
674 /* Adds TRNS to the current set of transformations. */
675 void
676 add_transformation (struct dataset *ds,
677                     const struct trns_class *class, void *aux)
678 {
679   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
680                               : ds->temporary ? &ds->temporary_trns_chain
681                               : &ds->permanent_trns_chain);
682   struct transformation t = { .class = class, .aux = aux };
683   trns_chain_append (chain, &t);
684   dataset_transformations_changed__ (ds, true);
685 }
686
687 /* Returns true if the next call to add_transformation() will add
688    a temporary transformation, false if it will add a permanent
689    transformation. */
690 bool
691 proc_in_temporary_transformations (const struct dataset *ds)
692 {
693   return ds->temporary;
694 }
695
696 /* Marks the start of temporary transformations.
697    Further calls to add_transformation() will add temporary
698    transformations. */
699 void
700 proc_start_temporary_transformations (struct dataset *ds)
701 {
702   if (!proc_in_temporary_transformations (ds))
703     {
704       add_case_limit_trns (ds);
705
706       ds->permanent_dict = dict_clone (ds->dict);
707
708       ds->temporary = true;
709       dataset_transformations_changed__ (ds, true);
710     }
711 }
712
713 /* Converts all the temporary transformations, if any, to permanent
714    transformations.  Further transformations will be permanent.
715
716    The FILTER command is implemented as a temporary transformation, so a
717    procedure that uses this function should usually use proc_open_filtering()
718    with FILTER false, instead of plain proc_open().
719
720    Returns true if anything changed, false otherwise. */
721 bool
722 proc_make_temporary_transformations_permanent (struct dataset *ds)
723 {
724   if (proc_in_temporary_transformations (ds))
725     {
726       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
727
728       ds->temporary = false;
729
730       dict_unref (ds->permanent_dict);
731       ds->permanent_dict = NULL;
732
733       return true;
734     }
735   else
736     return false;
737 }
738
739 /* Cancels all temporary transformations, if any.  Further
740    transformations will be permanent.
741    Returns true if anything changed, false otherwise. */
742 bool
743 proc_cancel_temporary_transformations (struct dataset *ds)
744 {
745   if (proc_in_temporary_transformations (ds))
746     {
747       dict_unref (ds->dict);
748       ds->dict = ds->permanent_dict;
749       ds->permanent_dict = NULL;
750
751       trns_chain_clear (&ds->temporary_trns_chain);
752
753       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
754       return true;
755     }
756   else
757     return false;
758 }
759
760 /* Cancels all transformations, if any.
761    Returns true if successful, false on I/O error. */
762 bool
763 proc_cancel_all_transformations (struct dataset *ds)
764 {
765   bool ok;
766   assert (ds->proc_state == PROC_COMMITTED);
767   ok = trns_chain_clear (&ds->permanent_trns_chain);
768   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
769   ds->temporary = false;
770   for (size_t i = 0; i < ds->n_stack; i++)
771     ok = trns_chain_uninit (&ds->stack[i]) && ok;
772   ds->n_stack = 0;
773   dataset_transformations_changed__ (ds, false);
774
775   return ok;
776 }
777
778 void
779 proc_push_transformations (struct dataset *ds)
780 {
781   if (ds->n_stack >= ds->allocated_stack)
782     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
783                             sizeof *ds->stack);
784   trns_chain_init (&ds->stack[ds->n_stack++]);
785 }
786
787 void
788 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
789 {
790   assert (ds->n_stack > 0);
791   *chain = ds->stack[--ds->n_stack];
792 }
793
794 static enum trns_result
795 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
796 {
797   struct variable *var = var_;
798
799   *cc = case_unshare (*cc);
800   *case_num_rw (*cc, var) = case_num;
801
802   return TRNS_CONTINUE;
803 }
804
805 /* Add a variable which we can sort by to get back the original order. */
806 struct variable *
807 add_permanent_ordering_transformation (struct dataset *ds)
808 {
809   struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
810   struct variable *order_var
811     = (proc_in_temporary_transformations (ds)
812        ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
813        : temp_var);
814
815   static const struct trns_class trns_class = {
816     .name = "ordering",
817     .execute = store_case_num
818   };
819   const struct transformation t = { .class = &trns_class, .aux = order_var };
820   trns_chain_append (&ds->permanent_trns_chain, &t);
821
822   return temp_var;
823 }
824 \f
825 /* Causes output from the next procedure to be discarded, instead
826    of being preserved for use as input for the next procedure. */
827 void
828 proc_discard_output (struct dataset *ds)
829 {
830   ds->discard_output = true;
831 }
832
833
834 /* Checks whether DS has a corrupted active dataset.  If so,
835    discards it and returns false.  If not, returns true without
836    doing anything. */
837 bool
838 dataset_end_of_command (struct dataset *ds)
839 {
840   if (ds->source != NULL)
841     {
842       if (casereader_error (ds->source))
843         {
844           dataset_clear (ds);
845           return false;
846         }
847       else
848         {
849           const struct taint *taint = casereader_get_taint (ds->source);
850           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
851           assert (!taint_has_tainted_successor (taint));
852         }
853     }
854   return true;
855 }
856 \f
857 /* Limits the maximum number of cases processed to
858    *CASES_REMAINING. */
859 static enum trns_result
860 case_limit_trns_proc (void *cases_remaining_,
861                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
862 {
863   size_t *cases_remaining = cases_remaining_;
864   if (*cases_remaining > 0)
865     {
866       (*cases_remaining)--;
867       return TRNS_CONTINUE;
868     }
869   else
870     return TRNS_DROP_CASE;
871 }
872
873 /* Frees the data associated with a case limit transformation. */
874 static bool
875 case_limit_trns_free (void *cases_remaining_)
876 {
877   size_t *cases_remaining = cases_remaining_;
878   free (cases_remaining);
879   return true;
880 }
881
882 /* Adds a transformation that limits the number of cases that may
883    pass through, if DS->DICT has a case limit. */
884 static void
885 add_case_limit_trns (struct dataset *ds)
886 {
887   casenumber case_limit = dict_get_case_limit (ds->dict);
888   if (case_limit != 0)
889     {
890       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
891       *cases_remaining = case_limit;
892
893       static const struct trns_class trns_class = {
894         .name = "case limit",
895         .execute = case_limit_trns_proc,
896         .destroy = case_limit_trns_free,
897       };
898       add_transformation (ds, &trns_class, cases_remaining);
899
900       dict_set_case_limit (ds->dict, 0);
901     }
902 }
903
904 \f
905 /* FILTER transformation. */
906 static enum trns_result
907 filter_trns_proc (void *filter_var_,
908                   struct ccase **c, casenumber case_nr UNUSED)
909
910 {
911   struct variable *filter_var = filter_var_;
912   double f = case_num (*c, filter_var);
913   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
914           ? TRNS_CONTINUE : TRNS_DROP_CASE);
915 }
916
917 /* Adds a temporary transformation to filter data according to
918    the variable specified on FILTER, if any. */
919 static void
920 add_filter_trns (struct dataset *ds)
921 {
922   struct variable *filter_var = dict_get_filter (ds->dict);
923   if (filter_var != NULL)
924     {
925       proc_start_temporary_transformations (ds);
926
927       static const struct trns_class trns_class = {
928         .name = "FILTER",
929         .execute = filter_trns_proc,
930       };
931       add_transformation (ds, &trns_class, filter_var);
932     }
933 }
934
935 void
936 dataset_need_lag (struct dataset *ds, int n_before)
937 {
938   ds->n_lag = MAX (ds->n_lag, n_before);
939 }
940 \f
941 static void
942 dataset_changed__ (struct dataset *ds)
943 {
944   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
945     ds->callbacks->changed (ds->cb_data);
946 }
947
948 static void
949 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
950 {
951   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
952     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
953 }
954 \f
955 /* Private interface for use by session code. */
956
957 void
958 dataset_set_session__ (struct dataset *ds, struct session *session)
959 {
960   ds->session = session;
961 }