Work toward getting rid of finalizers and control stacks and jumping around among...
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 struct dataset {
48   /* A dataset is usually part of a session.  Within a session its name must
49      unique.  The name must either be a valid PSPP identifier or the empty
50      string.  (It must be unique within the session even if it is the empty
51      string; that is, there may only be a single dataset within a session with
52      the empty string as its name.) */
53   struct session *session;
54   char *name;
55   enum dataset_display display;
56
57   /* Cases are read from source,
58      their transformation variables are initialized,
59      pass through permanent_trns_chain (which transforms them into
60      the format described by permanent_dict),
61      are written to sink,
62      pass through temporary_trns_chain (which transforms them into
63      the format described by dict),
64      and are finally passed to the procedure. */
65   struct casereader *source;
66   struct caseinit *caseinit;
67   struct trns_chain *permanent_trns_chain;
68   struct dictionary *permanent_dict;
69   struct casewriter *sink;
70   struct trns_chain *temporary_trns_chain;
71   struct dictionary *dict;
72
73   /* If true, cases are discarded instead of being written to
74      sink. */
75   bool discard_output;
76
77   /* The transformation chain that the next transformation will be
78      added to. */
79   struct trns_chain *cur_trns_chain;
80
81   /* The case map used to compact a case, if necessary;
82      otherwise a null pointer. */
83   struct case_map *compactor;
84
85   /* Time at which proc was last invoked. */
86   time_t last_proc_invocation;
87
88   /* Cases just before ("lagging") the current one. */
89   int n_lag;                    /* Number of cases to lag. */
90   struct deque lag;             /* Deque of lagged cases. */
91   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
92
93   /* Procedure data. */
94   enum
95     {
96       PROC_COMMITTED,           /* No procedure in progress. */
97       PROC_OPEN,                /* proc_open called, casereader still open. */
98       PROC_CLOSED               /* casereader from proc_open destroyed,
99                                    but proc_commit not yet called. */
100     }
101   proc_state;
102   casenumber cases_written;     /* Cases output so far. */
103   bool ok;                      /* Error status. */
104   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
105
106   const struct dataset_callbacks *callbacks;
107   void *cb_data;
108
109   /* Uniquely distinguishes datasets. */
110   unsigned int seqno;
111 };
112
113 static void dataset_changed__ (struct dataset *);
114 static void dataset_transformations_changed__ (struct dataset *,
115                                                bool non_empty);
116
117 static void add_case_limit_trns (struct dataset *ds);
118 static void add_filter_trns (struct dataset *ds);
119
120 static void update_last_proc_invocation (struct dataset *ds);
121
122 static void
123 dict_callback (struct dictionary *d UNUSED, void *ds_)
124 {
125   struct dataset *ds = ds_;
126   dataset_changed__ (ds);
127 }
128 \f
129 static void
130 dataset_create_finish__ (struct dataset *ds, struct session *session)
131 {
132   static unsigned int seqno;
133
134   dict_set_change_callback (ds->dict, dict_callback, ds);
135   proc_cancel_all_transformations (ds);
136   dataset_set_session (ds, session);
137   ds->seqno = ++seqno;
138 }
139
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
141    SESSION already contains a dataset named NAME, it is deleted and replaced.
142    The dataset initially has an empty dictionary and no data source. */
143 struct dataset *
144 dataset_create (struct session *session, const char *name)
145 {
146   struct dataset *ds = XZALLOC (struct dataset);
147   ds->name = xstrdup (name);
148   ds->display = DATASET_FRONT;
149   ds->dict = dict_create (get_default_encoding ());
150
151   ds->caseinit = caseinit_create ();
152
153   dataset_create_finish__ (ds, session);
154
155   return ds;
156 }
157
158 /* Creates and returns a new dataset that has the same data and dictionary as
159    OLD named NAME, adds it to the same session as OLD, and returns the new
160    dataset.  If SESSION already contains a dataset named NAME, it is deleted
161    and replaced.
162
163    OLD must not have any active transformations or temporary state and must
164    not be in the middle of a procedure.
165
166    Callbacks are not cloned. */
167 struct dataset *
168 dataset_clone (struct dataset *old, const char *name)
169 {
170   struct dataset *new;
171
172   assert (old->proc_state == PROC_COMMITTED);
173   assert (trns_chain_is_empty (old->permanent_trns_chain));
174   assert (old->permanent_dict == NULL);
175   assert (old->sink == NULL);
176   assert (old->temporary_trns_chain == NULL);
177
178   new = xzalloc (sizeof *new);
179   new->name = xstrdup (name);
180   new->display = DATASET_FRONT;
181   new->source = casereader_clone (old->source);
182   new->dict = dict_clone (old->dict);
183   new->caseinit = caseinit_clone (old->caseinit);
184   new->last_proc_invocation = old->last_proc_invocation;
185   new->ok = old->ok;
186
187   dataset_create_finish__ (new, old->session);
188
189   return new;
190 }
191
192 /* Destroys DS. */
193 void
194 dataset_destroy (struct dataset *ds)
195 {
196   if (ds != NULL)
197     {
198       dataset_set_session (ds, NULL);
199       dataset_clear (ds);
200       dict_unref (ds->dict);
201       dict_unref (ds->permanent_dict);
202       caseinit_destroy (ds->caseinit);
203       trns_chain_destroy (ds->permanent_trns_chain);
204       dataset_transformations_changed__ (ds, false);
205       free (ds->name);
206       free (ds);
207     }
208 }
209
210 /* Discards the active dataset's dictionary, data, and transformations. */
211 void
212 dataset_clear (struct dataset *ds)
213 {
214   assert (ds->proc_state == PROC_COMMITTED);
215
216   dict_clear (ds->dict);
217   fh_set_default_handle (NULL);
218
219   ds->n_lag = 0;
220
221   casereader_destroy (ds->source);
222   ds->source = NULL;
223
224   proc_cancel_all_transformations (ds);
225 }
226
227 const char *
228 dataset_name (const struct dataset *ds)
229 {
230   return ds->name;
231 }
232
233 void
234 dataset_set_name (struct dataset *ds, const char *name)
235 {
236   struct session *session = ds->session;
237   bool active = false;
238
239   if (session != NULL)
240     {
241       active = session_active_dataset (session) == ds;
242       if (active)
243         session_set_active_dataset (session, NULL);
244       dataset_set_session (ds, NULL);
245     }
246
247   free (ds->name);
248   ds->name = xstrdup (name);
249
250   if (session != NULL)
251     {
252       dataset_set_session (ds, session);
253       if (active)
254         session_set_active_dataset (session, ds);
255     }
256 }
257
258 struct session *
259 dataset_session (const struct dataset *ds)
260 {
261   return ds->session;
262 }
263
264 void
265 dataset_set_session (struct dataset *ds, struct session *session)
266 {
267   if (session != ds->session)
268     {
269       if (ds->session != NULL)
270         session_remove_dataset (ds->session, ds);
271       if (session != NULL)
272         session_add_dataset (session, ds);
273     }
274 }
275
276 /* Returns the dictionary within DS.  This is always nonnull, although it
277    might not contain any variables. */
278 struct dictionary *
279 dataset_dict (const struct dataset *ds)
280 {
281   return ds->dict;
282 }
283
284 /* Replaces DS's dictionary by DICT, discarding any source and
285    transformations. */
286 void
287 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
288 {
289   assert (ds->proc_state == PROC_COMMITTED);
290   assert (ds->dict != dict);
291
292   dataset_clear (ds);
293
294   dict_unref (ds->dict);
295   ds->dict = dict;
296   dict_set_change_callback (ds->dict, dict_callback, ds);
297 }
298
299 /* Returns the casereader that will be read when a procedure is executed on
300    DS.  This can be NULL if none has been set up yet. */
301 const struct casereader *
302 dataset_source (const struct dataset *ds)
303 {
304   return ds->source;
305 }
306
307 /* Returns true if DS has a data source, false otherwise. */
308 bool
309 dataset_has_source (const struct dataset *ds)
310 {
311   return dataset_source (ds) != NULL;
312 }
313
314 /* Replaces the active dataset's data by READER.  READER's cases must have an
315    appropriate format for DS's dictionary. */
316 bool
317 dataset_set_source (struct dataset *ds, struct casereader *reader)
318 {
319   casereader_destroy (ds->source);
320   ds->source = reader;
321
322   caseinit_clear (ds->caseinit);
323   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
324
325   return reader == NULL || !casereader_error (reader);
326 }
327
328 /* Returns the data source from DS and removes it from DS.  Returns a null
329    pointer if DS has no data source. */
330 struct casereader *
331 dataset_steal_source (struct dataset *ds)
332 {
333   struct casereader *reader = ds->source;
334   ds->source = NULL;
335
336   return reader;
337 }
338
339 /* Returns a number unique to DS.  It can be used to distinguish one dataset
340    from any other within a given program run, even datasets that do not exist
341    at the same time. */
342 unsigned int
343 dataset_seqno (const struct dataset *ds)
344 {
345   return ds->seqno;
346 }
347
348 void
349 dataset_set_callbacks (struct dataset *ds,
350                        const struct dataset_callbacks *callbacks,
351                        void *cb_data)
352 {
353   ds->callbacks = callbacks;
354   ds->cb_data = cb_data;
355 }
356
357 enum dataset_display
358 dataset_get_display (const struct dataset *ds)
359 {
360   return ds->display;
361 }
362
363 void
364 dataset_set_display (struct dataset *ds, enum dataset_display display)
365 {
366   ds->display = display;
367 }
368 \f
369 /* Returns the last time the data was read. */
370 time_t
371 time_of_last_procedure (struct dataset *ds)
372 {
373   if (ds->last_proc_invocation == 0)
374     update_last_proc_invocation (ds);
375   return ds->last_proc_invocation;
376 }
377 \f
378 /* Regular procedure. */
379
380 /* Executes any pending transformations, if necessary.
381    This is not identical to the EXECUTE command in that it won't
382    always read the source data.  This can be important when the
383    source data is given inline within BEGIN DATA...END FILE. */
384 bool
385 proc_execute (struct dataset *ds)
386 {
387   bool ok;
388
389   if ((ds->temporary_trns_chain == NULL
390        || trns_chain_is_empty (ds->temporary_trns_chain))
391       && trns_chain_is_empty (ds->permanent_trns_chain))
392     {
393       ds->n_lag = 0;
394       ds->discard_output = false;
395       dict_set_case_limit (ds->dict, 0);
396       dict_clear_vectors (ds->dict);
397       return true;
398     }
399
400   ok = casereader_destroy (proc_open (ds));
401   return proc_commit (ds) && ok;
402 }
403
404 static const struct casereader_class proc_casereader_class;
405
406 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
407    cases filtered out with FILTER BY will not be included in the casereader
408    (which is usually desirable).  If FILTER is false, all cases will be
409    included regardless of FILTER BY settings.
410
411    proc_commit must be called when done. */
412 struct casereader *
413 proc_open_filtering (struct dataset *ds, bool filter)
414 {
415   struct casereader *reader;
416
417   assert (ds->source != NULL);
418   assert (ds->proc_state == PROC_COMMITTED);
419
420   update_last_proc_invocation (ds);
421
422   caseinit_mark_for_init (ds->caseinit, ds->dict);
423
424   /* Finish up the collection of transformations. */
425   add_case_limit_trns (ds);
426   if (filter)
427     add_filter_trns (ds);
428
429   /* Make permanent_dict refer to the dictionary right before
430      data reaches the sink. */
431   if (ds->permanent_dict == NULL)
432     ds->permanent_dict = ds->dict;
433
434   /* Prepare sink. */
435   if (!ds->discard_output)
436     {
437       struct dictionary *pd = ds->permanent_dict;
438       size_t compacted_n_values = dict_count_values (pd, 1u << DC_SCRATCH);
439       if (compacted_n_values < dict_get_next_value_idx (pd))
440         {
441           struct caseproto *compacted_proto;
442           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
443           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
444           ds->sink = autopaging_writer_create (compacted_proto);
445           caseproto_unref (compacted_proto);
446         }
447       else
448         {
449           ds->compactor = NULL;
450           ds->sink = autopaging_writer_create (dict_get_proto (pd));
451         }
452     }
453   else
454     {
455       ds->compactor = NULL;
456       ds->sink = NULL;
457     }
458
459   /* Allocate memory for lagged cases. */
460   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
461
462   ds->proc_state = PROC_OPEN;
463   ds->cases_written = 0;
464   ds->ok = true;
465
466   /* FIXME: use taint in dataset in place of `ok'? */
467   /* FIXME: for trivial cases we can just return a clone of
468      ds->source? */
469
470   /* Create casereader and insert a shim on top.  The shim allows us to
471      arbitrarily extend the casereader's lifetime, by slurping the cases into
472      the shim's buffer in proc_commit().  That is especially useful when output
473      table_items are generated directly from the procedure casereader (e.g. by
474      the LIST procedure) when we are using an output driver that keeps a
475      reference to the output items passed to it (e.g. the GUI output driver in
476      PSPPIRE). */
477   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
478                                          CASENUMBER_MAX,
479                                          &proc_casereader_class, ds);
480   ds->shim = casereader_shim_insert (reader);
481   return reader;
482 }
483
484 /* Opens dataset DS for reading cases with proc_read.
485    proc_commit must be called when done. */
486 struct casereader *
487 proc_open (struct dataset *ds)
488 {
489   return proc_open_filtering (ds, true);
490 }
491
492 /* Returns true if a procedure is in progress, that is, if
493    proc_open has been called but proc_commit has not. */
494 bool
495 proc_is_open (const struct dataset *ds)
496 {
497   return ds->proc_state != PROC_COMMITTED;
498 }
499
500 /* "read" function for procedure casereader. */
501 static struct ccase *
502 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
503 {
504   struct dataset *ds = ds_;
505   enum trns_result retval = TRNS_DROP_CASE;
506   struct ccase *c;
507
508   assert (ds->proc_state == PROC_OPEN);
509   for (; ; case_unref (c))
510     {
511       casenumber case_nr;
512
513       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
514       if (retval == TRNS_ERROR)
515         ds->ok = false;
516       if (!ds->ok)
517         return NULL;
518
519       /* Read a case from source. */
520       c = casereader_read (ds->source);
521       if (c == NULL)
522         return NULL;
523       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
524       caseinit_init_vars (ds->caseinit, c);
525
526       /* Execute permanent transformations.  */
527       case_nr = ds->cases_written + 1;
528       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
529                                    &c, case_nr);
530       caseinit_update_left_vars (ds->caseinit, c);
531       if (retval != TRNS_CONTINUE)
532         continue;
533
534       /* Write case to collection of lagged cases. */
535       if (ds->n_lag > 0)
536         {
537           while (deque_count (&ds->lag) >= ds->n_lag)
538             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
539           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
540         }
541
542       /* Write case to replacement dataset. */
543       ds->cases_written++;
544       if (ds->sink != NULL)
545         casewriter_write (ds->sink,
546                           case_map_execute (ds->compactor, case_ref (c)));
547
548       /* Execute temporary transformations. */
549       if (ds->temporary_trns_chain != NULL)
550         {
551           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
552                                        &c, ds->cases_written);
553           if (retval != TRNS_CONTINUE)
554             continue;
555         }
556
557       return c;
558     }
559 }
560
561 /* "destroy" function for procedure casereader. */
562 static void
563 proc_casereader_destroy (struct casereader *reader, void *ds_)
564 {
565   struct dataset *ds = ds_;
566   struct ccase *c;
567
568   /* We are always the subreader for a casereader_buffer, so if we're being
569      destroyed then it's because the casereader_buffer has read all the cases
570      that it ever will. */
571   ds->shim = NULL;
572
573   /* Make sure transformations happen for every input case, in
574      case they have side effects, and ensure that the replacement
575      active dataset gets all the cases it should. */
576   while ((c = casereader_read (reader)) != NULL)
577     case_unref (c);
578
579   ds->proc_state = PROC_CLOSED;
580   ds->ok = casereader_destroy (ds->source) && ds->ok;
581   ds->source = NULL;
582   dataset_set_source (ds, NULL);
583 }
584
585 /* Must return false if the source casereader, a transformation,
586    or the sink casewriter signaled an error.  (If a temporary
587    transformation signals an error, then the return value is
588    false, but the replacement active dataset may still be
589    untainted.) */
590 bool
591 proc_commit (struct dataset *ds)
592 {
593   if (ds->shim != NULL)
594     casereader_shim_slurp (ds->shim);
595
596   assert (ds->proc_state == PROC_CLOSED);
597   ds->proc_state = PROC_COMMITTED;
598
599   dataset_changed__ (ds);
600
601   /* Free memory for lagged cases. */
602   while (!deque_is_empty (&ds->lag))
603     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
604   free (ds->lag_cases);
605
606   /* Dictionary from before TEMPORARY becomes permanent. */
607   proc_cancel_temporary_transformations (ds);
608
609   if (!ds->discard_output)
610     {
611       /* Finish compacting. */
612       if (ds->compactor != NULL)
613         {
614           case_map_destroy (ds->compactor);
615           ds->compactor = NULL;
616
617           dict_delete_scratch_vars (ds->dict);
618           dict_compact_values (ds->dict);
619         }
620
621       /* Old data sink becomes new data source. */
622       if (ds->sink != NULL)
623         ds->source = casewriter_make_reader (ds->sink);
624     }
625   else
626     {
627       ds->source = NULL;
628       ds->discard_output = false;
629     }
630   ds->sink = NULL;
631
632   caseinit_clear (ds->caseinit);
633   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
634
635   dict_clear_vectors (ds->dict);
636   ds->permanent_dict = NULL;
637   return proc_cancel_all_transformations (ds) && ds->ok;
638 }
639
640 /* Casereader class for procedure execution. */
641 static const struct casereader_class proc_casereader_class =
642   {
643     proc_casereader_read,
644     proc_casereader_destroy,
645     NULL,
646     NULL,
647   };
648
649 /* Updates last_proc_invocation. */
650 static void
651 update_last_proc_invocation (struct dataset *ds)
652 {
653   ds->last_proc_invocation = time (NULL);
654 }
655 \f
656 /* Returns a pointer to the lagged case from N_BEFORE cases before the
657    current one, or NULL if there haven't been that many cases yet. */
658 const struct ccase *
659 lagged_case (const struct dataset *ds, int n_before)
660 {
661   assert (n_before >= 1);
662   assert (n_before <= ds->n_lag);
663
664   if (n_before <= deque_count (&ds->lag))
665     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
666   else
667     return NULL;
668 }
669 \f
670 /* Returns the current set of permanent transformations,
671    and clears the permanent transformations.
672    For use by INPUT PROGRAM. */
673 struct trns_chain *
674 proc_capture_transformations (struct dataset *ds)
675 {
676   struct trns_chain *chain;
677
678   assert (ds->temporary_trns_chain == NULL);
679   chain = ds->permanent_trns_chain;
680   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
681   dataset_transformations_changed__ (ds, false);
682
683   return chain;
684 }
685
686 /* Adds a transformation that processes a case with PROC and
687    frees itself with FREE to the current set of transformations.
688    The functions are passed AUX as auxiliary data. */
689 void
690 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
691 {
692   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
693   dataset_transformations_changed__ (ds, true);
694 }
695
696 /* Returns true if the next call to add_transformation() will add
697    a temporary transformation, false if it will add a permanent
698    transformation. */
699 bool
700 proc_in_temporary_transformations (const struct dataset *ds)
701 {
702   return ds->temporary_trns_chain != NULL;
703 }
704
705 /* Marks the start of temporary transformations.
706    Further calls to add_transformation() will add temporary
707    transformations. */
708 void
709 proc_start_temporary_transformations (struct dataset *ds)
710 {
711   if (!proc_in_temporary_transformations (ds))
712     {
713       add_case_limit_trns (ds);
714
715       ds->permanent_dict = dict_clone (ds->dict);
716
717       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
718       dataset_transformations_changed__ (ds, true);
719     }
720 }
721
722 /* Converts all the temporary transformations, if any, to permanent
723    transformations.  Further transformations will be permanent.
724
725    The FILTER command is implemented as a temporary transformation, so a
726    procedure that uses this function should usually use proc_open_filtering()
727    with FILTER false, instead of plain proc_open().
728
729    Returns true if anything changed, false otherwise. */
730 bool
731 proc_make_temporary_transformations_permanent (struct dataset *ds)
732 {
733   if (proc_in_temporary_transformations (ds))
734     {
735       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
736       ds->temporary_trns_chain = NULL;
737
738       ds->cur_trns_chain = ds->permanent_trns_chain;
739
740       dict_unref (ds->permanent_dict);
741       ds->permanent_dict = NULL;
742
743       return true;
744     }
745   else
746     return false;
747 }
748
749 /* Cancels all temporary transformations, if any.  Further
750    transformations will be permanent.
751    Returns true if anything changed, false otherwise. */
752 bool
753 proc_cancel_temporary_transformations (struct dataset *ds)
754 {
755   if (proc_in_temporary_transformations (ds))
756     {
757       dict_unref (ds->dict);
758       ds->dict = ds->permanent_dict;
759       ds->permanent_dict = NULL;
760
761       trns_chain_destroy (ds->temporary_trns_chain);
762       ds->temporary_trns_chain = NULL;
763       dataset_transformations_changed__ (
764         ds, !trns_chain_is_empty (ds->permanent_trns_chain));
765       return true;
766     }
767   else
768     return false;
769 }
770
771 /* Cancels all transformations, if any.
772    Returns true if successful, false on I/O error. */
773 bool
774 proc_cancel_all_transformations (struct dataset *ds)
775 {
776   bool ok;
777   assert (ds->proc_state == PROC_COMMITTED);
778   ok = trns_chain_destroy (ds->permanent_trns_chain);
779   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
780   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
781   ds->temporary_trns_chain = NULL;
782   dataset_transformations_changed__ (ds, false);
783
784   return ok;
785 }
786
787 static enum trns_result
788 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
789 {
790   struct variable *var = var_;
791
792   *cc = case_unshare (*cc);
793   *case_num_rw (*cc, var) = case_num;
794
795   return TRNS_CONTINUE;
796 }
797
798 /* Add a variable which we can sort by to get back the original order. */
799 struct variable *
800 add_permanent_ordering_transformation (struct dataset *ds)
801 {
802   struct variable *temp_var;
803
804   temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
805   if (proc_in_temporary_transformations (ds))
806     {
807       struct variable *perm_var;
808
809       perm_var = dict_clone_var_in_place_assert (ds->permanent_dict, temp_var);
810       trns_chain_append (ds->permanent_trns_chain, NULL, store_case_num,
811                          NULL, perm_var);
812     }
813   else
814     add_transformation (ds, store_case_num, NULL, temp_var);
815
816   return temp_var;
817 }
818 \f
819 /* Causes output from the next procedure to be discarded, instead
820    of being preserved for use as input for the next procedure. */
821 void
822 proc_discard_output (struct dataset *ds)
823 {
824   ds->discard_output = true;
825 }
826
827
828 /* Checks whether DS has a corrupted active dataset.  If so,
829    discards it and returns false.  If not, returns true without
830    doing anything. */
831 bool
832 dataset_end_of_command (struct dataset *ds)
833 {
834   if (ds->source != NULL)
835     {
836       if (casereader_error (ds->source))
837         {
838           dataset_clear (ds);
839           return false;
840         }
841       else
842         {
843           const struct taint *taint = casereader_get_taint (ds->source);
844           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
845           assert (!taint_has_tainted_successor (taint));
846         }
847     }
848   return true;
849 }
850 \f
851 static trns_proc_func case_limit_trns_proc;
852 static trns_free_func case_limit_trns_free;
853
854 /* Adds a transformation that limits the number of cases that may
855    pass through, if DS->DICT has a case limit. */
856 static void
857 add_case_limit_trns (struct dataset *ds)
858 {
859   casenumber case_limit = dict_get_case_limit (ds->dict);
860   if (case_limit != 0)
861     {
862       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
863       *cases_remaining = case_limit;
864       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
865                           cases_remaining);
866       dict_set_case_limit (ds->dict, 0);
867     }
868 }
869
870 /* Limits the maximum number of cases processed to
871    *CASES_REMAINING. */
872 static enum trns_result
873 case_limit_trns_proc (void *cases_remaining_,
874                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
875 {
876   size_t *cases_remaining = cases_remaining_;
877   if (*cases_remaining > 0)
878     {
879       (*cases_remaining)--;
880       return TRNS_CONTINUE;
881     }
882   else
883     return TRNS_DROP_CASE;
884 }
885
886 /* Frees the data associated with a case limit transformation. */
887 static bool
888 case_limit_trns_free (void *cases_remaining_)
889 {
890   size_t *cases_remaining = cases_remaining_;
891   free (cases_remaining);
892   return true;
893 }
894 \f
895 static trns_proc_func filter_trns_proc;
896
897 /* Adds a temporary transformation to filter data according to
898    the variable specified on FILTER, if any. */
899 static void
900 add_filter_trns (struct dataset *ds)
901 {
902   struct variable *filter_var = dict_get_filter (ds->dict);
903   if (filter_var != NULL)
904     {
905       proc_start_temporary_transformations (ds);
906       add_transformation (ds, filter_trns_proc, NULL, filter_var);
907     }
908 }
909
910 /* FILTER transformation. */
911 static enum trns_result
912 filter_trns_proc (void *filter_var_,
913                   struct ccase **c, casenumber case_nr UNUSED)
914
915 {
916   struct variable *filter_var = filter_var_;
917   double f = case_num (*c, filter_var);
918   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
919           ? TRNS_CONTINUE : TRNS_DROP_CASE);
920 }
921
922
923 void
924 dataset_need_lag (struct dataset *ds, int n_before)
925 {
926   ds->n_lag = MAX (ds->n_lag, n_before);
927 }
928 \f
929 static void
930 dataset_changed__ (struct dataset *ds)
931 {
932   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
933     ds->callbacks->changed (ds->cb_data);
934 }
935
936 static void
937 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
938 {
939   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
940     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
941 }
942 \f
943 /* Private interface for use by session code. */
944
945 void
946 dataset_set_session__ (struct dataset *ds, struct session *session)
947 {
948   ds->session = session;
949 }