dataset: Use similar form to dictionary code for callbacks, and document.
[pspp-builds.git] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/transformations.h"
36 #include "data/variable.h"
37 #include "libpspp/deque.h"
38 #include "libpspp/misc.h"
39 #include "libpspp/str.h"
40 #include "libpspp/taint.h"
41 #include "libpspp/i18n.h"
42
43 #include "gl/minmax.h"
44 #include "gl/xalloc.h"
45
46 struct dataset {
47   /* Cases are read from source,
48      their transformation variables are initialized,
49      pass through permanent_trns_chain (which transforms them into
50      the format described by permanent_dict),
51      are written to sink,
52      pass through temporary_trns_chain (which transforms them into
53      the format described by dict),
54      and are finally passed to the procedure. */
55   struct casereader *source;
56   struct caseinit *caseinit;
57   struct trns_chain *permanent_trns_chain;
58   struct dictionary *permanent_dict;
59   struct casewriter *sink;
60   struct trns_chain *temporary_trns_chain;
61   struct dictionary *dict;
62
63   /* If true, cases are discarded instead of being written to
64      sink. */
65   bool discard_output;
66
67   /* The transformation chain that the next transformation will be
68      added to. */
69   struct trns_chain *cur_trns_chain;
70
71   /* The case map used to compact a case, if necessary;
72      otherwise a null pointer. */
73   struct case_map *compactor;
74
75   /* Time at which proc was last invoked. */
76   time_t last_proc_invocation;
77
78   /* Cases just before ("lagging") the current one. */
79   int n_lag;                    /* Number of cases to lag. */
80   struct deque lag;             /* Deque of lagged cases. */
81   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
82
83   /* Procedure data. */
84   enum
85     {
86       PROC_COMMITTED,           /* No procedure in progress. */
87       PROC_OPEN,                /* proc_open called, casereader still open. */
88       PROC_CLOSED               /* casereader from proc_open destroyed,
89                                    but proc_commit not yet called. */
90     }
91   proc_state;
92   casenumber cases_written;     /* Cases output so far. */
93   bool ok;                      /* Error status. */
94   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
95
96   const struct dataset_callbacks *callbacks;
97   void *cb_data;
98
99   /* Default encoding for reading syntax files. */
100   char *syntax_encoding;
101 };
102
103 static void dataset_changed__ (struct dataset *);
104 static void dataset_transformations_changed__ (struct dataset *,
105                                                bool non_empty);
106
107 static void add_case_limit_trns (struct dataset *ds);
108 static void add_filter_trns (struct dataset *ds);
109
110 static void update_last_proc_invocation (struct dataset *ds);
111
112 static void
113 dict_callback (struct dictionary *d UNUSED, void *ds_)
114 {
115   struct dataset *ds = ds_;
116   dataset_changed__ (ds);
117 }
118 \f
119 /* Creates and returns a new dataset.  The dataset initially has an empty
120    dictionary and no data source. */
121 struct dataset *
122 dataset_create (void)
123 {
124   struct dataset *ds;
125
126   ds = xzalloc (sizeof *ds);
127   ds->dict = dict_create ();
128   dict_set_change_callback (ds->dict, dict_callback, ds);
129   dict_set_encoding (ds->dict, get_default_encoding ());
130
131   ds->caseinit = caseinit_create ();
132   proc_cancel_all_transformations (ds);
133   ds->syntax_encoding = xstrdup ("Auto");
134   return ds;
135 }
136
137 /* Destroys DS. */
138 void
139 dataset_destroy (struct dataset *ds)
140 {
141   if (ds != NULL)
142     {
143       dataset_clear (ds);
144       dict_destroy (ds->dict);
145       caseinit_destroy (ds->caseinit);
146       trns_chain_destroy (ds->permanent_trns_chain);
147       dataset_transformations_changed__ (ds, false);
148       free (ds->syntax_encoding);
149       free (ds);
150     }
151 }
152
153 /* Discards the active file dictionary, data, and transformations. */
154 void
155 dataset_clear (struct dataset *ds)
156 {
157   assert (ds->proc_state == PROC_COMMITTED);
158
159   dict_clear (ds->dict);
160   fh_set_default_handle (NULL);
161
162   ds->n_lag = 0;
163
164   casereader_destroy (ds->source);
165   ds->source = NULL;
166
167   proc_cancel_all_transformations (ds);
168 }
169
170 /* Returns the dictionary within DS.  This is always nonnull, although it
171    might not contain any variables. */
172 struct dictionary *
173 dataset_dict (const struct dataset *ds)
174 {
175   return ds->dict;
176 }
177
178 /* Replaces DS's dictionary by DICT, discarding any source and
179    transformations. */
180 void
181 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
182 {
183   assert (ds->proc_state == PROC_COMMITTED);
184   assert (ds->dict != dict);
185
186   dataset_clear (ds);
187
188   dict_destroy (ds->dict);
189   ds->dict = dict;
190   dict_set_change_callback (ds->dict, dict_callback, ds);
191 }
192
193 /* Returns the casereader that will be read when a procedure is executed on
194    DS.  This can be NULL if none has been set up yet. */
195 const struct casereader *
196 dataset_source (const struct dataset *ds)
197 {
198   return ds->source;
199 }
200
201 /* Returns true if DS has a data source, false otherwise. */
202 bool
203 dataset_has_source (const struct dataset *ds)
204 {
205   return dataset_source (ds) != NULL;
206 }
207
208 /* Replaces the active file's data by READER.  READER's cases must have an
209    appropriate format for DS's dictionary. */
210 bool
211 dataset_set_source (struct dataset *ds, struct casereader *reader)
212 {
213   casereader_destroy (ds->source);
214   ds->source = reader;
215
216   caseinit_clear (ds->caseinit);
217   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
218
219   return reader == NULL || !casereader_error (reader);
220 }
221
222 /* Returns the data source from DS and removes it from DS.  Returns a null
223    pointer if DS has no data source. */
224 struct casereader *
225 dataset_steal_source (struct dataset *ds)
226 {
227   struct casereader *reader = ds->source;
228   ds->source = NULL;
229
230   return reader;
231 }
232
233 void
234 dataset_set_callbacks (struct dataset *ds,
235                        const struct dataset_callbacks *callbacks,
236                        void *cb_data)
237 {
238   ds->callbacks = callbacks;
239   ds->cb_data = cb_data;
240 }
241
242 void
243 dataset_set_default_syntax_encoding (struct dataset *ds, const char *encoding)
244 {
245   free (ds->syntax_encoding);
246   ds->syntax_encoding = xstrdup (encoding);
247 }
248
249 const char *
250 dataset_get_default_syntax_encoding (const struct dataset *ds)
251 {
252   return ds->syntax_encoding;
253 }
254 \f
255 /* Returns the last time the data was read. */
256 time_t
257 time_of_last_procedure (struct dataset *ds)
258 {
259   if (ds->last_proc_invocation == 0)
260     update_last_proc_invocation (ds);
261   return ds->last_proc_invocation;
262 }
263 \f
264 /* Regular procedure. */
265
266 /* Executes any pending transformations, if necessary.
267    This is not identical to the EXECUTE command in that it won't
268    always read the source data.  This can be important when the
269    source data is given inline within BEGIN DATA...END FILE. */
270 bool
271 proc_execute (struct dataset *ds)
272 {
273   bool ok;
274
275   if ((ds->temporary_trns_chain == NULL
276        || trns_chain_is_empty (ds->temporary_trns_chain))
277       && trns_chain_is_empty (ds->permanent_trns_chain))
278     {
279       ds->n_lag = 0;
280       ds->discard_output = false;
281       dict_set_case_limit (ds->dict, 0);
282       dict_clear_vectors (ds->dict);
283       return true;
284     }
285
286   ok = casereader_destroy (proc_open (ds));
287   return proc_commit (ds) && ok;
288 }
289
290 static const struct casereader_class proc_casereader_class;
291
292 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
293    cases filtered out with FILTER BY will not be included in the casereader
294    (which is usually desirable).  If FILTER is false, all cases will be
295    included regardless of FILTER BY settings.
296
297    proc_commit must be called when done. */
298 struct casereader *
299 proc_open_filtering (struct dataset *ds, bool filter)
300 {
301   struct casereader *reader;
302
303   assert (ds->source != NULL);
304   assert (ds->proc_state == PROC_COMMITTED);
305
306   update_last_proc_invocation (ds);
307
308   caseinit_mark_for_init (ds->caseinit, ds->dict);
309
310   /* Finish up the collection of transformations. */
311   add_case_limit_trns (ds);
312   if (filter)
313     add_filter_trns (ds);
314   trns_chain_finalize (ds->cur_trns_chain);
315
316   /* Make permanent_dict refer to the dictionary right before
317      data reaches the sink. */
318   if (ds->permanent_dict == NULL)
319     ds->permanent_dict = ds->dict;
320
321   /* Prepare sink. */
322   if (!ds->discard_output)
323     {
324       struct dictionary *pd = ds->permanent_dict;
325       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
326       if (compacted_value_cnt < dict_get_next_value_idx (pd))
327         {
328           struct caseproto *compacted_proto;
329           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
330           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
331           ds->sink = autopaging_writer_create (compacted_proto);
332           caseproto_unref (compacted_proto);
333         }
334       else
335         {
336           ds->compactor = NULL;
337           ds->sink = autopaging_writer_create (dict_get_proto (pd));
338         }
339     }
340   else
341     {
342       ds->compactor = NULL;
343       ds->sink = NULL;
344     }
345
346   /* Allocate memory for lagged cases. */
347   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
348
349   ds->proc_state = PROC_OPEN;
350   ds->cases_written = 0;
351   ds->ok = true;
352
353   /* FIXME: use taint in dataset in place of `ok'? */
354   /* FIXME: for trivial cases we can just return a clone of
355      ds->source? */
356
357   /* Create casereader and insert a shim on top.  The shim allows us to
358      arbitrarily extend the casereader's lifetime, by slurping the cases into
359      the shim's buffer in proc_commit().  That is especially useful when output
360      table_items are generated directly from the procedure casereader (e.g. by
361      the LIST procedure) when we are using an output driver that keeps a
362      reference to the output items passed to it (e.g. the GUI output driver in
363      PSPPIRE). */
364   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
365                                          CASENUMBER_MAX,
366                                          &proc_casereader_class, ds);
367   ds->shim = casereader_shim_insert (reader);
368   return reader;
369 }
370
371 /* Opens dataset DS for reading cases with proc_read.
372    proc_commit must be called when done. */
373 struct casereader *
374 proc_open (struct dataset *ds)
375 {
376   return proc_open_filtering (ds, true);
377 }
378
379 /* Returns true if a procedure is in progress, that is, if
380    proc_open has been called but proc_commit has not. */
381 bool
382 proc_is_open (const struct dataset *ds)
383 {
384   return ds->proc_state != PROC_COMMITTED;
385 }
386
387 /* "read" function for procedure casereader. */
388 static struct ccase *
389 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
390 {
391   struct dataset *ds = ds_;
392   enum trns_result retval = TRNS_DROP_CASE;
393   struct ccase *c;
394
395   assert (ds->proc_state == PROC_OPEN);
396   for (; ; case_unref (c))
397     {
398       casenumber case_nr;
399
400       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
401       if (retval == TRNS_ERROR)
402         ds->ok = false;
403       if (!ds->ok)
404         return NULL;
405
406       /* Read a case from source. */
407       c = casereader_read (ds->source);
408       if (c == NULL)
409         return NULL;
410       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
411       caseinit_init_vars (ds->caseinit, c);
412
413       /* Execute permanent transformations.  */
414       case_nr = ds->cases_written + 1;
415       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
416                                    &c, case_nr);
417       caseinit_update_left_vars (ds->caseinit, c);
418       if (retval != TRNS_CONTINUE)
419         continue;
420
421       /* Write case to collection of lagged cases. */
422       if (ds->n_lag > 0)
423         {
424           while (deque_count (&ds->lag) >= ds->n_lag)
425             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
426           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
427         }
428
429       /* Write case to replacement active file. */
430       ds->cases_written++;
431       if (ds->sink != NULL)
432         casewriter_write (ds->sink,
433                           case_map_execute (ds->compactor, case_ref (c)));
434
435       /* Execute temporary transformations. */
436       if (ds->temporary_trns_chain != NULL)
437         {
438           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
439                                        &c, ds->cases_written);
440           if (retval != TRNS_CONTINUE)
441             continue;
442         }
443
444       return c;
445     }
446 }
447
448 /* "destroy" function for procedure casereader. */
449 static void
450 proc_casereader_destroy (struct casereader *reader, void *ds_)
451 {
452   struct dataset *ds = ds_;
453   struct ccase *c;
454
455   /* We are always the subreader for a casereader_buffer, so if we're being
456      destroyed then it's because the casereader_buffer has read all the cases
457      that it ever will. */
458   ds->shim = NULL;
459
460   /* Make sure transformations happen for every input case, in
461      case they have side effects, and ensure that the replacement
462      active file gets all the cases it should. */
463   while ((c = casereader_read (reader)) != NULL)
464     case_unref (c);
465
466   ds->proc_state = PROC_CLOSED;
467   ds->ok = casereader_destroy (ds->source) && ds->ok;
468   ds->source = NULL;
469   dataset_set_source (ds, NULL);
470 }
471
472 /* Must return false if the source casereader, a transformation,
473    or the sink casewriter signaled an error.  (If a temporary
474    transformation signals an error, then the return value is
475    false, but the replacement active file may still be
476    untainted.) */
477 bool
478 proc_commit (struct dataset *ds)
479 {
480   if (ds->shim != NULL)
481     casereader_shim_slurp (ds->shim);
482
483   assert (ds->proc_state == PROC_CLOSED);
484   ds->proc_state = PROC_COMMITTED;
485
486   dataset_changed__ (ds);
487
488   /* Free memory for lagged cases. */
489   while (!deque_is_empty (&ds->lag))
490     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
491   free (ds->lag_cases);
492
493   /* Dictionary from before TEMPORARY becomes permanent. */
494   proc_cancel_temporary_transformations (ds);
495
496   if (!ds->discard_output)
497     {
498       /* Finish compacting. */
499       if (ds->compactor != NULL)
500         {
501           case_map_destroy (ds->compactor);
502           ds->compactor = NULL;
503
504           dict_delete_scratch_vars (ds->dict);
505           dict_compact_values (ds->dict);
506         }
507
508       /* Old data sink becomes new data source. */
509       if (ds->sink != NULL)
510         ds->source = casewriter_make_reader (ds->sink);
511     }
512   else
513     {
514       ds->source = NULL;
515       ds->discard_output = false;
516     }
517   ds->sink = NULL;
518
519   caseinit_clear (ds->caseinit);
520   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
521
522   dict_clear_vectors (ds->dict);
523   ds->permanent_dict = NULL;
524   return proc_cancel_all_transformations (ds) && ds->ok;
525 }
526
527 /* Casereader class for procedure execution. */
528 static const struct casereader_class proc_casereader_class =
529   {
530     proc_casereader_read,
531     proc_casereader_destroy,
532     NULL,
533     NULL,
534   };
535
536 /* Updates last_proc_invocation. */
537 static void
538 update_last_proc_invocation (struct dataset *ds)
539 {
540   ds->last_proc_invocation = time (NULL);
541 }
542 \f
543 /* Returns a pointer to the lagged case from N_BEFORE cases before the
544    current one, or NULL if there haven't been that many cases yet. */
545 const struct ccase *
546 lagged_case (const struct dataset *ds, int n_before)
547 {
548   assert (n_before >= 1);
549   assert (n_before <= ds->n_lag);
550
551   if (n_before <= deque_count (&ds->lag))
552     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
553   else
554     return NULL;
555 }
556 \f
557 /* Returns the current set of permanent transformations,
558    and clears the permanent transformations.
559    For use by INPUT PROGRAM. */
560 struct trns_chain *
561 proc_capture_transformations (struct dataset *ds)
562 {
563   struct trns_chain *chain;
564
565   assert (ds->temporary_trns_chain == NULL);
566   chain = ds->permanent_trns_chain;
567   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
568   dataset_transformations_changed__ (ds, false);
569
570   return chain;
571 }
572
573 /* Adds a transformation that processes a case with PROC and
574    frees itself with FREE to the current set of transformations.
575    The functions are passed AUX as auxiliary data. */
576 void
577 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
578 {
579   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
580   dataset_transformations_changed__ (ds, true);
581 }
582
583 /* Adds a transformation that processes a case with PROC and
584    frees itself with FREE to the current set of transformations.
585    When parsing of the block of transformations is complete,
586    FINALIZE will be called.
587    The functions are passed AUX as auxiliary data. */
588 void
589 add_transformation_with_finalizer (struct dataset *ds,
590                                    trns_finalize_func *finalize,
591                                    trns_proc_func *proc,
592                                    trns_free_func *free, void *aux)
593 {
594   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
595   dataset_transformations_changed__ (ds, true);
596 }
597
598 /* Returns the index of the next transformation.
599    This value can be returned by a transformation procedure
600    function to indicate a "jump" to that transformation. */
601 size_t
602 next_transformation (const struct dataset *ds)
603 {
604   return trns_chain_next (ds->cur_trns_chain);
605 }
606
607 /* Returns true if the next call to add_transformation() will add
608    a temporary transformation, false if it will add a permanent
609    transformation. */
610 bool
611 proc_in_temporary_transformations (const struct dataset *ds)
612 {
613   return ds->temporary_trns_chain != NULL;
614 }
615
616 /* Marks the start of temporary transformations.
617    Further calls to add_transformation() will add temporary
618    transformations. */
619 void
620 proc_start_temporary_transformations (struct dataset *ds)
621 {
622   if (!proc_in_temporary_transformations (ds))
623     {
624       add_case_limit_trns (ds);
625
626       ds->permanent_dict = dict_clone (ds->dict);
627
628       trns_chain_finalize (ds->permanent_trns_chain);
629       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
630       dataset_transformations_changed__ (ds, true);
631     }
632 }
633
634 /* Converts all the temporary transformations, if any, to
635    permanent transformations.  Further transformations will be
636    permanent.
637    Returns true if anything changed, false otherwise. */
638 bool
639 proc_make_temporary_transformations_permanent (struct dataset *ds)
640 {
641   if (proc_in_temporary_transformations (ds))
642     {
643       trns_chain_finalize (ds->temporary_trns_chain);
644       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
645       ds->temporary_trns_chain = NULL;
646
647       dict_destroy (ds->permanent_dict);
648       ds->permanent_dict = NULL;
649
650       return true;
651     }
652   else
653     return false;
654 }
655
656 /* Cancels all temporary transformations, if any.  Further
657    transformations will be permanent.
658    Returns true if anything changed, false otherwise. */
659 bool
660 proc_cancel_temporary_transformations (struct dataset *ds)
661 {
662   if (proc_in_temporary_transformations (ds))
663     {
664       dict_destroy (ds->dict);
665       ds->dict = ds->permanent_dict;
666       ds->permanent_dict = NULL;
667
668       trns_chain_destroy (ds->temporary_trns_chain);
669       ds->temporary_trns_chain = NULL;
670       dataset_transformations_changed__ (
671         ds, !trns_chain_is_empty (ds->permanent_trns_chain));
672       return true;
673     }
674   else
675     return false;
676 }
677
678 /* Cancels all transformations, if any.
679    Returns true if successful, false on I/O error. */
680 bool
681 proc_cancel_all_transformations (struct dataset *ds)
682 {
683   bool ok;
684   assert (ds->proc_state == PROC_COMMITTED);
685   ok = trns_chain_destroy (ds->permanent_trns_chain);
686   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
687   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
688   ds->temporary_trns_chain = NULL;
689   dataset_transformations_changed__ (ds, false);
690
691   return ok;
692 }
693 \f
694 /* Causes output from the next procedure to be discarded, instead
695    of being preserved for use as input for the next procedure. */
696 void
697 proc_discard_output (struct dataset *ds)
698 {
699   ds->discard_output = true;
700 }
701
702
703 /* Checks whether DS has a corrupted active file.  If so,
704    discards it and returns false.  If not, returns true without
705    doing anything. */
706 bool
707 dataset_end_of_command (struct dataset *ds)
708 {
709   if (ds->source != NULL)
710     {
711       if (casereader_error (ds->source))
712         {
713           dataset_clear (ds);
714           return false;
715         }
716       else
717         {
718           const struct taint *taint = casereader_get_taint (ds->source);
719           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
720           assert (!taint_has_tainted_successor (taint));
721         }
722     }
723   return true;
724 }
725 \f
726 static trns_proc_func case_limit_trns_proc;
727 static trns_free_func case_limit_trns_free;
728
729 /* Adds a transformation that limits the number of cases that may
730    pass through, if DS->DICT has a case limit. */
731 static void
732 add_case_limit_trns (struct dataset *ds)
733 {
734   casenumber case_limit = dict_get_case_limit (ds->dict);
735   if (case_limit != 0)
736     {
737       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
738       *cases_remaining = case_limit;
739       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
740                           cases_remaining);
741       dict_set_case_limit (ds->dict, 0);
742     }
743 }
744
745 /* Limits the maximum number of cases processed to
746    *CASES_REMAINING. */
747 static int
748 case_limit_trns_proc (void *cases_remaining_,
749                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
750 {
751   size_t *cases_remaining = cases_remaining_;
752   if (*cases_remaining > 0)
753     {
754       (*cases_remaining)--;
755       return TRNS_CONTINUE;
756     }
757   else
758     return TRNS_DROP_CASE;
759 }
760
761 /* Frees the data associated with a case limit transformation. */
762 static bool
763 case_limit_trns_free (void *cases_remaining_)
764 {
765   size_t *cases_remaining = cases_remaining_;
766   free (cases_remaining);
767   return true;
768 }
769 \f
770 static trns_proc_func filter_trns_proc;
771
772 /* Adds a temporary transformation to filter data according to
773    the variable specified on FILTER, if any. */
774 static void
775 add_filter_trns (struct dataset *ds)
776 {
777   struct variable *filter_var = dict_get_filter (ds->dict);
778   if (filter_var != NULL)
779     {
780       proc_start_temporary_transformations (ds);
781       add_transformation (ds, filter_trns_proc, NULL, filter_var);
782     }
783 }
784
785 /* FILTER transformation. */
786 static int
787 filter_trns_proc (void *filter_var_,
788                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
789
790 {
791   struct variable *filter_var = filter_var_;
792   double f = case_num (*c, filter_var);
793   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
794           ? TRNS_CONTINUE : TRNS_DROP_CASE);
795 }
796
797
798 void
799 dataset_need_lag (struct dataset *ds, int n_before)
800 {
801   ds->n_lag = MAX (ds->n_lag, n_before);
802 }
803 \f
804 static void
805 dataset_changed__ (struct dataset *ds)
806 {
807   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
808     ds->callbacks->changed (ds->cb_data);
809 }
810
811 static void
812 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
813 {
814   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
815     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
816 }