make-file: Remove superfluous fflush().
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/procedure.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/transformations.h"
36 #include "data/variable.h"
37 #include "libpspp/deque.h"
38 #include "libpspp/misc.h"
39 #include "libpspp/str.h"
40 #include "libpspp/taint.h"
41 #include "libpspp/i18n.h"
42
43 #include "gl/minmax.h"
44 #include "gl/xalloc.h"
45
46 struct dataset {
47   /* Cases are read from source,
48      their transformation variables are initialized,
49      pass through permanent_trns_chain (which transforms them into
50      the format described by permanent_dict),
51      are written to sink,
52      pass through temporary_trns_chain (which transforms them into
53      the format described by dict),
54      and are finally passed to the procedure. */
55   struct casereader *source;
56   struct caseinit *caseinit;
57   struct trns_chain *permanent_trns_chain;
58   struct dictionary *permanent_dict;
59   struct casewriter *sink;
60   struct trns_chain *temporary_trns_chain;
61   struct dictionary *dict;
62
63   /* Callback which occurs whenever the transformation chain(s) have
64      been modified */
65   transformation_change_callback_func *xform_callback;
66   void *xform_callback_aux;
67
68   /* If true, cases are discarded instead of being written to
69      sink. */
70   bool discard_output;
71
72   /* The transformation chain that the next transformation will be
73      added to. */
74   struct trns_chain *cur_trns_chain;
75
76   /* The case map used to compact a case, if necessary;
77      otherwise a null pointer. */
78   struct case_map *compactor;
79
80   /* Time at which proc was last invoked. */
81   time_t last_proc_invocation;
82
83   /* Cases just before ("lagging") the current one. */
84   int n_lag;                    /* Number of cases to lag. */
85   struct deque lag;             /* Deque of lagged cases. */
86   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
87
88   /* Procedure data. */
89   enum
90     {
91       PROC_COMMITTED,           /* No procedure in progress. */
92       PROC_OPEN,                /* proc_open called, casereader still open. */
93       PROC_CLOSED               /* casereader from proc_open destroyed,
94                                    but proc_commit not yet called. */
95     }
96   proc_state;
97   casenumber cases_written;     /* Cases output so far. */
98   bool ok;                      /* Error status. */
99   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
100
101   void (*callback) (void *); /* Callback for when the dataset changes */
102   void *cb_data;
103
104 }; /* struct dataset */
105
106
107 static void add_case_limit_trns (struct dataset *ds);
108 static void add_filter_trns (struct dataset *ds);
109
110 static void update_last_proc_invocation (struct dataset *ds);
111
112 static void
113 dataset_set_unsaved (const struct dataset *ds)
114 {
115   if (ds->callback) ds->callback (ds->cb_data);
116 }
117
118 \f
119 /* Public functions. */
120
121 void
122 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
123 {
124   ds->callback = cb;
125   ds->cb_data = cb_data;
126 }
127
128
129 /* Returns the last time the data was read. */
130 time_t
131 time_of_last_procedure (struct dataset *ds)
132 {
133   if (ds->last_proc_invocation == 0)
134     update_last_proc_invocation (ds);
135   return ds->last_proc_invocation;
136 }
137 \f
138 /* Regular procedure. */
139
140 /* Executes any pending transformations, if necessary.
141    This is not identical to the EXECUTE command in that it won't
142    always read the source data.  This can be important when the
143    source data is given inline within BEGIN DATA...END FILE. */
144 bool
145 proc_execute (struct dataset *ds)
146 {
147   bool ok;
148
149   if ((ds->temporary_trns_chain == NULL
150        || trns_chain_is_empty (ds->temporary_trns_chain))
151       && trns_chain_is_empty (ds->permanent_trns_chain))
152     {
153       ds->n_lag = 0;
154       ds->discard_output = false;
155       dict_set_case_limit (ds->dict, 0);
156       dict_clear_vectors (ds->dict);
157       return true;
158     }
159
160   ok = casereader_destroy (proc_open (ds));
161   return proc_commit (ds) && ok;
162 }
163
164 static const struct casereader_class proc_casereader_class;
165
166 /* Opens dataset DS for reading cases with proc_read.
167    proc_commit must be called when done. */
168 struct casereader *
169 proc_open (struct dataset *ds)
170 {
171   struct casereader *reader;
172
173   assert (ds->source != NULL);
174   assert (ds->proc_state == PROC_COMMITTED);
175
176   update_last_proc_invocation (ds);
177
178   caseinit_mark_for_init (ds->caseinit, ds->dict);
179
180   /* Finish up the collection of transformations. */
181   add_case_limit_trns (ds);
182   add_filter_trns (ds);
183   trns_chain_finalize (ds->cur_trns_chain);
184
185   /* Make permanent_dict refer to the dictionary right before
186      data reaches the sink. */
187   if (ds->permanent_dict == NULL)
188     ds->permanent_dict = ds->dict;
189
190   /* Prepare sink. */
191   if (!ds->discard_output)
192     {
193       struct dictionary *pd = ds->permanent_dict;
194       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
195       if (compacted_value_cnt < dict_get_next_value_idx (pd))
196         {
197           struct caseproto *compacted_proto;
198           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
199           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
200           ds->sink = autopaging_writer_create (compacted_proto);
201           caseproto_unref (compacted_proto);
202         }
203       else
204         {
205           ds->compactor = NULL;
206           ds->sink = autopaging_writer_create (dict_get_proto (pd));
207         }
208     }
209   else
210     {
211       ds->compactor = NULL;
212       ds->sink = NULL;
213     }
214
215   /* Allocate memory for lagged cases. */
216   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
217
218   ds->proc_state = PROC_OPEN;
219   ds->cases_written = 0;
220   ds->ok = true;
221
222   /* FIXME: use taint in dataset in place of `ok'? */
223   /* FIXME: for trivial cases we can just return a clone of
224      ds->source? */
225
226   /* Create casereader and insert a shim on top.  The shim allows us to
227      arbitrarily extend the casereader's lifetime, by slurping the cases into
228      the shim's buffer in proc_commit().  That is especially useful when output
229      table_items are generated directly from the procedure casereader (e.g. by
230      the LIST procedure) when we are using an output driver that keeps a
231      reference to the output items passed to it (e.g. the GUI output driver in
232      PSPPIRE). */
233   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
234                                          CASENUMBER_MAX,
235                                          &proc_casereader_class, ds);
236   ds->shim = casereader_shim_insert (reader);
237   return reader;
238 }
239
240 /* Returns true if a procedure is in progress, that is, if
241    proc_open has been called but proc_commit has not. */
242 bool
243 proc_is_open (const struct dataset *ds)
244 {
245   return ds->proc_state != PROC_COMMITTED;
246 }
247
248 /* "read" function for procedure casereader. */
249 static struct ccase *
250 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
251 {
252   struct dataset *ds = ds_;
253   enum trns_result retval = TRNS_DROP_CASE;
254   struct ccase *c;
255
256   assert (ds->proc_state == PROC_OPEN);
257   for (; ; case_unref (c))
258     {
259       casenumber case_nr;
260
261       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
262       if (retval == TRNS_ERROR)
263         ds->ok = false;
264       if (!ds->ok)
265         return NULL;
266
267       /* Read a case from source. */
268       c = casereader_read (ds->source);
269       if (c == NULL)
270         return NULL;
271       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
272       caseinit_init_vars (ds->caseinit, c);
273
274       /* Execute permanent transformations.  */
275       case_nr = ds->cases_written + 1;
276       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
277                                    &c, case_nr);
278       caseinit_update_left_vars (ds->caseinit, c);
279       if (retval != TRNS_CONTINUE)
280         continue;
281
282       /* Write case to collection of lagged cases. */
283       if (ds->n_lag > 0)
284         {
285           while (deque_count (&ds->lag) >= ds->n_lag)
286             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
287           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
288         }
289
290       /* Write case to replacement active file. */
291       ds->cases_written++;
292       if (ds->sink != NULL)
293         casewriter_write (ds->sink,
294                           case_map_execute (ds->compactor, case_ref (c)));
295
296       /* Execute temporary transformations. */
297       if (ds->temporary_trns_chain != NULL)
298         {
299           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
300                                        &c, ds->cases_written);
301           if (retval != TRNS_CONTINUE)
302             continue;
303         }
304
305       return c;
306     }
307 }
308
309 /* "destroy" function for procedure casereader. */
310 static void
311 proc_casereader_destroy (struct casereader *reader, void *ds_)
312 {
313   struct dataset *ds = ds_;
314   struct ccase *c;
315
316   /* We are always the subreader for a casereader_buffer, so if we're being
317      destroyed then it's because the casereader_buffer has read all the cases
318      that it ever will. */
319   ds->shim = NULL;
320
321   /* Make sure transformations happen for every input case, in
322      case they have side effects, and ensure that the replacement
323      active file gets all the cases it should. */
324   while ((c = casereader_read (reader)) != NULL)
325     case_unref (c);
326
327   ds->proc_state = PROC_CLOSED;
328   ds->ok = casereader_destroy (ds->source) && ds->ok;
329   ds->source = NULL;
330   proc_set_active_file_data (ds, NULL);
331 }
332
333 /* Must return false if the source casereader, a transformation,
334    or the sink casewriter signaled an error.  (If a temporary
335    transformation signals an error, then the return value is
336    false, but the replacement active file may still be
337    untainted.) */
338 bool
339 proc_commit (struct dataset *ds)
340 {
341   if (ds->shim != NULL)
342     casereader_shim_slurp (ds->shim);
343
344   assert (ds->proc_state == PROC_CLOSED);
345   ds->proc_state = PROC_COMMITTED;
346
347   dataset_set_unsaved (ds);
348
349   /* Free memory for lagged cases. */
350   while (!deque_is_empty (&ds->lag))
351     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
352   free (ds->lag_cases);
353
354   /* Dictionary from before TEMPORARY becomes permanent. */
355   proc_cancel_temporary_transformations (ds);
356
357   if (!ds->discard_output)
358     {
359       /* Finish compacting. */
360       if (ds->compactor != NULL)
361         {
362           case_map_destroy (ds->compactor);
363           ds->compactor = NULL;
364
365           dict_delete_scratch_vars (ds->dict);
366           dict_compact_values (ds->dict);
367         }
368
369       /* Old data sink becomes new data source. */
370       if (ds->sink != NULL)
371         ds->source = casewriter_make_reader (ds->sink);
372     }
373   else
374     {
375       ds->source = NULL;
376       ds->discard_output = false;
377     }
378   ds->sink = NULL;
379
380   caseinit_clear (ds->caseinit);
381   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
382
383   dict_clear_vectors (ds->dict);
384   ds->permanent_dict = NULL;
385   return proc_cancel_all_transformations (ds) && ds->ok;
386 }
387
388 /* Casereader class for procedure execution. */
389 static const struct casereader_class proc_casereader_class =
390   {
391     proc_casereader_read,
392     proc_casereader_destroy,
393     NULL,
394     NULL,
395   };
396
397 /* Updates last_proc_invocation. */
398 static void
399 update_last_proc_invocation (struct dataset *ds)
400 {
401   ds->last_proc_invocation = time (NULL);
402 }
403 \f
404 /* Returns a pointer to the lagged case from N_BEFORE cases before the
405    current one, or NULL if there haven't been that many cases yet. */
406 const struct ccase *
407 lagged_case (const struct dataset *ds, int n_before)
408 {
409   assert (n_before >= 1);
410   assert (n_before <= ds->n_lag);
411
412   if (n_before <= deque_count (&ds->lag))
413     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
414   else
415     return NULL;
416 }
417 \f
418 /* Returns the current set of permanent transformations,
419    and clears the permanent transformations.
420    For use by INPUT PROGRAM. */
421 struct trns_chain *
422 proc_capture_transformations (struct dataset *ds)
423 {
424   struct trns_chain *chain;
425
426   assert (ds->temporary_trns_chain == NULL);
427   chain = ds->permanent_trns_chain;
428   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
429
430   if ( ds->xform_callback)
431     ds->xform_callback (false, ds->xform_callback_aux);
432
433   return chain;
434 }
435
436 /* Adds a transformation that processes a case with PROC and
437    frees itself with FREE to the current set of transformations.
438    The functions are passed AUX as auxiliary data. */
439 void
440 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
441 {
442   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
443   if ( ds->xform_callback)
444     ds->xform_callback (true, ds->xform_callback_aux);
445 }
446
447 /* Adds a transformation that processes a case with PROC and
448    frees itself with FREE to the current set of transformations.
449    When parsing of the block of transformations is complete,
450    FINALIZE will be called.
451    The functions are passed AUX as auxiliary data. */
452 void
453 add_transformation_with_finalizer (struct dataset *ds,
454                                    trns_finalize_func *finalize,
455                                    trns_proc_func *proc,
456                                    trns_free_func *free, void *aux)
457 {
458   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
459
460   if ( ds->xform_callback)
461     ds->xform_callback (true, ds->xform_callback_aux);
462 }
463
464 /* Returns the index of the next transformation.
465    This value can be returned by a transformation procedure
466    function to indicate a "jump" to that transformation. */
467 size_t
468 next_transformation (const struct dataset *ds)
469 {
470   return trns_chain_next (ds->cur_trns_chain);
471 }
472
473 /* Returns true if the next call to add_transformation() will add
474    a temporary transformation, false if it will add a permanent
475    transformation. */
476 bool
477 proc_in_temporary_transformations (const struct dataset *ds)
478 {
479   return ds->temporary_trns_chain != NULL;
480 }
481
482 /* Marks the start of temporary transformations.
483    Further calls to add_transformation() will add temporary
484    transformations. */
485 void
486 proc_start_temporary_transformations (struct dataset *ds)
487 {
488   if (!proc_in_temporary_transformations (ds))
489     {
490       add_case_limit_trns (ds);
491
492       ds->permanent_dict = dict_clone (ds->dict);
493
494       trns_chain_finalize (ds->permanent_trns_chain);
495       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
496
497       if ( ds->xform_callback)
498         ds->xform_callback (true, ds->xform_callback_aux);
499     }
500 }
501
502 /* Converts all the temporary transformations, if any, to
503    permanent transformations.  Further transformations will be
504    permanent.
505    Returns true if anything changed, false otherwise. */
506 bool
507 proc_make_temporary_transformations_permanent (struct dataset *ds)
508 {
509   if (proc_in_temporary_transformations (ds))
510     {
511       trns_chain_finalize (ds->temporary_trns_chain);
512       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
513       ds->temporary_trns_chain = NULL;
514
515       dict_destroy (ds->permanent_dict);
516       ds->permanent_dict = NULL;
517
518       return true;
519     }
520   else
521     return false;
522 }
523
524 /* Cancels all temporary transformations, if any.  Further
525    transformations will be permanent.
526    Returns true if anything changed, false otherwise. */
527 bool
528 proc_cancel_temporary_transformations (struct dataset *ds)
529 {
530   if (proc_in_temporary_transformations (ds))
531     {
532       dict_destroy (ds->dict);
533       ds->dict = ds->permanent_dict;
534       ds->permanent_dict = NULL;
535
536       trns_chain_destroy (ds->temporary_trns_chain);
537       ds->temporary_trns_chain = NULL;
538
539       if ( ds->xform_callback)
540         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
541                             ds->xform_callback_aux);
542
543       return true;
544     }
545   else
546     return false;
547 }
548
549 /* Cancels all transformations, if any.
550    Returns true if successful, false on I/O error. */
551 bool
552 proc_cancel_all_transformations (struct dataset *ds)
553 {
554   bool ok;
555   assert (ds->proc_state == PROC_COMMITTED);
556   ok = trns_chain_destroy (ds->permanent_trns_chain);
557   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
558   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
559   ds->temporary_trns_chain = NULL;
560   if ( ds->xform_callback)
561     ds->xform_callback (false, ds->xform_callback_aux);
562
563   return ok;
564 }
565 \f
566
567 static void
568 dict_callback (struct dictionary *d UNUSED, void *ds_)
569 {
570   struct dataset *ds = ds_;
571   dataset_set_unsaved (ds);
572 }
573
574 /* Initializes procedure handling. */
575 struct dataset *
576 create_dataset (void)
577 {
578   struct dataset *ds = xzalloc (sizeof(*ds));
579   ds->dict = dict_create ();
580
581   dict_set_change_callback (ds->dict, dict_callback, ds);
582
583   dict_set_encoding (ds->dict, get_default_encoding ());
584
585   ds->caseinit = caseinit_create ();
586   proc_cancel_all_transformations (ds);
587   return ds;
588 }
589
590
591 void
592 dataset_add_transform_change_callback (struct dataset *ds,
593                                        transformation_change_callback_func *cb,
594                                        void *aux)
595 {
596   ds->xform_callback = cb;
597   ds->xform_callback_aux = aux;
598 }
599
600 /* Finishes up procedure handling. */
601 void
602 destroy_dataset (struct dataset *ds)
603 {
604   proc_discard_active_file (ds);
605   dict_destroy (ds->dict);
606   caseinit_destroy (ds->caseinit);
607   trns_chain_destroy (ds->permanent_trns_chain);
608
609   if ( ds->xform_callback)
610     ds->xform_callback (false, ds->xform_callback_aux);
611   free (ds);
612 }
613
614 /* Causes output from the next procedure to be discarded, instead
615    of being preserved for use as input for the next procedure. */
616 void
617 proc_discard_output (struct dataset *ds)
618 {
619   ds->discard_output = true;
620 }
621
622 /* Discards the active file dictionary, data, and
623    transformations. */
624 void
625 proc_discard_active_file (struct dataset *ds)
626 {
627   assert (ds->proc_state == PROC_COMMITTED);
628
629   dict_clear (ds->dict);
630   fh_set_default_handle (NULL);
631
632   ds->n_lag = 0;
633
634   casereader_destroy (ds->source);
635   ds->source = NULL;
636
637   proc_cancel_all_transformations (ds);
638 }
639
640 /* Sets SOURCE as the source for procedure input for the next
641    procedure. */
642 void
643 proc_set_active_file (struct dataset *ds,
644                       struct casereader *source,
645                       struct dictionary *dict)
646 {
647   assert (ds->proc_state == PROC_COMMITTED);
648   assert (ds->dict != dict);
649
650   proc_discard_active_file (ds);
651
652   dict_destroy (ds->dict);
653   ds->dict = dict;
654   dict_set_change_callback (ds->dict, dict_callback, ds);
655
656   proc_set_active_file_data (ds, source);
657 }
658
659 /* Replaces the active file's data by READER without replacing
660    the associated dictionary. */
661 bool
662 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
663 {
664   casereader_destroy (ds->source);
665   ds->source = reader;
666
667   caseinit_clear (ds->caseinit);
668   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
669
670   return reader == NULL || !casereader_error (reader);
671 }
672
673 /* Returns true if an active file data source is available, false
674    otherwise. */
675 bool
676 proc_has_active_file (const struct dataset *ds)
677 {
678   return ds->source != NULL;
679 }
680
681 /* Returns the active file data source from DS, or a null pointer
682    if DS has no data source, and removes it from DS. */
683 struct casereader *
684 proc_extract_active_file_data (struct dataset *ds)
685 {
686   struct casereader *reader = ds->source;
687   ds->source = NULL;
688
689   return reader;
690 }
691
692 /* Checks whether DS has a corrupted active file.  If so,
693    discards it and returns false.  If not, returns true without
694    doing anything. */
695 bool
696 dataset_end_of_command (struct dataset *ds)
697 {
698   if (ds->source != NULL)
699     {
700       if (casereader_error (ds->source))
701         {
702           proc_discard_active_file (ds);
703           return false;
704         }
705       else
706         {
707           const struct taint *taint = casereader_get_taint (ds->source);
708           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
709           assert (!taint_has_tainted_successor (taint));
710         }
711     }
712   return true;
713 }
714 \f
715 static trns_proc_func case_limit_trns_proc;
716 static trns_free_func case_limit_trns_free;
717
718 /* Adds a transformation that limits the number of cases that may
719    pass through, if DS->DICT has a case limit. */
720 static void
721 add_case_limit_trns (struct dataset *ds)
722 {
723   casenumber case_limit = dict_get_case_limit (ds->dict);
724   if (case_limit != 0)
725     {
726       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
727       *cases_remaining = case_limit;
728       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
729                           cases_remaining);
730       dict_set_case_limit (ds->dict, 0);
731     }
732 }
733
734 /* Limits the maximum number of cases processed to
735    *CASES_REMAINING. */
736 static int
737 case_limit_trns_proc (void *cases_remaining_,
738                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
739 {
740   size_t *cases_remaining = cases_remaining_;
741   if (*cases_remaining > 0)
742     {
743       (*cases_remaining)--;
744       return TRNS_CONTINUE;
745     }
746   else
747     return TRNS_DROP_CASE;
748 }
749
750 /* Frees the data associated with a case limit transformation. */
751 static bool
752 case_limit_trns_free (void *cases_remaining_)
753 {
754   size_t *cases_remaining = cases_remaining_;
755   free (cases_remaining);
756   return true;
757 }
758 \f
759 static trns_proc_func filter_trns_proc;
760
761 /* Adds a temporary transformation to filter data according to
762    the variable specified on FILTER, if any. */
763 static void
764 add_filter_trns (struct dataset *ds)
765 {
766   struct variable *filter_var = dict_get_filter (ds->dict);
767   if (filter_var != NULL)
768     {
769       proc_start_temporary_transformations (ds);
770       add_transformation (ds, filter_trns_proc, NULL, filter_var);
771     }
772 }
773
774 /* FILTER transformation. */
775 static int
776 filter_trns_proc (void *filter_var_,
777                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
778
779 {
780   struct variable *filter_var = filter_var_;
781   double f = case_num (*c, filter_var);
782   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
783           ? TRNS_CONTINUE : TRNS_DROP_CASE);
784 }
785
786
787 struct dictionary *
788 dataset_dict (const struct dataset *ds)
789 {
790   return ds->dict;
791 }
792
793 const struct casereader *
794 dataset_source (const struct dataset *ds)
795 {
796   return ds->source;
797 }
798
799 void
800 dataset_need_lag (struct dataset *ds, int n_before)
801 {
802   ds->n_lag = MAX (ds->n_lag, n_before);
803 }