treewide: Replace <name>_cnt by n_<name>s and <name>_cap by allocated_<name>.
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 struct dataset {
48   /* A dataset is usually part of a session.  Within a session its name must
49      unique.  The name must either be a valid PSPP identifier or the empty
50      string.  (It must be unique within the session even if it is the empty
51      string; that is, there may only be a single dataset within a session with
52      the empty string as its name.) */
53   struct session *session;
54   char *name;
55   enum dataset_display display;
56
57   /* Cases are read from source,
58      their transformation variables are initialized,
59      pass through permanent_trns_chain (which transforms them into
60      the format described by permanent_dict),
61      are written to sink,
62      pass through temporary_trns_chain (which transforms them into
63      the format described by dict),
64      and are finally passed to the procedure. */
65   struct casereader *source;
66   struct caseinit *caseinit;
67   struct trns_chain *permanent_trns_chain;
68   struct dictionary *permanent_dict;
69   struct casewriter *sink;
70   struct trns_chain *temporary_trns_chain;
71   struct dictionary *dict;
72
73   /* If true, cases are discarded instead of being written to
74      sink. */
75   bool discard_output;
76
77   /* The transformation chain that the next transformation will be
78      added to. */
79   struct trns_chain *cur_trns_chain;
80
81   /* The case map used to compact a case, if necessary;
82      otherwise a null pointer. */
83   struct case_map *compactor;
84
85   /* Time at which proc was last invoked. */
86   time_t last_proc_invocation;
87
88   /* Cases just before ("lagging") the current one. */
89   int n_lag;                    /* Number of cases to lag. */
90   struct deque lag;             /* Deque of lagged cases. */
91   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
92
93   /* Procedure data. */
94   enum
95     {
96       PROC_COMMITTED,           /* No procedure in progress. */
97       PROC_OPEN,                /* proc_open called, casereader still open. */
98       PROC_CLOSED               /* casereader from proc_open destroyed,
99                                    but proc_commit not yet called. */
100     }
101   proc_state;
102   casenumber cases_written;     /* Cases output so far. */
103   bool ok;                      /* Error status. */
104   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
105
106   const struct dataset_callbacks *callbacks;
107   void *cb_data;
108
109   /* Uniquely distinguishes datasets. */
110   unsigned int seqno;
111 };
112
113 static void dataset_changed__ (struct dataset *);
114 static void dataset_transformations_changed__ (struct dataset *,
115                                                bool non_empty);
116
117 static void add_case_limit_trns (struct dataset *ds);
118 static void add_filter_trns (struct dataset *ds);
119
120 static void update_last_proc_invocation (struct dataset *ds);
121
122 static void
123 dict_callback (struct dictionary *d UNUSED, void *ds_)
124 {
125   struct dataset *ds = ds_;
126   dataset_changed__ (ds);
127 }
128 \f
129 static void
130 dataset_create_finish__ (struct dataset *ds, struct session *session)
131 {
132   static unsigned int seqno;
133
134   dict_set_change_callback (ds->dict, dict_callback, ds);
135   proc_cancel_all_transformations (ds);
136   dataset_set_session (ds, session);
137   ds->seqno = ++seqno;
138 }
139
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
141    SESSION already contains a dataset named NAME, it is deleted and replaced.
142    The dataset initially has an empty dictionary and no data source. */
143 struct dataset *
144 dataset_create (struct session *session, const char *name)
145 {
146   struct dataset *ds = XZALLOC (struct dataset);
147   ds->name = xstrdup (name);
148   ds->display = DATASET_FRONT;
149   ds->dict = dict_create (get_default_encoding ());
150
151   ds->caseinit = caseinit_create ();
152
153   dataset_create_finish__ (ds, session);
154
155   return ds;
156 }
157
158 /* Creates and returns a new dataset that has the same data and dictionary as
159    OLD named NAME, adds it to the same session as OLD, and returns the new
160    dataset.  If SESSION already contains a dataset named NAME, it is deleted
161    and replaced.
162
163    OLD must not have any active transformations or temporary state and must
164    not be in the middle of a procedure.
165
166    Callbacks are not cloned. */
167 struct dataset *
168 dataset_clone (struct dataset *old, const char *name)
169 {
170   struct dataset *new;
171
172   assert (old->proc_state == PROC_COMMITTED);
173   assert (trns_chain_is_empty (old->permanent_trns_chain));
174   assert (old->permanent_dict == NULL);
175   assert (old->sink == NULL);
176   assert (old->temporary_trns_chain == NULL);
177
178   new = xzalloc (sizeof *new);
179   new->name = xstrdup (name);
180   new->display = DATASET_FRONT;
181   new->source = casereader_clone (old->source);
182   new->dict = dict_clone (old->dict);
183   new->caseinit = caseinit_clone (old->caseinit);
184   new->last_proc_invocation = old->last_proc_invocation;
185   new->ok = old->ok;
186
187   dataset_create_finish__ (new, old->session);
188
189   return new;
190 }
191
192 /* Destroys DS. */
193 void
194 dataset_destroy (struct dataset *ds)
195 {
196   if (ds != NULL)
197     {
198       dataset_set_session (ds, NULL);
199       dataset_clear (ds);
200       dict_unref (ds->dict);
201       dict_unref (ds->permanent_dict);
202       caseinit_destroy (ds->caseinit);
203       trns_chain_destroy (ds->permanent_trns_chain);
204       dataset_transformations_changed__ (ds, false);
205       free (ds->name);
206       free (ds);
207     }
208 }
209
210 /* Discards the active dataset's dictionary, data, and transformations. */
211 void
212 dataset_clear (struct dataset *ds)
213 {
214   assert (ds->proc_state == PROC_COMMITTED);
215
216   dict_clear (ds->dict);
217   fh_set_default_handle (NULL);
218
219   ds->n_lag = 0;
220
221   casereader_destroy (ds->source);
222   ds->source = NULL;
223
224   proc_cancel_all_transformations (ds);
225 }
226
227 const char *
228 dataset_name (const struct dataset *ds)
229 {
230   return ds->name;
231 }
232
233 void
234 dataset_set_name (struct dataset *ds, const char *name)
235 {
236   struct session *session = ds->session;
237   bool active = false;
238
239   if (session != NULL)
240     {
241       active = session_active_dataset (session) == ds;
242       if (active)
243         session_set_active_dataset (session, NULL);
244       dataset_set_session (ds, NULL);
245     }
246
247   free (ds->name);
248   ds->name = xstrdup (name);
249
250   if (session != NULL)
251     {
252       dataset_set_session (ds, session);
253       if (active)
254         session_set_active_dataset (session, ds);
255     }
256 }
257
258 struct session *
259 dataset_session (const struct dataset *ds)
260 {
261   return ds->session;
262 }
263
264 void
265 dataset_set_session (struct dataset *ds, struct session *session)
266 {
267   if (session != ds->session)
268     {
269       if (ds->session != NULL)
270         session_remove_dataset (ds->session, ds);
271       if (session != NULL)
272         session_add_dataset (session, ds);
273     }
274 }
275
276 /* Returns the dictionary within DS.  This is always nonnull, although it
277    might not contain any variables. */
278 struct dictionary *
279 dataset_dict (const struct dataset *ds)
280 {
281   return ds->dict;
282 }
283
284 /* Replaces DS's dictionary by DICT, discarding any source and
285    transformations. */
286 void
287 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
288 {
289   assert (ds->proc_state == PROC_COMMITTED);
290   assert (ds->dict != dict);
291
292   dataset_clear (ds);
293
294   dict_unref (ds->dict);
295   ds->dict = dict;
296   dict_set_change_callback (ds->dict, dict_callback, ds);
297 }
298
299 /* Returns the casereader that will be read when a procedure is executed on
300    DS.  This can be NULL if none has been set up yet. */
301 const struct casereader *
302 dataset_source (const struct dataset *ds)
303 {
304   return ds->source;
305 }
306
307 /* Returns true if DS has a data source, false otherwise. */
308 bool
309 dataset_has_source (const struct dataset *ds)
310 {
311   return dataset_source (ds) != NULL;
312 }
313
314 /* Replaces the active dataset's data by READER.  READER's cases must have an
315    appropriate format for DS's dictionary. */
316 bool
317 dataset_set_source (struct dataset *ds, struct casereader *reader)
318 {
319   casereader_destroy (ds->source);
320   ds->source = reader;
321
322   caseinit_clear (ds->caseinit);
323   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
324
325   return reader == NULL || !casereader_error (reader);
326 }
327
328 /* Returns the data source from DS and removes it from DS.  Returns a null
329    pointer if DS has no data source. */
330 struct casereader *
331 dataset_steal_source (struct dataset *ds)
332 {
333   struct casereader *reader = ds->source;
334   ds->source = NULL;
335
336   return reader;
337 }
338
339 /* Returns a number unique to DS.  It can be used to distinguish one dataset
340    from any other within a given program run, even datasets that do not exist
341    at the same time. */
342 unsigned int
343 dataset_seqno (const struct dataset *ds)
344 {
345   return ds->seqno;
346 }
347
348 void
349 dataset_set_callbacks (struct dataset *ds,
350                        const struct dataset_callbacks *callbacks,
351                        void *cb_data)
352 {
353   ds->callbacks = callbacks;
354   ds->cb_data = cb_data;
355 }
356
357 enum dataset_display
358 dataset_get_display (const struct dataset *ds)
359 {
360   return ds->display;
361 }
362
363 void
364 dataset_set_display (struct dataset *ds, enum dataset_display display)
365 {
366   ds->display = display;
367 }
368 \f
369 /* Returns the last time the data was read. */
370 time_t
371 time_of_last_procedure (struct dataset *ds)
372 {
373   if (ds->last_proc_invocation == 0)
374     update_last_proc_invocation (ds);
375   return ds->last_proc_invocation;
376 }
377 \f
378 /* Regular procedure. */
379
380 /* Executes any pending transformations, if necessary.
381    This is not identical to the EXECUTE command in that it won't
382    always read the source data.  This can be important when the
383    source data is given inline within BEGIN DATA...END FILE. */
384 bool
385 proc_execute (struct dataset *ds)
386 {
387   bool ok;
388
389   if ((ds->temporary_trns_chain == NULL
390        || trns_chain_is_empty (ds->temporary_trns_chain))
391       && trns_chain_is_empty (ds->permanent_trns_chain))
392     {
393       ds->n_lag = 0;
394       ds->discard_output = false;
395       dict_set_case_limit (ds->dict, 0);
396       dict_clear_vectors (ds->dict);
397       return true;
398     }
399
400   ok = casereader_destroy (proc_open (ds));
401   return proc_commit (ds) && ok;
402 }
403
404 static const struct casereader_class proc_casereader_class;
405
406 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
407    cases filtered out with FILTER BY will not be included in the casereader
408    (which is usually desirable).  If FILTER is false, all cases will be
409    included regardless of FILTER BY settings.
410
411    proc_commit must be called when done. */
412 struct casereader *
413 proc_open_filtering (struct dataset *ds, bool filter)
414 {
415   struct casereader *reader;
416
417   assert (ds->source != NULL);
418   assert (ds->proc_state == PROC_COMMITTED);
419
420   update_last_proc_invocation (ds);
421
422   caseinit_mark_for_init (ds->caseinit, ds->dict);
423
424   /* Finish up the collection of transformations. */
425   add_case_limit_trns (ds);
426   if (filter)
427     add_filter_trns (ds);
428   trns_chain_finalize (ds->cur_trns_chain);
429
430   /* Make permanent_dict refer to the dictionary right before
431      data reaches the sink. */
432   if (ds->permanent_dict == NULL)
433     ds->permanent_dict = ds->dict;
434
435   /* Prepare sink. */
436   if (!ds->discard_output)
437     {
438       struct dictionary *pd = ds->permanent_dict;
439       size_t compacted_n_values = dict_count_values (pd, 1u << DC_SCRATCH);
440       if (compacted_n_values < dict_get_next_value_idx (pd))
441         {
442           struct caseproto *compacted_proto;
443           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
444           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
445           ds->sink = autopaging_writer_create (compacted_proto);
446           caseproto_unref (compacted_proto);
447         }
448       else
449         {
450           ds->compactor = NULL;
451           ds->sink = autopaging_writer_create (dict_get_proto (pd));
452         }
453     }
454   else
455     {
456       ds->compactor = NULL;
457       ds->sink = NULL;
458     }
459
460   /* Allocate memory for lagged cases. */
461   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
462
463   ds->proc_state = PROC_OPEN;
464   ds->cases_written = 0;
465   ds->ok = true;
466
467   /* FIXME: use taint in dataset in place of `ok'? */
468   /* FIXME: for trivial cases we can just return a clone of
469      ds->source? */
470
471   /* Create casereader and insert a shim on top.  The shim allows us to
472      arbitrarily extend the casereader's lifetime, by slurping the cases into
473      the shim's buffer in proc_commit().  That is especially useful when output
474      table_items are generated directly from the procedure casereader (e.g. by
475      the LIST procedure) when we are using an output driver that keeps a
476      reference to the output items passed to it (e.g. the GUI output driver in
477      PSPPIRE). */
478   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
479                                          CASENUMBER_MAX,
480                                          &proc_casereader_class, ds);
481   ds->shim = casereader_shim_insert (reader);
482   return reader;
483 }
484
485 /* Opens dataset DS for reading cases with proc_read.
486    proc_commit must be called when done. */
487 struct casereader *
488 proc_open (struct dataset *ds)
489 {
490   return proc_open_filtering (ds, true);
491 }
492
493 /* Returns true if a procedure is in progress, that is, if
494    proc_open has been called but proc_commit has not. */
495 bool
496 proc_is_open (const struct dataset *ds)
497 {
498   return ds->proc_state != PROC_COMMITTED;
499 }
500
501 /* "read" function for procedure casereader. */
502 static struct ccase *
503 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
504 {
505   struct dataset *ds = ds_;
506   enum trns_result retval = TRNS_DROP_CASE;
507   struct ccase *c;
508
509   assert (ds->proc_state == PROC_OPEN);
510   for (; ; case_unref (c))
511     {
512       casenumber case_nr;
513
514       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
515       if (retval == TRNS_ERROR)
516         ds->ok = false;
517       if (!ds->ok)
518         return NULL;
519
520       /* Read a case from source. */
521       c = casereader_read (ds->source);
522       if (c == NULL)
523         return NULL;
524       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
525       caseinit_init_vars (ds->caseinit, c);
526
527       /* Execute permanent transformations.  */
528       case_nr = ds->cases_written + 1;
529       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
530                                    &c, case_nr);
531       caseinit_update_left_vars (ds->caseinit, c);
532       if (retval != TRNS_CONTINUE)
533         continue;
534
535       /* Write case to collection of lagged cases. */
536       if (ds->n_lag > 0)
537         {
538           while (deque_count (&ds->lag) >= ds->n_lag)
539             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
540           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
541         }
542
543       /* Write case to replacement dataset. */
544       ds->cases_written++;
545       if (ds->sink != NULL)
546         casewriter_write (ds->sink,
547                           case_map_execute (ds->compactor, case_ref (c)));
548
549       /* Execute temporary transformations. */
550       if (ds->temporary_trns_chain != NULL)
551         {
552           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
553                                        &c, ds->cases_written);
554           if (retval != TRNS_CONTINUE)
555             continue;
556         }
557
558       return c;
559     }
560 }
561
562 /* "destroy" function for procedure casereader. */
563 static void
564 proc_casereader_destroy (struct casereader *reader, void *ds_)
565 {
566   struct dataset *ds = ds_;
567   struct ccase *c;
568
569   /* We are always the subreader for a casereader_buffer, so if we're being
570      destroyed then it's because the casereader_buffer has read all the cases
571      that it ever will. */
572   ds->shim = NULL;
573
574   /* Make sure transformations happen for every input case, in
575      case they have side effects, and ensure that the replacement
576      active dataset gets all the cases it should. */
577   while ((c = casereader_read (reader)) != NULL)
578     case_unref (c);
579
580   ds->proc_state = PROC_CLOSED;
581   ds->ok = casereader_destroy (ds->source) && ds->ok;
582   ds->source = NULL;
583   dataset_set_source (ds, NULL);
584 }
585
586 /* Must return false if the source casereader, a transformation,
587    or the sink casewriter signaled an error.  (If a temporary
588    transformation signals an error, then the return value is
589    false, but the replacement active dataset may still be
590    untainted.) */
591 bool
592 proc_commit (struct dataset *ds)
593 {
594   if (ds->shim != NULL)
595     casereader_shim_slurp (ds->shim);
596
597   assert (ds->proc_state == PROC_CLOSED);
598   ds->proc_state = PROC_COMMITTED;
599
600   dataset_changed__ (ds);
601
602   /* Free memory for lagged cases. */
603   while (!deque_is_empty (&ds->lag))
604     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
605   free (ds->lag_cases);
606
607   /* Dictionary from before TEMPORARY becomes permanent. */
608   proc_cancel_temporary_transformations (ds);
609
610   if (!ds->discard_output)
611     {
612       /* Finish compacting. */
613       if (ds->compactor != NULL)
614         {
615           case_map_destroy (ds->compactor);
616           ds->compactor = NULL;
617
618           dict_delete_scratch_vars (ds->dict);
619           dict_compact_values (ds->dict);
620         }
621
622       /* Old data sink becomes new data source. */
623       if (ds->sink != NULL)
624         ds->source = casewriter_make_reader (ds->sink);
625     }
626   else
627     {
628       ds->source = NULL;
629       ds->discard_output = false;
630     }
631   ds->sink = NULL;
632
633   caseinit_clear (ds->caseinit);
634   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
635
636   dict_clear_vectors (ds->dict);
637   ds->permanent_dict = NULL;
638   return proc_cancel_all_transformations (ds) && ds->ok;
639 }
640
641 /* Casereader class for procedure execution. */
642 static const struct casereader_class proc_casereader_class =
643   {
644     proc_casereader_read,
645     proc_casereader_destroy,
646     NULL,
647     NULL,
648   };
649
650 /* Updates last_proc_invocation. */
651 static void
652 update_last_proc_invocation (struct dataset *ds)
653 {
654   ds->last_proc_invocation = time (NULL);
655 }
656 \f
657 /* Returns a pointer to the lagged case from N_BEFORE cases before the
658    current one, or NULL if there haven't been that many cases yet. */
659 const struct ccase *
660 lagged_case (const struct dataset *ds, int n_before)
661 {
662   assert (n_before >= 1);
663   assert (n_before <= ds->n_lag);
664
665   if (n_before <= deque_count (&ds->lag))
666     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
667   else
668     return NULL;
669 }
670 \f
671 /* Returns the current set of permanent transformations,
672    and clears the permanent transformations.
673    For use by INPUT PROGRAM. */
674 struct trns_chain *
675 proc_capture_transformations (struct dataset *ds)
676 {
677   struct trns_chain *chain;
678
679   assert (ds->temporary_trns_chain == NULL);
680   chain = ds->permanent_trns_chain;
681   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
682   dataset_transformations_changed__ (ds, false);
683
684   return chain;
685 }
686
687 /* Adds a transformation that processes a case with PROC and
688    frees itself with FREE to the current set of transformations.
689    The functions are passed AUX as auxiliary data. */
690 void
691 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
692 {
693   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
694   dataset_transformations_changed__ (ds, true);
695 }
696
697 /* Adds a transformation that processes a case with PROC and
698    frees itself with FREE to the current set of transformations.
699    When parsing of the block of transformations is complete,
700    FINALIZE will be called.
701    The functions are passed AUX as auxiliary data. */
702 void
703 add_transformation_with_finalizer (struct dataset *ds,
704                                    trns_finalize_func *finalize,
705                                    trns_proc_func *proc,
706                                    trns_free_func *free, void *aux)
707 {
708   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
709   dataset_transformations_changed__ (ds, true);
710 }
711
712 /* Returns the index of the next transformation.
713    This value can be returned by a transformation procedure
714    function to indicate a "jump" to that transformation. */
715 size_t
716 next_transformation (const struct dataset *ds)
717 {
718   return trns_chain_next (ds->cur_trns_chain);
719 }
720
721 /* Returns true if the next call to add_transformation() will add
722    a temporary transformation, false if it will add a permanent
723    transformation. */
724 bool
725 proc_in_temporary_transformations (const struct dataset *ds)
726 {
727   return ds->temporary_trns_chain != NULL;
728 }
729
730 /* Marks the start of temporary transformations.
731    Further calls to add_transformation() will add temporary
732    transformations. */
733 void
734 proc_start_temporary_transformations (struct dataset *ds)
735 {
736   if (!proc_in_temporary_transformations (ds))
737     {
738       add_case_limit_trns (ds);
739
740       ds->permanent_dict = dict_clone (ds->dict);
741
742       trns_chain_finalize (ds->permanent_trns_chain);
743       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
744       dataset_transformations_changed__ (ds, true);
745     }
746 }
747
748 /* Converts all the temporary transformations, if any, to permanent
749    transformations.  Further transformations will be permanent.
750
751    The FILTER command is implemented as a temporary transformation, so a
752    procedure that uses this function should usually use proc_open_filtering()
753    with FILTER false, instead of plain proc_open().
754
755    Returns true if anything changed, false otherwise. */
756 bool
757 proc_make_temporary_transformations_permanent (struct dataset *ds)
758 {
759   if (proc_in_temporary_transformations (ds))
760     {
761       trns_chain_finalize (ds->temporary_trns_chain);
762       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
763       ds->temporary_trns_chain = NULL;
764
765       ds->cur_trns_chain = ds->permanent_trns_chain;
766
767       dict_unref (ds->permanent_dict);
768       ds->permanent_dict = NULL;
769
770       return true;
771     }
772   else
773     return false;
774 }
775
776 /* Cancels all temporary transformations, if any.  Further
777    transformations will be permanent.
778    Returns true if anything changed, false otherwise. */
779 bool
780 proc_cancel_temporary_transformations (struct dataset *ds)
781 {
782   if (proc_in_temporary_transformations (ds))
783     {
784       dict_unref (ds->dict);
785       ds->dict = ds->permanent_dict;
786       ds->permanent_dict = NULL;
787
788       trns_chain_destroy (ds->temporary_trns_chain);
789       ds->temporary_trns_chain = NULL;
790       dataset_transformations_changed__ (
791         ds, !trns_chain_is_empty (ds->permanent_trns_chain));
792       return true;
793     }
794   else
795     return false;
796 }
797
798 /* Cancels all transformations, if any.
799    Returns true if successful, false on I/O error. */
800 bool
801 proc_cancel_all_transformations (struct dataset *ds)
802 {
803   bool ok;
804   assert (ds->proc_state == PROC_COMMITTED);
805   ok = trns_chain_destroy (ds->permanent_trns_chain);
806   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
807   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
808   ds->temporary_trns_chain = NULL;
809   dataset_transformations_changed__ (ds, false);
810
811   return ok;
812 }
813
814 static int
815 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
816 {
817   struct variable *var = var_;
818
819   *cc = case_unshare (*cc);
820   *case_num_rw (*cc, var) = case_num;
821
822   return TRNS_CONTINUE;
823 }
824
825 /* Add a variable which we can sort by to get back the original order. */
826 struct variable *
827 add_permanent_ordering_transformation (struct dataset *ds)
828 {
829   struct variable *temp_var;
830
831   temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
832   if (proc_in_temporary_transformations (ds))
833     {
834       struct variable *perm_var;
835
836       perm_var = dict_clone_var_in_place_assert (ds->permanent_dict, temp_var);
837       trns_chain_append (ds->permanent_trns_chain, NULL, store_case_num,
838                          NULL, perm_var);
839       trns_chain_finalize (ds->permanent_trns_chain);
840     }
841   else
842     add_transformation (ds, store_case_num, NULL, temp_var);
843
844   return temp_var;
845 }
846 \f
847 /* Causes output from the next procedure to be discarded, instead
848    of being preserved for use as input for the next procedure. */
849 void
850 proc_discard_output (struct dataset *ds)
851 {
852   ds->discard_output = true;
853 }
854
855
856 /* Checks whether DS has a corrupted active dataset.  If so,
857    discards it and returns false.  If not, returns true without
858    doing anything. */
859 bool
860 dataset_end_of_command (struct dataset *ds)
861 {
862   if (ds->source != NULL)
863     {
864       if (casereader_error (ds->source))
865         {
866           dataset_clear (ds);
867           return false;
868         }
869       else
870         {
871           const struct taint *taint = casereader_get_taint (ds->source);
872           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
873           assert (!taint_has_tainted_successor (taint));
874         }
875     }
876   return true;
877 }
878 \f
879 static trns_proc_func case_limit_trns_proc;
880 static trns_free_func case_limit_trns_free;
881
882 /* Adds a transformation that limits the number of cases that may
883    pass through, if DS->DICT has a case limit. */
884 static void
885 add_case_limit_trns (struct dataset *ds)
886 {
887   casenumber case_limit = dict_get_case_limit (ds->dict);
888   if (case_limit != 0)
889     {
890       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
891       *cases_remaining = case_limit;
892       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
893                           cases_remaining);
894       dict_set_case_limit (ds->dict, 0);
895     }
896 }
897
898 /* Limits the maximum number of cases processed to
899    *CASES_REMAINING. */
900 static int
901 case_limit_trns_proc (void *cases_remaining_,
902                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
903 {
904   size_t *cases_remaining = cases_remaining_;
905   if (*cases_remaining > 0)
906     {
907       (*cases_remaining)--;
908       return TRNS_CONTINUE;
909     }
910   else
911     return TRNS_DROP_CASE;
912 }
913
914 /* Frees the data associated with a case limit transformation. */
915 static bool
916 case_limit_trns_free (void *cases_remaining_)
917 {
918   size_t *cases_remaining = cases_remaining_;
919   free (cases_remaining);
920   return true;
921 }
922 \f
923 static trns_proc_func filter_trns_proc;
924
925 /* Adds a temporary transformation to filter data according to
926    the variable specified on FILTER, if any. */
927 static void
928 add_filter_trns (struct dataset *ds)
929 {
930   struct variable *filter_var = dict_get_filter (ds->dict);
931   if (filter_var != NULL)
932     {
933       proc_start_temporary_transformations (ds);
934       add_transformation (ds, filter_trns_proc, NULL, filter_var);
935     }
936 }
937
938 /* FILTER transformation. */
939 static int
940 filter_trns_proc (void *filter_var_,
941                   struct ccase **c, casenumber case_nr UNUSED)
942
943 {
944   struct variable *filter_var = filter_var_;
945   double f = case_num (*c, filter_var);
946   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
947           ? TRNS_CONTINUE : TRNS_DROP_CASE);
948 }
949
950
951 void
952 dataset_need_lag (struct dataset *ds, int n_before)
953 {
954   ds->n_lag = MAX (ds->n_lag, n_before);
955 }
956 \f
957 static void
958 dataset_changed__ (struct dataset *ds)
959 {
960   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
961     ds->callbacks->changed (ds->cb_data);
962 }
963
964 static void
965 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
966 {
967   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
968     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
969 }
970 \f
971 /* Private interface for use by session code. */
972
973 void
974 dataset_set_session__ (struct dataset *ds, struct session *session)
975 {
976   ds->session = session;
977 }