Implement DATASET commands.
[pspp-builds.git] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
47 struct dataset {
48   /* A dataset is usually part of a session.  Within a session its name must
49      unique.  The name must either be a valid PSPP identifier or the empty
50      string.  (It must be unique within the session even if it is the empty
51      string; that is, there may only be a single dataset within a session with
52      the empty string as its name.) */
53   struct session *session;
54   char *name;
55   enum dataset_display display;
56
57   /* Cases are read from source,
58      their transformation variables are initialized,
59      pass through permanent_trns_chain (which transforms them into
60      the format described by permanent_dict),
61      are written to sink,
62      pass through temporary_trns_chain (which transforms them into
63      the format described by dict),
64      and are finally passed to the procedure. */
65   struct casereader *source;
66   struct caseinit *caseinit;
67   struct trns_chain *permanent_trns_chain;
68   struct dictionary *permanent_dict;
69   struct casewriter *sink;
70   struct trns_chain *temporary_trns_chain;
71   struct dictionary *dict;
72
73   /* If true, cases are discarded instead of being written to
74      sink. */
75   bool discard_output;
76
77   /* The transformation chain that the next transformation will be
78      added to. */
79   struct trns_chain *cur_trns_chain;
80
81   /* The case map used to compact a case, if necessary;
82      otherwise a null pointer. */
83   struct case_map *compactor;
84
85   /* Time at which proc was last invoked. */
86   time_t last_proc_invocation;
87
88   /* Cases just before ("lagging") the current one. */
89   int n_lag;                    /* Number of cases to lag. */
90   struct deque lag;             /* Deque of lagged cases. */
91   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
92
93   /* Procedure data. */
94   enum
95     {
96       PROC_COMMITTED,           /* No procedure in progress. */
97       PROC_OPEN,                /* proc_open called, casereader still open. */
98       PROC_CLOSED               /* casereader from proc_open destroyed,
99                                    but proc_commit not yet called. */
100     }
101   proc_state;
102   casenumber cases_written;     /* Cases output so far. */
103   bool ok;                      /* Error status. */
104   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
105
106   const struct dataset_callbacks *callbacks;
107   void *cb_data;
108
109   /* Uniquely distinguishes datasets. */
110   unsigned int seqno;
111 };
112
113 static void dataset_changed__ (struct dataset *);
114 static void dataset_transformations_changed__ (struct dataset *,
115                                                bool non_empty);
116
117 static void add_case_limit_trns (struct dataset *ds);
118 static void add_filter_trns (struct dataset *ds);
119
120 static void update_last_proc_invocation (struct dataset *ds);
121
122 static void
123 dict_callback (struct dictionary *d UNUSED, void *ds_)
124 {
125   struct dataset *ds = ds_;
126   dataset_changed__ (ds);
127 }
128 \f
129 static void
130 dataset_create_finish__ (struct dataset *ds, struct session *session)
131 {
132   static unsigned int seqno;
133
134   dict_set_change_callback (ds->dict, dict_callback, ds);
135   proc_cancel_all_transformations (ds);
136   dataset_set_session (ds, session);
137   ds->seqno = ++seqno;
138 }
139
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
141    SESSION already contains a dataset named NAME, it is deleted and replaced.
142    The dataset initially has an empty dictionary and no data source. */
143 struct dataset *
144 dataset_create (struct session *session, const char *name)
145 {
146   struct dataset *ds;
147
148   ds = xzalloc (sizeof *ds);
149   ds->name = xstrdup (name);
150   ds->display = DATASET_FRONT;
151   ds->dict = dict_create (get_default_encoding ());
152
153   ds->caseinit = caseinit_create ();
154
155   dataset_create_finish__ (ds, session);
156
157   return ds;
158 }
159
160 /* Creates and returns a new dataset that has the same data and dictionary as
161    OLD named NAME, adds it to the same session as OLD, and returns the new
162    dataset.  If SESSION already contains a dataset named NAME, it is deleted
163    and replaced.
164
165    OLD must not have any active transformations or temporary state and must
166    not be in the middle of a procedure.
167
168    Callbacks are not cloned. */
169 struct dataset *
170 dataset_clone (struct dataset *old, const char *name)
171 {
172   struct dataset *new;
173
174   assert (old->proc_state == PROC_COMMITTED);
175   assert (trns_chain_is_empty (old->permanent_trns_chain));
176   assert (old->permanent_dict == NULL);
177   assert (old->sink == NULL);
178   assert (old->temporary_trns_chain == NULL);
179
180   new = xzalloc (sizeof *new);
181   new->name = xstrdup (name);
182   new->display = DATASET_FRONT;
183   new->source = casereader_clone (old->source);
184   new->dict = dict_clone (old->dict);
185   new->caseinit = caseinit_clone (old->caseinit);
186   new->last_proc_invocation = old->last_proc_invocation;
187   new->ok = old->ok;
188
189   dataset_create_finish__ (new, old->session);
190
191   return new;
192 }
193
194 /* Destroys DS. */
195 void
196 dataset_destroy (struct dataset *ds)
197 {
198   if (ds != NULL)
199     {
200       dataset_set_session (ds, NULL);
201       dataset_clear (ds);
202       dict_destroy (ds->dict);
203       caseinit_destroy (ds->caseinit);
204       trns_chain_destroy (ds->permanent_trns_chain);
205       dataset_transformations_changed__ (ds, false);
206       free (ds);
207     }
208 }
209
210 /* Discards the active dataset's dictionary, data, and transformations. */
211 void
212 dataset_clear (struct dataset *ds)
213 {
214   assert (ds->proc_state == PROC_COMMITTED);
215
216   dict_clear (ds->dict);
217   fh_set_default_handle (NULL);
218
219   ds->n_lag = 0;
220
221   casereader_destroy (ds->source);
222   ds->source = NULL;
223
224   proc_cancel_all_transformations (ds);
225 }
226
227 const char *
228 dataset_name (const struct dataset *ds)
229 {
230   return ds->name;
231 }
232
233 void
234 dataset_set_name (struct dataset *ds, const char *name)
235 {
236   struct session *session = ds->session;
237   bool active = false;
238
239   if (session != NULL)
240     {
241       active = session_active_dataset (session) == ds;
242       if (active)
243         session_set_active_dataset (session, NULL);
244       dataset_set_session (ds, NULL);
245     }
246
247   free (ds->name);
248   ds->name = xstrdup (name);
249
250   if (session != NULL)
251     {
252       dataset_set_session (ds, session);
253       if (active)
254         session_set_active_dataset (session, ds);
255     }
256 }
257
258 struct session *
259 dataset_session (const struct dataset *ds)
260 {
261   return ds->session;
262 }
263
264 void
265 dataset_set_session (struct dataset *ds, struct session *session)
266 {
267   if (session != ds->session)
268     {
269       if (ds->session != NULL)
270         session_remove_dataset (ds->session, ds);
271       if (session != NULL)
272         session_add_dataset (session, ds);
273     }
274 }
275
276 /* Returns the dictionary within DS.  This is always nonnull, although it
277    might not contain any variables. */
278 struct dictionary *
279 dataset_dict (const struct dataset *ds)
280 {
281   return ds->dict;
282 }
283
284 /* Replaces DS's dictionary by DICT, discarding any source and
285    transformations. */
286 void
287 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
288 {
289   assert (ds->proc_state == PROC_COMMITTED);
290   assert (ds->dict != dict);
291
292   dataset_clear (ds);
293
294   dict_destroy (ds->dict);
295   ds->dict = dict;
296   dict_set_change_callback (ds->dict, dict_callback, ds);
297 }
298
299 /* Returns the casereader that will be read when a procedure is executed on
300    DS.  This can be NULL if none has been set up yet. */
301 const struct casereader *
302 dataset_source (const struct dataset *ds)
303 {
304   return ds->source;
305 }
306
307 /* Returns true if DS has a data source, false otherwise. */
308 bool
309 dataset_has_source (const struct dataset *ds)
310 {
311   return dataset_source (ds) != NULL;
312 }
313
314 /* Replaces the active dataset's data by READER.  READER's cases must have an
315    appropriate format for DS's dictionary. */
316 bool
317 dataset_set_source (struct dataset *ds, struct casereader *reader)
318 {
319   casereader_destroy (ds->source);
320   ds->source = reader;
321
322   caseinit_clear (ds->caseinit);
323   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
324
325   return reader == NULL || !casereader_error (reader);
326 }
327
328 /* Returns the data source from DS and removes it from DS.  Returns a null
329    pointer if DS has no data source. */
330 struct casereader *
331 dataset_steal_source (struct dataset *ds)
332 {
333   struct casereader *reader = ds->source;
334   ds->source = NULL;
335
336   return reader;
337 }
338
339 /* Returns a number unique to DS.  It can be used to distinguish one dataset
340    from any other within a given program run, even datasets that do not exist
341    at the same time. */
342 unsigned int
343 dataset_seqno (const struct dataset *ds)
344 {
345   return ds->seqno;
346 }
347
348 void
349 dataset_set_callbacks (struct dataset *ds,
350                        const struct dataset_callbacks *callbacks,
351                        void *cb_data)
352 {
353   ds->callbacks = callbacks;
354   ds->cb_data = cb_data;
355 }
356
357 enum dataset_display
358 dataset_get_display (const struct dataset *ds)
359 {
360   return ds->display;
361 }
362
363 void
364 dataset_set_display (struct dataset *ds, enum dataset_display display)
365 {
366   ds->display = display;
367 }
368 \f
369 /* Returns the last time the data was read. */
370 time_t
371 time_of_last_procedure (struct dataset *ds)
372 {
373   if (ds->last_proc_invocation == 0)
374     update_last_proc_invocation (ds);
375   return ds->last_proc_invocation;
376 }
377 \f
378 /* Regular procedure. */
379
380 /* Executes any pending transformations, if necessary.
381    This is not identical to the EXECUTE command in that it won't
382    always read the source data.  This can be important when the
383    source data is given inline within BEGIN DATA...END FILE. */
384 bool
385 proc_execute (struct dataset *ds)
386 {
387   bool ok;
388
389   if ((ds->temporary_trns_chain == NULL
390        || trns_chain_is_empty (ds->temporary_trns_chain))
391       && trns_chain_is_empty (ds->permanent_trns_chain))
392     {
393       ds->n_lag = 0;
394       ds->discard_output = false;
395       dict_set_case_limit (ds->dict, 0);
396       dict_clear_vectors (ds->dict);
397       return true;
398     }
399
400   ok = casereader_destroy (proc_open (ds));
401   return proc_commit (ds) && ok;
402 }
403
404 static const struct casereader_class proc_casereader_class;
405
406 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
407    cases filtered out with FILTER BY will not be included in the casereader
408    (which is usually desirable).  If FILTER is false, all cases will be
409    included regardless of FILTER BY settings.
410
411    proc_commit must be called when done. */
412 struct casereader *
413 proc_open_filtering (struct dataset *ds, bool filter)
414 {
415   struct casereader *reader;
416
417   assert (ds->source != NULL);
418   assert (ds->proc_state == PROC_COMMITTED);
419
420   update_last_proc_invocation (ds);
421
422   caseinit_mark_for_init (ds->caseinit, ds->dict);
423
424   /* Finish up the collection of transformations. */
425   add_case_limit_trns (ds);
426   if (filter)
427     add_filter_trns (ds);
428   trns_chain_finalize (ds->cur_trns_chain);
429
430   /* Make permanent_dict refer to the dictionary right before
431      data reaches the sink. */
432   if (ds->permanent_dict == NULL)
433     ds->permanent_dict = ds->dict;
434
435   /* Prepare sink. */
436   if (!ds->discard_output)
437     {
438       struct dictionary *pd = ds->permanent_dict;
439       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
440       if (compacted_value_cnt < dict_get_next_value_idx (pd))
441         {
442           struct caseproto *compacted_proto;
443           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
444           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
445           ds->sink = autopaging_writer_create (compacted_proto);
446           caseproto_unref (compacted_proto);
447         }
448       else
449         {
450           ds->compactor = NULL;
451           ds->sink = autopaging_writer_create (dict_get_proto (pd));
452         }
453     }
454   else
455     {
456       ds->compactor = NULL;
457       ds->sink = NULL;
458     }
459
460   /* Allocate memory for lagged cases. */
461   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
462
463   ds->proc_state = PROC_OPEN;
464   ds->cases_written = 0;
465   ds->ok = true;
466
467   /* FIXME: use taint in dataset in place of `ok'? */
468   /* FIXME: for trivial cases we can just return a clone of
469      ds->source? */
470
471   /* Create casereader and insert a shim on top.  The shim allows us to
472      arbitrarily extend the casereader's lifetime, by slurping the cases into
473      the shim's buffer in proc_commit().  That is especially useful when output
474      table_items are generated directly from the procedure casereader (e.g. by
475      the LIST procedure) when we are using an output driver that keeps a
476      reference to the output items passed to it (e.g. the GUI output driver in
477      PSPPIRE). */
478   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
479                                          CASENUMBER_MAX,
480                                          &proc_casereader_class, ds);
481   ds->shim = casereader_shim_insert (reader);
482   return reader;
483 }
484
485 /* Opens dataset DS for reading cases with proc_read.
486    proc_commit must be called when done. */
487 struct casereader *
488 proc_open (struct dataset *ds)
489 {
490   return proc_open_filtering (ds, true);
491 }
492
493 /* Returns true if a procedure is in progress, that is, if
494    proc_open has been called but proc_commit has not. */
495 bool
496 proc_is_open (const struct dataset *ds)
497 {
498   return ds->proc_state != PROC_COMMITTED;
499 }
500
501 /* "read" function for procedure casereader. */
502 static struct ccase *
503 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
504 {
505   struct dataset *ds = ds_;
506   enum trns_result retval = TRNS_DROP_CASE;
507   struct ccase *c;
508
509   assert (ds->proc_state == PROC_OPEN);
510   for (; ; case_unref (c))
511     {
512       casenumber case_nr;
513
514       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
515       if (retval == TRNS_ERROR)
516         ds->ok = false;
517       if (!ds->ok)
518         return NULL;
519
520       /* Read a case from source. */
521       c = casereader_read (ds->source);
522       if (c == NULL)
523         return NULL;
524       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
525       caseinit_init_vars (ds->caseinit, c);
526
527       /* Execute permanent transformations.  */
528       case_nr = ds->cases_written + 1;
529       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
530                                    &c, case_nr);
531       caseinit_update_left_vars (ds->caseinit, c);
532       if (retval != TRNS_CONTINUE)
533         continue;
534
535       /* Write case to collection of lagged cases. */
536       if (ds->n_lag > 0)
537         {
538           while (deque_count (&ds->lag) >= ds->n_lag)
539             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
540           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
541         }
542
543       /* Write case to replacement dataset. */
544       ds->cases_written++;
545       if (ds->sink != NULL)
546         casewriter_write (ds->sink,
547                           case_map_execute (ds->compactor, case_ref (c)));
548
549       /* Execute temporary transformations. */
550       if (ds->temporary_trns_chain != NULL)
551         {
552           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
553                                        &c, ds->cases_written);
554           if (retval != TRNS_CONTINUE)
555             continue;
556         }
557
558       return c;
559     }
560 }
561
562 /* "destroy" function for procedure casereader. */
563 static void
564 proc_casereader_destroy (struct casereader *reader, void *ds_)
565 {
566   struct dataset *ds = ds_;
567   struct ccase *c;
568
569   /* We are always the subreader for a casereader_buffer, so if we're being
570      destroyed then it's because the casereader_buffer has read all the cases
571      that it ever will. */
572   ds->shim = NULL;
573
574   /* Make sure transformations happen for every input case, in
575      case they have side effects, and ensure that the replacement
576      active dataset gets all the cases it should. */
577   while ((c = casereader_read (reader)) != NULL)
578     case_unref (c);
579
580   ds->proc_state = PROC_CLOSED;
581   ds->ok = casereader_destroy (ds->source) && ds->ok;
582   ds->source = NULL;
583   dataset_set_source (ds, NULL);
584 }
585
586 /* Must return false if the source casereader, a transformation,
587    or the sink casewriter signaled an error.  (If a temporary
588    transformation signals an error, then the return value is
589    false, but the replacement active dataset may still be
590    untainted.) */
591 bool
592 proc_commit (struct dataset *ds)
593 {
594   if (ds->shim != NULL)
595     casereader_shim_slurp (ds->shim);
596
597   assert (ds->proc_state == PROC_CLOSED);
598   ds->proc_state = PROC_COMMITTED;
599
600   dataset_changed__ (ds);
601
602   /* Free memory for lagged cases. */
603   while (!deque_is_empty (&ds->lag))
604     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
605   free (ds->lag_cases);
606
607   /* Dictionary from before TEMPORARY becomes permanent. */
608   proc_cancel_temporary_transformations (ds);
609
610   if (!ds->discard_output)
611     {
612       /* Finish compacting. */
613       if (ds->compactor != NULL)
614         {
615           case_map_destroy (ds->compactor);
616           ds->compactor = NULL;
617
618           dict_delete_scratch_vars (ds->dict);
619           dict_compact_values (ds->dict);
620         }
621
622       /* Old data sink becomes new data source. */
623       if (ds->sink != NULL)
624         ds->source = casewriter_make_reader (ds->sink);
625     }
626   else
627     {
628       ds->source = NULL;
629       ds->discard_output = false;
630     }
631   ds->sink = NULL;
632
633   caseinit_clear (ds->caseinit);
634   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
635
636   dict_clear_vectors (ds->dict);
637   ds->permanent_dict = NULL;
638   return proc_cancel_all_transformations (ds) && ds->ok;
639 }
640
641 /* Casereader class for procedure execution. */
642 static const struct casereader_class proc_casereader_class =
643   {
644     proc_casereader_read,
645     proc_casereader_destroy,
646     NULL,
647     NULL,
648   };
649
650 /* Updates last_proc_invocation. */
651 static void
652 update_last_proc_invocation (struct dataset *ds)
653 {
654   ds->last_proc_invocation = time (NULL);
655 }
656 \f
657 /* Returns a pointer to the lagged case from N_BEFORE cases before the
658    current one, or NULL if there haven't been that many cases yet. */
659 const struct ccase *
660 lagged_case (const struct dataset *ds, int n_before)
661 {
662   assert (n_before >= 1);
663   assert (n_before <= ds->n_lag);
664
665   if (n_before <= deque_count (&ds->lag))
666     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
667   else
668     return NULL;
669 }
670 \f
671 /* Returns the current set of permanent transformations,
672    and clears the permanent transformations.
673    For use by INPUT PROGRAM. */
674 struct trns_chain *
675 proc_capture_transformations (struct dataset *ds)
676 {
677   struct trns_chain *chain;
678
679   assert (ds->temporary_trns_chain == NULL);
680   chain = ds->permanent_trns_chain;
681   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
682   dataset_transformations_changed__ (ds, false);
683
684   return chain;
685 }
686
687 /* Adds a transformation that processes a case with PROC and
688    frees itself with FREE to the current set of transformations.
689    The functions are passed AUX as auxiliary data. */
690 void
691 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
692 {
693   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
694   dataset_transformations_changed__ (ds, true);
695 }
696
697 /* Adds a transformation that processes a case with PROC and
698    frees itself with FREE to the current set of transformations.
699    When parsing of the block of transformations is complete,
700    FINALIZE will be called.
701    The functions are passed AUX as auxiliary data. */
702 void
703 add_transformation_with_finalizer (struct dataset *ds,
704                                    trns_finalize_func *finalize,
705                                    trns_proc_func *proc,
706                                    trns_free_func *free, void *aux)
707 {
708   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
709   dataset_transformations_changed__ (ds, true);
710 }
711
712 /* Returns the index of the next transformation.
713    This value can be returned by a transformation procedure
714    function to indicate a "jump" to that transformation. */
715 size_t
716 next_transformation (const struct dataset *ds)
717 {
718   return trns_chain_next (ds->cur_trns_chain);
719 }
720
721 /* Returns true if the next call to add_transformation() will add
722    a temporary transformation, false if it will add a permanent
723    transformation. */
724 bool
725 proc_in_temporary_transformations (const struct dataset *ds)
726 {
727   return ds->temporary_trns_chain != NULL;
728 }
729
730 /* Marks the start of temporary transformations.
731    Further calls to add_transformation() will add temporary
732    transformations. */
733 void
734 proc_start_temporary_transformations (struct dataset *ds)
735 {
736   if (!proc_in_temporary_transformations (ds))
737     {
738       add_case_limit_trns (ds);
739
740       ds->permanent_dict = dict_clone (ds->dict);
741
742       trns_chain_finalize (ds->permanent_trns_chain);
743       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
744       dataset_transformations_changed__ (ds, true);
745     }
746 }
747
748 /* Converts all the temporary transformations, if any, to
749    permanent transformations.  Further transformations will be
750    permanent.
751    Returns true if anything changed, false otherwise. */
752 bool
753 proc_make_temporary_transformations_permanent (struct dataset *ds)
754 {
755   if (proc_in_temporary_transformations (ds))
756     {
757       trns_chain_finalize (ds->temporary_trns_chain);
758       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
759       ds->temporary_trns_chain = NULL;
760
761       dict_destroy (ds->permanent_dict);
762       ds->permanent_dict = NULL;
763
764       return true;
765     }
766   else
767     return false;
768 }
769
770 /* Cancels all temporary transformations, if any.  Further
771    transformations will be permanent.
772    Returns true if anything changed, false otherwise. */
773 bool
774 proc_cancel_temporary_transformations (struct dataset *ds)
775 {
776   if (proc_in_temporary_transformations (ds))
777     {
778       dict_destroy (ds->dict);
779       ds->dict = ds->permanent_dict;
780       ds->permanent_dict = NULL;
781
782       trns_chain_destroy (ds->temporary_trns_chain);
783       ds->temporary_trns_chain = NULL;
784       dataset_transformations_changed__ (
785         ds, !trns_chain_is_empty (ds->permanent_trns_chain));
786       return true;
787     }
788   else
789     return false;
790 }
791
792 /* Cancels all transformations, if any.
793    Returns true if successful, false on I/O error. */
794 bool
795 proc_cancel_all_transformations (struct dataset *ds)
796 {
797   bool ok;
798   assert (ds->proc_state == PROC_COMMITTED);
799   ok = trns_chain_destroy (ds->permanent_trns_chain);
800   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
801   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
802   ds->temporary_trns_chain = NULL;
803   dataset_transformations_changed__ (ds, false);
804
805   return ok;
806 }
807 \f
808 /* Causes output from the next procedure to be discarded, instead
809    of being preserved for use as input for the next procedure. */
810 void
811 proc_discard_output (struct dataset *ds)
812 {
813   ds->discard_output = true;
814 }
815
816
817 /* Checks whether DS has a corrupted active dataset.  If so,
818    discards it and returns false.  If not, returns true without
819    doing anything. */
820 bool
821 dataset_end_of_command (struct dataset *ds)
822 {
823   if (ds->source != NULL)
824     {
825       if (casereader_error (ds->source))
826         {
827           dataset_clear (ds);
828           return false;
829         }
830       else
831         {
832           const struct taint *taint = casereader_get_taint (ds->source);
833           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
834           assert (!taint_has_tainted_successor (taint));
835         }
836     }
837   return true;
838 }
839 \f
840 static trns_proc_func case_limit_trns_proc;
841 static trns_free_func case_limit_trns_free;
842
843 /* Adds a transformation that limits the number of cases that may
844    pass through, if DS->DICT has a case limit. */
845 static void
846 add_case_limit_trns (struct dataset *ds)
847 {
848   casenumber case_limit = dict_get_case_limit (ds->dict);
849   if (case_limit != 0)
850     {
851       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
852       *cases_remaining = case_limit;
853       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
854                           cases_remaining);
855       dict_set_case_limit (ds->dict, 0);
856     }
857 }
858
859 /* Limits the maximum number of cases processed to
860    *CASES_REMAINING. */
861 static int
862 case_limit_trns_proc (void *cases_remaining_,
863                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
864 {
865   size_t *cases_remaining = cases_remaining_;
866   if (*cases_remaining > 0)
867     {
868       (*cases_remaining)--;
869       return TRNS_CONTINUE;
870     }
871   else
872     return TRNS_DROP_CASE;
873 }
874
875 /* Frees the data associated with a case limit transformation. */
876 static bool
877 case_limit_trns_free (void *cases_remaining_)
878 {
879   size_t *cases_remaining = cases_remaining_;
880   free (cases_remaining);
881   return true;
882 }
883 \f
884 static trns_proc_func filter_trns_proc;
885
886 /* Adds a temporary transformation to filter data according to
887    the variable specified on FILTER, if any. */
888 static void
889 add_filter_trns (struct dataset *ds)
890 {
891   struct variable *filter_var = dict_get_filter (ds->dict);
892   if (filter_var != NULL)
893     {
894       proc_start_temporary_transformations (ds);
895       add_transformation (ds, filter_trns_proc, NULL, filter_var);
896     }
897 }
898
899 /* FILTER transformation. */
900 static int
901 filter_trns_proc (void *filter_var_,
902                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
903
904 {
905   struct variable *filter_var = filter_var_;
906   double f = case_num (*c, filter_var);
907   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
908           ? TRNS_CONTINUE : TRNS_DROP_CASE);
909 }
910
911
912 void
913 dataset_need_lag (struct dataset *ds, int n_before)
914 {
915   ds->n_lag = MAX (ds->n_lag, n_before);
916 }
917 \f
918 static void
919 dataset_changed__ (struct dataset *ds)
920 {
921   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
922     ds->callbacks->changed (ds->cb_data);
923 }
924
925 static void
926 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
927 {
928   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
929     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
930 }
931 \f
932 /* Private interface for use by session code. */
933
934 void
935 dataset_set_session__ (struct dataset *ds, struct session *session)
936 {
937   ds->session = session;
938 }