fa52806f60f6795154234af340b8226f95f5172d
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/procedure.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/transformations.h"
36 #include "data/variable.h"
37 #include "libpspp/deque.h"
38 #include "libpspp/misc.h"
39 #include "libpspp/str.h"
40 #include "libpspp/taint.h"
41 #include "libpspp/i18n.h"
42
43 #include "gl/minmax.h"
44 #include "gl/xalloc.h"
45
46 struct dataset {
47   /* Cases are read from source,
48      their transformation variables are initialized,
49      pass through permanent_trns_chain (which transforms them into
50      the format described by permanent_dict),
51      are written to sink,
52      pass through temporary_trns_chain (which transforms them into
53      the format described by dict),
54      and are finally passed to the procedure. */
55   struct casereader *source;
56   struct caseinit *caseinit;
57   struct trns_chain *permanent_trns_chain;
58   struct dictionary *permanent_dict;
59   struct casewriter *sink;
60   struct trns_chain *temporary_trns_chain;
61   struct dictionary *dict;
62
63   /* Callback which occurs whenever the transformation chain(s) have
64      been modified */
65   transformation_change_callback_func *xform_callback;
66   void *xform_callback_aux;
67
68   /* If true, cases are discarded instead of being written to
69      sink. */
70   bool discard_output;
71
72   /* The transformation chain that the next transformation will be
73      added to. */
74   struct trns_chain *cur_trns_chain;
75
76   /* The case map used to compact a case, if necessary;
77      otherwise a null pointer. */
78   struct case_map *compactor;
79
80   /* Time at which proc was last invoked. */
81   time_t last_proc_invocation;
82
83   /* Cases just before ("lagging") the current one. */
84   int n_lag;                    /* Number of cases to lag. */
85   struct deque lag;             /* Deque of lagged cases. */
86   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
87
88   /* Procedure data. */
89   enum
90     {
91       PROC_COMMITTED,           /* No procedure in progress. */
92       PROC_OPEN,                /* proc_open called, casereader still open. */
93       PROC_CLOSED               /* casereader from proc_open destroyed,
94                                    but proc_commit not yet called. */
95     }
96   proc_state;
97   casenumber cases_written;     /* Cases output so far. */
98   bool ok;                      /* Error status. */
99   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
100
101   void (*callback) (void *); /* Callback for when the dataset changes */
102   void *cb_data;
103
104 }; /* struct dataset */
105
106
107 static void add_case_limit_trns (struct dataset *ds);
108 static void add_filter_trns (struct dataset *ds);
109
110 static void update_last_proc_invocation (struct dataset *ds);
111
112 static void
113 dataset_set_unsaved (const struct dataset *ds)
114 {
115   if (ds->callback) ds->callback (ds->cb_data);
116 }
117
118 \f
119 /* Public functions. */
120
121 void
122 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
123 {
124   ds->callback = cb;
125   ds->cb_data = cb_data;
126 }
127
128
129 /* Returns the last time the data was read. */
130 time_t
131 time_of_last_procedure (struct dataset *ds)
132 {
133   if (ds->last_proc_invocation == 0)
134     update_last_proc_invocation (ds);
135   return ds->last_proc_invocation;
136 }
137 \f
138 /* Regular procedure. */
139
140 /* Executes any pending transformations, if necessary.
141    This is not identical to the EXECUTE command in that it won't
142    always read the source data.  This can be important when the
143    source data is given inline within BEGIN DATA...END FILE. */
144 bool
145 proc_execute (struct dataset *ds)
146 {
147   bool ok;
148
149   if ((ds->temporary_trns_chain == NULL
150        || trns_chain_is_empty (ds->temporary_trns_chain))
151       && trns_chain_is_empty (ds->permanent_trns_chain))
152     {
153       ds->n_lag = 0;
154       ds->discard_output = false;
155       dict_set_case_limit (ds->dict, 0);
156       dict_clear_vectors (ds->dict);
157       return true;
158     }
159
160   ok = casereader_destroy (proc_open (ds));
161   return proc_commit (ds) && ok;
162 }
163
164 static const struct casereader_class proc_casereader_class;
165
166 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
167    cases filtered out with FILTER BY will not be included in the casereader
168    (which is usually desirable).  If FILTER is false, all cases will be
169    included regardless of FILTER BY settings.
170
171    proc_commit must be called when done. */
172 struct casereader *
173 proc_open_filtering (struct dataset *ds, bool filter)
174 {
175   struct casereader *reader;
176
177   assert (ds->source != NULL);
178   assert (ds->proc_state == PROC_COMMITTED);
179
180   update_last_proc_invocation (ds);
181
182   caseinit_mark_for_init (ds->caseinit, ds->dict);
183
184   /* Finish up the collection of transformations. */
185   add_case_limit_trns (ds);
186   if (filter)
187     add_filter_trns (ds);
188   trns_chain_finalize (ds->cur_trns_chain);
189
190   /* Make permanent_dict refer to the dictionary right before
191      data reaches the sink. */
192   if (ds->permanent_dict == NULL)
193     ds->permanent_dict = ds->dict;
194
195   /* Prepare sink. */
196   if (!ds->discard_output)
197     {
198       struct dictionary *pd = ds->permanent_dict;
199       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
200       if (compacted_value_cnt < dict_get_next_value_idx (pd))
201         {
202           struct caseproto *compacted_proto;
203           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
204           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
205           ds->sink = autopaging_writer_create (compacted_proto);
206           caseproto_unref (compacted_proto);
207         }
208       else
209         {
210           ds->compactor = NULL;
211           ds->sink = autopaging_writer_create (dict_get_proto (pd));
212         }
213     }
214   else
215     {
216       ds->compactor = NULL;
217       ds->sink = NULL;
218     }
219
220   /* Allocate memory for lagged cases. */
221   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
222
223   ds->proc_state = PROC_OPEN;
224   ds->cases_written = 0;
225   ds->ok = true;
226
227   /* FIXME: use taint in dataset in place of `ok'? */
228   /* FIXME: for trivial cases we can just return a clone of
229      ds->source? */
230
231   /* Create casereader and insert a shim on top.  The shim allows us to
232      arbitrarily extend the casereader's lifetime, by slurping the cases into
233      the shim's buffer in proc_commit().  That is especially useful when output
234      table_items are generated directly from the procedure casereader (e.g. by
235      the LIST procedure) when we are using an output driver that keeps a
236      reference to the output items passed to it (e.g. the GUI output driver in
237      PSPPIRE). */
238   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
239                                          CASENUMBER_MAX,
240                                          &proc_casereader_class, ds);
241   ds->shim = casereader_shim_insert (reader);
242   return reader;
243 }
244
245 /* Opens dataset DS for reading cases with proc_read.
246    proc_commit must be called when done. */
247 struct casereader *
248 proc_open (struct dataset *ds)
249 {
250   return proc_open_filtering (ds, true);
251 }
252
253 /* Returns true if a procedure is in progress, that is, if
254    proc_open has been called but proc_commit has not. */
255 bool
256 proc_is_open (const struct dataset *ds)
257 {
258   return ds->proc_state != PROC_COMMITTED;
259 }
260
261 /* "read" function for procedure casereader. */
262 static struct ccase *
263 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
264 {
265   struct dataset *ds = ds_;
266   enum trns_result retval = TRNS_DROP_CASE;
267   struct ccase *c;
268
269   assert (ds->proc_state == PROC_OPEN);
270   for (; ; case_unref (c))
271     {
272       casenumber case_nr;
273
274       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
275       if (retval == TRNS_ERROR)
276         ds->ok = false;
277       if (!ds->ok)
278         return NULL;
279
280       /* Read a case from source. */
281       c = casereader_read (ds->source);
282       if (c == NULL)
283         return NULL;
284       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
285       caseinit_init_vars (ds->caseinit, c);
286
287       /* Execute permanent transformations.  */
288       case_nr = ds->cases_written + 1;
289       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
290                                    &c, case_nr);
291       caseinit_update_left_vars (ds->caseinit, c);
292       if (retval != TRNS_CONTINUE)
293         continue;
294
295       /* Write case to collection of lagged cases. */
296       if (ds->n_lag > 0)
297         {
298           while (deque_count (&ds->lag) >= ds->n_lag)
299             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
300           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
301         }
302
303       /* Write case to replacement active file. */
304       ds->cases_written++;
305       if (ds->sink != NULL)
306         casewriter_write (ds->sink,
307                           case_map_execute (ds->compactor, case_ref (c)));
308
309       /* Execute temporary transformations. */
310       if (ds->temporary_trns_chain != NULL)
311         {
312           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
313                                        &c, ds->cases_written);
314           if (retval != TRNS_CONTINUE)
315             continue;
316         }
317
318       return c;
319     }
320 }
321
322 /* "destroy" function for procedure casereader. */
323 static void
324 proc_casereader_destroy (struct casereader *reader, void *ds_)
325 {
326   struct dataset *ds = ds_;
327   struct ccase *c;
328
329   /* We are always the subreader for a casereader_buffer, so if we're being
330      destroyed then it's because the casereader_buffer has read all the cases
331      that it ever will. */
332   ds->shim = NULL;
333
334   /* Make sure transformations happen for every input case, in
335      case they have side effects, and ensure that the replacement
336      active file gets all the cases it should. */
337   while ((c = casereader_read (reader)) != NULL)
338     case_unref (c);
339
340   ds->proc_state = PROC_CLOSED;
341   ds->ok = casereader_destroy (ds->source) && ds->ok;
342   ds->source = NULL;
343   proc_set_active_file_data (ds, NULL);
344 }
345
346 /* Must return false if the source casereader, a transformation,
347    or the sink casewriter signaled an error.  (If a temporary
348    transformation signals an error, then the return value is
349    false, but the replacement active file may still be
350    untainted.) */
351 bool
352 proc_commit (struct dataset *ds)
353 {
354   if (ds->shim != NULL)
355     casereader_shim_slurp (ds->shim);
356
357   assert (ds->proc_state == PROC_CLOSED);
358   ds->proc_state = PROC_COMMITTED;
359
360   dataset_set_unsaved (ds);
361
362   /* Free memory for lagged cases. */
363   while (!deque_is_empty (&ds->lag))
364     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
365   free (ds->lag_cases);
366
367   /* Dictionary from before TEMPORARY becomes permanent. */
368   proc_cancel_temporary_transformations (ds);
369
370   if (!ds->discard_output)
371     {
372       /* Finish compacting. */
373       if (ds->compactor != NULL)
374         {
375           case_map_destroy (ds->compactor);
376           ds->compactor = NULL;
377
378           dict_delete_scratch_vars (ds->dict);
379           dict_compact_values (ds->dict);
380         }
381
382       /* Old data sink becomes new data source. */
383       if (ds->sink != NULL)
384         ds->source = casewriter_make_reader (ds->sink);
385     }
386   else
387     {
388       ds->source = NULL;
389       ds->discard_output = false;
390     }
391   ds->sink = NULL;
392
393   caseinit_clear (ds->caseinit);
394   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
395
396   dict_clear_vectors (ds->dict);
397   ds->permanent_dict = NULL;
398   return proc_cancel_all_transformations (ds) && ds->ok;
399 }
400
401 /* Casereader class for procedure execution. */
402 static const struct casereader_class proc_casereader_class =
403   {
404     proc_casereader_read,
405     proc_casereader_destroy,
406     NULL,
407     NULL,
408   };
409
410 /* Updates last_proc_invocation. */
411 static void
412 update_last_proc_invocation (struct dataset *ds)
413 {
414   ds->last_proc_invocation = time (NULL);
415 }
416 \f
417 /* Returns a pointer to the lagged case from N_BEFORE cases before the
418    current one, or NULL if there haven't been that many cases yet. */
419 const struct ccase *
420 lagged_case (const struct dataset *ds, int n_before)
421 {
422   assert (n_before >= 1);
423   assert (n_before <= ds->n_lag);
424
425   if (n_before <= deque_count (&ds->lag))
426     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
427   else
428     return NULL;
429 }
430 \f
431 /* Returns the current set of permanent transformations,
432    and clears the permanent transformations.
433    For use by INPUT PROGRAM. */
434 struct trns_chain *
435 proc_capture_transformations (struct dataset *ds)
436 {
437   struct trns_chain *chain;
438
439   assert (ds->temporary_trns_chain == NULL);
440   chain = ds->permanent_trns_chain;
441   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
442
443   if ( ds->xform_callback)
444     ds->xform_callback (false, ds->xform_callback_aux);
445
446   return chain;
447 }
448
449 /* Adds a transformation that processes a case with PROC and
450    frees itself with FREE to the current set of transformations.
451    The functions are passed AUX as auxiliary data. */
452 void
453 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
454 {
455   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
456   if ( ds->xform_callback)
457     ds->xform_callback (true, ds->xform_callback_aux);
458 }
459
460 /* Adds a transformation that processes a case with PROC and
461    frees itself with FREE to the current set of transformations.
462    When parsing of the block of transformations is complete,
463    FINALIZE will be called.
464    The functions are passed AUX as auxiliary data. */
465 void
466 add_transformation_with_finalizer (struct dataset *ds,
467                                    trns_finalize_func *finalize,
468                                    trns_proc_func *proc,
469                                    trns_free_func *free, void *aux)
470 {
471   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
472
473   if ( ds->xform_callback)
474     ds->xform_callback (true, ds->xform_callback_aux);
475 }
476
477 /* Returns the index of the next transformation.
478    This value can be returned by a transformation procedure
479    function to indicate a "jump" to that transformation. */
480 size_t
481 next_transformation (const struct dataset *ds)
482 {
483   return trns_chain_next (ds->cur_trns_chain);
484 }
485
486 /* Returns true if the next call to add_transformation() will add
487    a temporary transformation, false if it will add a permanent
488    transformation. */
489 bool
490 proc_in_temporary_transformations (const struct dataset *ds)
491 {
492   return ds->temporary_trns_chain != NULL;
493 }
494
495 /* Marks the start of temporary transformations.
496    Further calls to add_transformation() will add temporary
497    transformations. */
498 void
499 proc_start_temporary_transformations (struct dataset *ds)
500 {
501   if (!proc_in_temporary_transformations (ds))
502     {
503       add_case_limit_trns (ds);
504
505       ds->permanent_dict = dict_clone (ds->dict);
506
507       trns_chain_finalize (ds->permanent_trns_chain);
508       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
509
510       if ( ds->xform_callback)
511         ds->xform_callback (true, ds->xform_callback_aux);
512     }
513 }
514
515 /* Converts all the temporary transformations, if any, to
516    permanent transformations.  Further transformations will be
517    permanent.
518    Returns true if anything changed, false otherwise. */
519 bool
520 proc_make_temporary_transformations_permanent (struct dataset *ds)
521 {
522   if (proc_in_temporary_transformations (ds))
523     {
524       trns_chain_finalize (ds->temporary_trns_chain);
525       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
526       ds->temporary_trns_chain = NULL;
527
528       dict_destroy (ds->permanent_dict);
529       ds->permanent_dict = NULL;
530
531       return true;
532     }
533   else
534     return false;
535 }
536
537 /* Cancels all temporary transformations, if any.  Further
538    transformations will be permanent.
539    Returns true if anything changed, false otherwise. */
540 bool
541 proc_cancel_temporary_transformations (struct dataset *ds)
542 {
543   if (proc_in_temporary_transformations (ds))
544     {
545       dict_destroy (ds->dict);
546       ds->dict = ds->permanent_dict;
547       ds->permanent_dict = NULL;
548
549       trns_chain_destroy (ds->temporary_trns_chain);
550       ds->temporary_trns_chain = NULL;
551
552       if ( ds->xform_callback)
553         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
554                             ds->xform_callback_aux);
555
556       return true;
557     }
558   else
559     return false;
560 }
561
562 /* Cancels all transformations, if any.
563    Returns true if successful, false on I/O error. */
564 bool
565 proc_cancel_all_transformations (struct dataset *ds)
566 {
567   bool ok;
568   assert (ds->proc_state == PROC_COMMITTED);
569   ok = trns_chain_destroy (ds->permanent_trns_chain);
570   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
571   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
572   ds->temporary_trns_chain = NULL;
573   if ( ds->xform_callback)
574     ds->xform_callback (false, ds->xform_callback_aux);
575
576   return ok;
577 }
578 \f
579
580 static void
581 dict_callback (struct dictionary *d UNUSED, void *ds_)
582 {
583   struct dataset *ds = ds_;
584   dataset_set_unsaved (ds);
585 }
586
587 /* Initializes procedure handling. */
588 struct dataset *
589 create_dataset (void)
590 {
591   struct dataset *ds = xzalloc (sizeof(*ds));
592   ds->dict = dict_create ();
593
594   dict_set_change_callback (ds->dict, dict_callback, ds);
595
596   dict_set_encoding (ds->dict, get_default_encoding ());
597
598   ds->caseinit = caseinit_create ();
599   proc_cancel_all_transformations (ds);
600   return ds;
601 }
602
603
604 void
605 dataset_add_transform_change_callback (struct dataset *ds,
606                                        transformation_change_callback_func *cb,
607                                        void *aux)
608 {
609   ds->xform_callback = cb;
610   ds->xform_callback_aux = aux;
611 }
612
613 /* Finishes up procedure handling. */
614 void
615 destroy_dataset (struct dataset *ds)
616 {
617   proc_discard_active_file (ds);
618   dict_destroy (ds->dict);
619   caseinit_destroy (ds->caseinit);
620   trns_chain_destroy (ds->permanent_trns_chain);
621
622   if ( ds->xform_callback)
623     ds->xform_callback (false, ds->xform_callback_aux);
624   free (ds);
625 }
626
627 /* Causes output from the next procedure to be discarded, instead
628    of being preserved for use as input for the next procedure. */
629 void
630 proc_discard_output (struct dataset *ds)
631 {
632   ds->discard_output = true;
633 }
634
635 /* Discards the active file dictionary, data, and
636    transformations. */
637 void
638 proc_discard_active_file (struct dataset *ds)
639 {
640   assert (ds->proc_state == PROC_COMMITTED);
641
642   dict_clear (ds->dict);
643   fh_set_default_handle (NULL);
644
645   ds->n_lag = 0;
646
647   casereader_destroy (ds->source);
648   ds->source = NULL;
649
650   proc_cancel_all_transformations (ds);
651 }
652
653 /* Sets SOURCE as the source for procedure input for the next
654    procedure. */
655 void
656 proc_set_active_file (struct dataset *ds,
657                       struct casereader *source,
658                       struct dictionary *dict)
659 {
660   assert (ds->proc_state == PROC_COMMITTED);
661   assert (ds->dict != dict);
662
663   proc_discard_active_file (ds);
664
665   dict_destroy (ds->dict);
666   ds->dict = dict;
667   dict_set_change_callback (ds->dict, dict_callback, ds);
668
669   proc_set_active_file_data (ds, source);
670 }
671
672 /* Replaces the active file's data by READER without replacing
673    the associated dictionary. */
674 bool
675 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
676 {
677   casereader_destroy (ds->source);
678   ds->source = reader;
679
680   caseinit_clear (ds->caseinit);
681   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
682
683   return reader == NULL || !casereader_error (reader);
684 }
685
686 /* Returns true if an active file data source is available, false
687    otherwise. */
688 bool
689 proc_has_active_file (const struct dataset *ds)
690 {
691   return ds->source != NULL;
692 }
693
694 /* Returns the active file data source from DS, or a null pointer
695    if DS has no data source, and removes it from DS. */
696 struct casereader *
697 proc_extract_active_file_data (struct dataset *ds)
698 {
699   struct casereader *reader = ds->source;
700   ds->source = NULL;
701
702   return reader;
703 }
704
705 /* Checks whether DS has a corrupted active file.  If so,
706    discards it and returns false.  If not, returns true without
707    doing anything. */
708 bool
709 dataset_end_of_command (struct dataset *ds)
710 {
711   if (ds->source != NULL)
712     {
713       if (casereader_error (ds->source))
714         {
715           proc_discard_active_file (ds);
716           return false;
717         }
718       else
719         {
720           const struct taint *taint = casereader_get_taint (ds->source);
721           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
722           assert (!taint_has_tainted_successor (taint));
723         }
724     }
725   return true;
726 }
727 \f
728 static trns_proc_func case_limit_trns_proc;
729 static trns_free_func case_limit_trns_free;
730
731 /* Adds a transformation that limits the number of cases that may
732    pass through, if DS->DICT has a case limit. */
733 static void
734 add_case_limit_trns (struct dataset *ds)
735 {
736   casenumber case_limit = dict_get_case_limit (ds->dict);
737   if (case_limit != 0)
738     {
739       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
740       *cases_remaining = case_limit;
741       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
742                           cases_remaining);
743       dict_set_case_limit (ds->dict, 0);
744     }
745 }
746
747 /* Limits the maximum number of cases processed to
748    *CASES_REMAINING. */
749 static int
750 case_limit_trns_proc (void *cases_remaining_,
751                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
752 {
753   size_t *cases_remaining = cases_remaining_;
754   if (*cases_remaining > 0)
755     {
756       (*cases_remaining)--;
757       return TRNS_CONTINUE;
758     }
759   else
760     return TRNS_DROP_CASE;
761 }
762
763 /* Frees the data associated with a case limit transformation. */
764 static bool
765 case_limit_trns_free (void *cases_remaining_)
766 {
767   size_t *cases_remaining = cases_remaining_;
768   free (cases_remaining);
769   return true;
770 }
771 \f
772 static trns_proc_func filter_trns_proc;
773
774 /* Adds a temporary transformation to filter data according to
775    the variable specified on FILTER, if any. */
776 static void
777 add_filter_trns (struct dataset *ds)
778 {
779   struct variable *filter_var = dict_get_filter (ds->dict);
780   if (filter_var != NULL)
781     {
782       proc_start_temporary_transformations (ds);
783       add_transformation (ds, filter_trns_proc, NULL, filter_var);
784     }
785 }
786
787 /* FILTER transformation. */
788 static int
789 filter_trns_proc (void *filter_var_,
790                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
791
792 {
793   struct variable *filter_var = filter_var_;
794   double f = case_num (*c, filter_var);
795   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
796           ? TRNS_CONTINUE : TRNS_DROP_CASE);
797 }
798
799
800 struct dictionary *
801 dataset_dict (const struct dataset *ds)
802 {
803   return ds->dict;
804 }
805
806 const struct casereader *
807 dataset_source (const struct dataset *ds)
808 {
809   return ds->source;
810 }
811
812 void
813 dataset_need_lag (struct dataset *ds, int n_before)
814 {
815   ds->n_lag = MAX (ds->n_lag, n_before);
816 }