648e00f04ee1c094ea7a60bf8068b3f60452f0c0
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 struct dataset {
48   /* A dataset is usually part of a session.  Within a session its name must
49      unique.  The name must either be a valid PSPP identifier or the empty
50      string.  (It must be unique within the session even if it is the empty
51      string; that is, there may only be a single dataset within a session with
52      the empty string as its name.) */
53   struct session *session;
54   char *name;
55   enum dataset_display display;
56
57   /* Cases are read from source,
58      their transformation variables are initialized,
59      pass through permanent_trns_chain (which transforms them into
60      the format described by permanent_dict),
61      are written to sink,
62      pass through temporary_trns_chain (which transforms them into
63      the format described by dict),
64      and are finally passed to the procedure. */
65   struct casereader *source;
66   struct caseinit *caseinit;
67   struct trns_chain permanent_trns_chain;
68   struct dictionary *permanent_dict;
69   struct casewriter *sink;
70   struct trns_chain temporary_trns_chain;
71   bool temporary;
72   struct dictionary *dict;
73
74   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
75   struct trns_chain *stack;
76   size_t n_stack;
77   size_t allocated_stack;
78
79   /* If true, cases are discarded instead of being written to
80      sink. */
81   bool discard_output;
82
83   /* The case map used to compact a case, if necessary;
84      otherwise a null pointer. */
85   struct case_map *compactor;
86
87   /* Time at which proc was last invoked. */
88   time_t last_proc_invocation;
89
90   /* Cases just before ("lagging") the current one. */
91   int n_lag;                    /* Number of cases to lag. */
92   struct deque lag;             /* Deque of lagged cases. */
93   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
94
95   /* Procedure data. */
96   enum
97     {
98       PROC_COMMITTED,           /* No procedure in progress. */
99       PROC_OPEN,                /* proc_open called, casereader still open. */
100       PROC_CLOSED               /* casereader from proc_open destroyed,
101                                    but proc_commit not yet called. */
102     }
103   proc_state;
104   casenumber cases_written;     /* Cases output so far. */
105   bool ok;                      /* Error status. */
106   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
107
108   const struct dataset_callbacks *callbacks;
109   void *cb_data;
110
111   /* Uniquely distinguishes datasets. */
112   unsigned int seqno;
113 };
114
115 static void dataset_changed__ (struct dataset *);
116 static void dataset_transformations_changed__ (struct dataset *,
117                                                bool non_empty);
118
119 static void add_case_limit_trns (struct dataset *ds);
120 static void add_filter_trns (struct dataset *ds);
121
122 static void update_last_proc_invocation (struct dataset *ds);
123
124 static void
125 dict_callback (struct dictionary *d UNUSED, void *ds_)
126 {
127   struct dataset *ds = ds_;
128   dataset_changed__ (ds);
129 }
130 \f
131 static void
132 dataset_create_finish__ (struct dataset *ds, struct session *session)
133 {
134   static unsigned int seqno;
135
136   dict_set_change_callback (ds->dict, dict_callback, ds);
137   proc_cancel_all_transformations (ds);
138   dataset_set_session (ds, session);
139   ds->seqno = ++seqno;
140 }
141
142 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
143    SESSION already contains a dataset named NAME, it is deleted and replaced.
144    The dataset initially has an empty dictionary and no data source. */
145 struct dataset *
146 dataset_create (struct session *session, const char *name)
147 {
148   struct dataset *ds = XMALLOC (struct dataset);
149   *ds = (struct dataset) {
150     .name = xstrdup (name),
151     .display = DATASET_FRONT,
152     .dict = dict_create (get_default_encoding ()),
153     .caseinit = caseinit_create (),
154   };
155   dataset_create_finish__ (ds, session);
156
157   return ds;
158 }
159
160 /* Creates and returns a new dataset that has the same data and dictionary as
161    OLD named NAME, adds it to the same session as OLD, and returns the new
162    dataset.  If SESSION already contains a dataset named NAME, it is deleted
163    and replaced.
164
165    OLD must not have any active transformations or temporary state and must
166    not be in the middle of a procedure.
167
168    Callbacks are not cloned. */
169 struct dataset *
170 dataset_clone (struct dataset *old, const char *name)
171 {
172   struct dataset *new;
173
174   assert (old->proc_state == PROC_COMMITTED);
175   assert (!old->permanent_trns_chain.n);
176   assert (old->permanent_dict == NULL);
177   assert (old->sink == NULL);
178   assert (!old->temporary);
179   assert (!old->temporary_trns_chain.n);
180
181   new = xzalloc (sizeof *new);
182   new->name = xstrdup (name);
183   new->display = DATASET_FRONT;
184   new->source = casereader_clone (old->source);
185   new->dict = dict_clone (old->dict);
186   new->caseinit = caseinit_clone (old->caseinit);
187   new->last_proc_invocation = old->last_proc_invocation;
188   new->ok = old->ok;
189
190   dataset_create_finish__ (new, old->session);
191
192   return new;
193 }
194
195 /* Destroys DS. */
196 void
197 dataset_destroy (struct dataset *ds)
198 {
199   if (ds != NULL)
200     {
201       dataset_set_session (ds, NULL);
202       dataset_clear (ds);
203       dict_unref (ds->dict);
204       dict_unref (ds->permanent_dict);
205       caseinit_destroy (ds->caseinit);
206       trns_chain_uninit (&ds->permanent_trns_chain);
207       for (size_t i = 0; i < ds->n_stack; i++)
208         trns_chain_uninit (&ds->stack[i]);
209       free (ds->stack);
210       dataset_transformations_changed__ (ds, false);
211       free (ds->name);
212       free (ds);
213     }
214 }
215
216 /* Discards the active dataset's dictionary, data, and transformations. */
217 void
218 dataset_clear (struct dataset *ds)
219 {
220   assert (ds->proc_state == PROC_COMMITTED);
221
222   dict_clear (ds->dict);
223   fh_set_default_handle (NULL);
224
225   ds->n_lag = 0;
226
227   casereader_destroy (ds->source);
228   ds->source = NULL;
229
230   proc_cancel_all_transformations (ds);
231 }
232
233 const char *
234 dataset_name (const struct dataset *ds)
235 {
236   return ds->name;
237 }
238
239 void
240 dataset_set_name (struct dataset *ds, const char *name)
241 {
242   struct session *session = ds->session;
243   bool active = false;
244
245   if (session != NULL)
246     {
247       active = session_active_dataset (session) == ds;
248       if (active)
249         session_set_active_dataset (session, NULL);
250       dataset_set_session (ds, NULL);
251     }
252
253   free (ds->name);
254   ds->name = xstrdup (name);
255
256   if (session != NULL)
257     {
258       dataset_set_session (ds, session);
259       if (active)
260         session_set_active_dataset (session, ds);
261     }
262 }
263
264 struct session *
265 dataset_session (const struct dataset *ds)
266 {
267   return ds->session;
268 }
269
270 void
271 dataset_set_session (struct dataset *ds, struct session *session)
272 {
273   if (session != ds->session)
274     {
275       if (ds->session != NULL)
276         session_remove_dataset (ds->session, ds);
277       if (session != NULL)
278         session_add_dataset (session, ds);
279     }
280 }
281
282 /* Returns the dictionary within DS.  This is always nonnull, although it
283    might not contain any variables. */
284 struct dictionary *
285 dataset_dict (const struct dataset *ds)
286 {
287   return ds->dict;
288 }
289
290 /* Replaces DS's dictionary by DICT, discarding any source and
291    transformations. */
292 void
293 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
294 {
295   assert (ds->proc_state == PROC_COMMITTED);
296   assert (ds->dict != dict);
297
298   dataset_clear (ds);
299
300   dict_unref (ds->dict);
301   ds->dict = dict;
302   dict_set_change_callback (ds->dict, dict_callback, ds);
303 }
304
305 /* Returns the casereader that will be read when a procedure is executed on
306    DS.  This can be NULL if none has been set up yet. */
307 const struct casereader *
308 dataset_source (const struct dataset *ds)
309 {
310   return ds->source;
311 }
312
313 /* Returns true if DS has a data source, false otherwise. */
314 bool
315 dataset_has_source (const struct dataset *ds)
316 {
317   return dataset_source (ds) != NULL;
318 }
319
320 /* Replaces the active dataset's data by READER.  READER's cases must have an
321    appropriate format for DS's dictionary. */
322 bool
323 dataset_set_source (struct dataset *ds, struct casereader *reader)
324 {
325   casereader_destroy (ds->source);
326   ds->source = reader;
327
328   caseinit_clear (ds->caseinit);
329   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
330
331   return reader == NULL || !casereader_error (reader);
332 }
333
334 /* Returns the data source from DS and removes it from DS.  Returns a null
335    pointer if DS has no data source. */
336 struct casereader *
337 dataset_steal_source (struct dataset *ds)
338 {
339   struct casereader *reader = ds->source;
340   ds->source = NULL;
341
342   return reader;
343 }
344
345 /* Returns a number unique to DS.  It can be used to distinguish one dataset
346    from any other within a given program run, even datasets that do not exist
347    at the same time. */
348 unsigned int
349 dataset_seqno (const struct dataset *ds)
350 {
351   return ds->seqno;
352 }
353
354 void
355 dataset_set_callbacks (struct dataset *ds,
356                        const struct dataset_callbacks *callbacks,
357                        void *cb_data)
358 {
359   ds->callbacks = callbacks;
360   ds->cb_data = cb_data;
361 }
362
363 enum dataset_display
364 dataset_get_display (const struct dataset *ds)
365 {
366   return ds->display;
367 }
368
369 void
370 dataset_set_display (struct dataset *ds, enum dataset_display display)
371 {
372   ds->display = display;
373 }
374 \f
375 /* Returns the last time the data was read. */
376 time_t
377 time_of_last_procedure (struct dataset *ds)
378 {
379   if (ds->last_proc_invocation == 0)
380     update_last_proc_invocation (ds);
381   return ds->last_proc_invocation;
382 }
383 \f
384 /* Regular procedure. */
385
386 /* Executes any pending transformations, if necessary.
387    This is not identical to the EXECUTE command in that it won't
388    always read the source data.  This can be important when the
389    source data is given inline within BEGIN DATA...END FILE. */
390 bool
391 proc_execute (struct dataset *ds)
392 {
393   bool ok;
394
395   if ((!ds->temporary || !ds->temporary_trns_chain.n)
396       && !ds->permanent_trns_chain.n)
397     {
398       ds->n_lag = 0;
399       ds->discard_output = false;
400       dict_set_case_limit (ds->dict, 0);
401       dict_clear_vectors (ds->dict);
402       return true;
403     }
404
405   ok = casereader_destroy (proc_open (ds));
406   return proc_commit (ds) && ok;
407 }
408
409 static const struct casereader_class proc_casereader_class;
410
411 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
412    cases filtered out with FILTER BY will not be included in the casereader
413    (which is usually desirable).  If FILTER is false, all cases will be
414    included regardless of FILTER BY settings.
415
416    proc_commit must be called when done. */
417 struct casereader *
418 proc_open_filtering (struct dataset *ds, bool filter)
419 {
420   struct casereader *reader;
421
422   assert (ds->source != NULL);
423   assert (ds->proc_state == PROC_COMMITTED);
424
425   update_last_proc_invocation (ds);
426
427   caseinit_mark_for_init (ds->caseinit, ds->dict);
428
429   /* Finish up the collection of transformations. */
430   add_case_limit_trns (ds);
431   if (filter)
432     add_filter_trns (ds);
433
434   /* Make permanent_dict refer to the dictionary right before
435      data reaches the sink. */
436   if (ds->permanent_dict == NULL)
437     ds->permanent_dict = ds->dict;
438
439   /* Prepare sink. */
440   if (!ds->discard_output)
441     {
442       struct dictionary *pd = ds->permanent_dict;
443       size_t compacted_n_values = dict_count_values (pd, 1u << DC_SCRATCH);
444       if (compacted_n_values < dict_get_next_value_idx (pd))
445         {
446           struct caseproto *compacted_proto;
447           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
448           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
449           ds->sink = autopaging_writer_create (compacted_proto);
450           caseproto_unref (compacted_proto);
451         }
452       else
453         {
454           ds->compactor = NULL;
455           ds->sink = autopaging_writer_create (dict_get_proto (pd));
456         }
457     }
458   else
459     {
460       ds->compactor = NULL;
461       ds->sink = NULL;
462     }
463
464   /* Allocate memory for lagged cases. */
465   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
466
467   ds->proc_state = PROC_OPEN;
468   ds->cases_written = 0;
469   ds->ok = true;
470
471   /* FIXME: use taint in dataset in place of `ok'? */
472   /* FIXME: for trivial cases we can just return a clone of
473      ds->source? */
474
475   /* Create casereader and insert a shim on top.  The shim allows us to
476      arbitrarily extend the casereader's lifetime, by slurping the cases into
477      the shim's buffer in proc_commit().  That is especially useful when output
478      table_items are generated directly from the procedure casereader (e.g. by
479      the LIST procedure) when we are using an output driver that keeps a
480      reference to the output items passed to it (e.g. the GUI output driver in
481      PSPPIRE). */
482   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
483                                          CASENUMBER_MAX,
484                                          &proc_casereader_class, ds);
485   ds->shim = casereader_shim_insert (reader);
486   return reader;
487 }
488
489 /* Opens dataset DS for reading cases with proc_read.
490    proc_commit must be called when done. */
491 struct casereader *
492 proc_open (struct dataset *ds)
493 {
494   return proc_open_filtering (ds, true);
495 }
496
497 /* Returns true if a procedure is in progress, that is, if
498    proc_open has been called but proc_commit has not. */
499 bool
500 proc_is_open (const struct dataset *ds)
501 {
502   return ds->proc_state != PROC_COMMITTED;
503 }
504
505 /* "read" function for procedure casereader. */
506 static struct ccase *
507 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
508 {
509   struct dataset *ds = ds_;
510   enum trns_result retval = TRNS_DROP_CASE;
511   struct ccase *c;
512
513   assert (ds->proc_state == PROC_OPEN);
514   for (; ; case_unref (c))
515     {
516       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
517       if (retval == TRNS_ERROR)
518         ds->ok = false;
519       if (!ds->ok)
520         return NULL;
521
522       /* Read a case from source. */
523       c = casereader_read (ds->source);
524       if (c == NULL)
525         return NULL;
526       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
527       caseinit_init_vars (ds->caseinit, c);
528
529       /* Execute permanent transformations.  */
530       casenumber case_nr = ds->cases_written + 1;
531       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
532       caseinit_update_left_vars (ds->caseinit, c);
533       if (retval != TRNS_CONTINUE)
534         continue;
535
536       /* Write case to collection of lagged cases. */
537       if (ds->n_lag > 0)
538         {
539           while (deque_count (&ds->lag) >= ds->n_lag)
540             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
541           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
542         }
543
544       /* Write case to replacement dataset. */
545       ds->cases_written++;
546       if (ds->sink != NULL)
547         casewriter_write (ds->sink,
548                           case_map_execute (ds->compactor, case_ref (c)));
549
550       /* Execute temporary transformations. */
551       if (ds->temporary_trns_chain.n)
552         {
553           retval = trns_chain_execute (&ds->temporary_trns_chain,
554                                        ds->cases_written, &c);
555           if (retval != TRNS_CONTINUE)
556             continue;
557         }
558
559       return c;
560     }
561 }
562
563 /* "destroy" function for procedure casereader. */
564 static void
565 proc_casereader_destroy (struct casereader *reader, void *ds_)
566 {
567   struct dataset *ds = ds_;
568   struct ccase *c;
569
570   /* We are always the subreader for a casereader_buffer, so if we're being
571      destroyed then it's because the casereader_buffer has read all the cases
572      that it ever will. */
573   ds->shim = NULL;
574
575   /* Make sure transformations happen for every input case, in
576      case they have side effects, and ensure that the replacement
577      active dataset gets all the cases it should. */
578   while ((c = casereader_read (reader)) != NULL)
579     case_unref (c);
580
581   ds->proc_state = PROC_CLOSED;
582   ds->ok = casereader_destroy (ds->source) && ds->ok;
583   ds->source = NULL;
584   dataset_set_source (ds, NULL);
585 }
586
587 /* Must return false if the source casereader, a transformation,
588    or the sink casewriter signaled an error.  (If a temporary
589    transformation signals an error, then the return value is
590    false, but the replacement active dataset may still be
591    untainted.) */
592 bool
593 proc_commit (struct dataset *ds)
594 {
595   if (ds->shim != NULL)
596     casereader_shim_slurp (ds->shim);
597
598   assert (ds->proc_state == PROC_CLOSED);
599   ds->proc_state = PROC_COMMITTED;
600
601   dataset_changed__ (ds);
602
603   /* Free memory for lagged cases. */
604   while (!deque_is_empty (&ds->lag))
605     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
606   free (ds->lag_cases);
607
608   /* Dictionary from before TEMPORARY becomes permanent. */
609   proc_cancel_temporary_transformations (ds);
610
611   if (!ds->discard_output)
612     {
613       /* Finish compacting. */
614       if (ds->compactor != NULL)
615         {
616           case_map_destroy (ds->compactor);
617           ds->compactor = NULL;
618
619           dict_delete_scratch_vars (ds->dict);
620           dict_compact_values (ds->dict);
621         }
622
623       /* Old data sink becomes new data source. */
624       if (ds->sink != NULL)
625         ds->source = casewriter_make_reader (ds->sink);
626     }
627   else
628     {
629       ds->source = NULL;
630       ds->discard_output = false;
631     }
632   ds->sink = NULL;
633
634   caseinit_clear (ds->caseinit);
635   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
636
637   dict_clear_vectors (ds->dict);
638   ds->permanent_dict = NULL;
639   return proc_cancel_all_transformations (ds) && ds->ok;
640 }
641
642 /* Casereader class for procedure execution. */
643 static const struct casereader_class proc_casereader_class =
644   {
645     proc_casereader_read,
646     proc_casereader_destroy,
647     NULL,
648     NULL,
649   };
650
651 /* Updates last_proc_invocation. */
652 static void
653 update_last_proc_invocation (struct dataset *ds)
654 {
655   ds->last_proc_invocation = time (NULL);
656 }
657 \f
658 /* Returns a pointer to the lagged case from N_BEFORE cases before the
659    current one, or NULL if there haven't been that many cases yet. */
660 const struct ccase *
661 lagged_case (const struct dataset *ds, int n_before)
662 {
663   assert (n_before >= 1);
664   assert (n_before <= ds->n_lag);
665
666   if (n_before <= deque_count (&ds->lag))
667     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
668   else
669     return NULL;
670 }
671 \f
672 /* Adds TRNS to the current set of transformations. */
673 void
674 add_transformation (struct dataset *ds,
675                     const struct trns_class *class, void *aux)
676 {
677   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
678                               : ds->temporary ? &ds->temporary_trns_chain
679                               : &ds->permanent_trns_chain);
680   struct transformation t = { .class = class, .aux = aux };
681   trns_chain_append (chain, &t);
682   dataset_transformations_changed__ (ds, true);
683 }
684
685 /* Returns true if the next call to add_transformation() will add
686    a temporary transformation, false if it will add a permanent
687    transformation. */
688 bool
689 proc_in_temporary_transformations (const struct dataset *ds)
690 {
691   return ds->temporary;
692 }
693
694 /* Marks the start of temporary transformations.
695    Further calls to add_transformation() will add temporary
696    transformations. */
697 void
698 proc_start_temporary_transformations (struct dataset *ds)
699 {
700   if (!proc_in_temporary_transformations (ds))
701     {
702       add_case_limit_trns (ds);
703
704       ds->permanent_dict = dict_clone (ds->dict);
705
706       ds->temporary = true;
707       dataset_transformations_changed__ (ds, true);
708     }
709 }
710
711 /* Converts all the temporary transformations, if any, to permanent
712    transformations.  Further transformations will be permanent.
713
714    The FILTER command is implemented as a temporary transformation, so a
715    procedure that uses this function should usually use proc_open_filtering()
716    with FILTER false, instead of plain proc_open().
717
718    Returns true if anything changed, false otherwise. */
719 bool
720 proc_make_temporary_transformations_permanent (struct dataset *ds)
721 {
722   if (proc_in_temporary_transformations (ds))
723     {
724       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
725
726       ds->temporary = false;
727
728       dict_unref (ds->permanent_dict);
729       ds->permanent_dict = NULL;
730
731       return true;
732     }
733   else
734     return false;
735 }
736
737 /* Cancels all temporary transformations, if any.  Further
738    transformations will be permanent.
739    Returns true if anything changed, false otherwise. */
740 bool
741 proc_cancel_temporary_transformations (struct dataset *ds)
742 {
743   if (proc_in_temporary_transformations (ds))
744     {
745       dict_unref (ds->dict);
746       ds->dict = ds->permanent_dict;
747       ds->permanent_dict = NULL;
748
749       trns_chain_clear (&ds->temporary_trns_chain);
750
751       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
752       return true;
753     }
754   else
755     return false;
756 }
757
758 /* Cancels all transformations, if any.
759    Returns true if successful, false on I/O error. */
760 bool
761 proc_cancel_all_transformations (struct dataset *ds)
762 {
763   bool ok;
764   assert (ds->proc_state == PROC_COMMITTED);
765   ok = trns_chain_clear (&ds->permanent_trns_chain);
766   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
767   ds->temporary = false;
768   for (size_t i = 0; i < ds->n_stack; i++)
769     ok = trns_chain_uninit (&ds->stack[i]) && ok;
770   ds->n_stack = 0;
771   dataset_transformations_changed__ (ds, false);
772
773   return ok;
774 }
775
776 void
777 proc_push_transformations (struct dataset *ds)
778 {
779   if (ds->n_stack >= ds->allocated_stack)
780     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
781                             sizeof *ds->stack);
782   trns_chain_init (&ds->stack[ds->n_stack++]);
783 }
784
785 void
786 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
787 {
788   assert (ds->n_stack > 0);
789   *chain = ds->stack[--ds->n_stack];
790 }
791
792 static enum trns_result
793 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
794 {
795   struct variable *var = var_;
796
797   *cc = case_unshare (*cc);
798   *case_num_rw (*cc, var) = case_num;
799
800   return TRNS_CONTINUE;
801 }
802
803 /* Add a variable which we can sort by to get back the original order. */
804 struct variable *
805 add_permanent_ordering_transformation (struct dataset *ds)
806 {
807   struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
808   struct variable *order_var
809     = (proc_in_temporary_transformations (ds)
810        ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
811        : temp_var);
812
813   static const struct trns_class trns_class = {
814     .name = "ordering",
815     .execute = store_case_num
816   };
817   const struct transformation t = { .class = &trns_class, .aux = order_var };
818   trns_chain_append (&ds->permanent_trns_chain, &t);
819
820   return temp_var;
821 }
822 \f
823 /* Causes output from the next procedure to be discarded, instead
824    of being preserved for use as input for the next procedure. */
825 void
826 proc_discard_output (struct dataset *ds)
827 {
828   ds->discard_output = true;
829 }
830
831
832 /* Checks whether DS has a corrupted active dataset.  If so,
833    discards it and returns false.  If not, returns true without
834    doing anything. */
835 bool
836 dataset_end_of_command (struct dataset *ds)
837 {
838   if (ds->source != NULL)
839     {
840       if (casereader_error (ds->source))
841         {
842           dataset_clear (ds);
843           return false;
844         }
845       else
846         {
847           const struct taint *taint = casereader_get_taint (ds->source);
848           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
849           assert (!taint_has_tainted_successor (taint));
850         }
851     }
852   return true;
853 }
854 \f
855 /* Limits the maximum number of cases processed to
856    *CASES_REMAINING. */
857 static enum trns_result
858 case_limit_trns_proc (void *cases_remaining_,
859                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
860 {
861   size_t *cases_remaining = cases_remaining_;
862   if (*cases_remaining > 0)
863     {
864       (*cases_remaining)--;
865       return TRNS_CONTINUE;
866     }
867   else
868     return TRNS_DROP_CASE;
869 }
870
871 /* Frees the data associated with a case limit transformation. */
872 static bool
873 case_limit_trns_free (void *cases_remaining_)
874 {
875   size_t *cases_remaining = cases_remaining_;
876   free (cases_remaining);
877   return true;
878 }
879
880 /* Adds a transformation that limits the number of cases that may
881    pass through, if DS->DICT has a case limit. */
882 static void
883 add_case_limit_trns (struct dataset *ds)
884 {
885   casenumber case_limit = dict_get_case_limit (ds->dict);
886   if (case_limit != 0)
887     {
888       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
889       *cases_remaining = case_limit;
890
891       static const struct trns_class trns_class = {
892         .name = "case limit",
893         .execute = case_limit_trns_proc,
894         .destroy = case_limit_trns_free,
895       };
896       add_transformation (ds, &trns_class, cases_remaining);
897
898       dict_set_case_limit (ds->dict, 0);
899     }
900 }
901
902 \f
903 /* FILTER transformation. */
904 static enum trns_result
905 filter_trns_proc (void *filter_var_,
906                   struct ccase **c, casenumber case_nr UNUSED)
907
908 {
909   struct variable *filter_var = filter_var_;
910   double f = case_num (*c, filter_var);
911   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
912           ? TRNS_CONTINUE : TRNS_DROP_CASE);
913 }
914
915 /* Adds a temporary transformation to filter data according to
916    the variable specified on FILTER, if any. */
917 static void
918 add_filter_trns (struct dataset *ds)
919 {
920   struct variable *filter_var = dict_get_filter (ds->dict);
921   if (filter_var != NULL)
922     {
923       proc_start_temporary_transformations (ds);
924
925       static const struct trns_class trns_class = {
926         .name = "FILTER",
927         .execute = filter_trns_proc,
928       };
929       add_transformation (ds, &trns_class, filter_var);
930     }
931 }
932
933 void
934 dataset_need_lag (struct dataset *ds, int n_before)
935 {
936   ds->n_lag = MAX (ds->n_lag, n_before);
937 }
938 \f
939 static void
940 dataset_changed__ (struct dataset *ds)
941 {
942   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
943     ds->callbacks->changed (ds->cb_data);
944 }
945
946 static void
947 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
948 {
949   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
950     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
951 }
952 \f
953 /* Private interface for use by session code. */
954
955 void
956 dataset_set_session__ (struct dataset *ds, struct session *session)
957 {
958   ds->session = session;
959 }