dictionary: Make dict_create() take the new dictionary's encoding.
[pspp-builds.git] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/transformations.h"
36 #include "data/variable.h"
37 #include "libpspp/deque.h"
38 #include "libpspp/misc.h"
39 #include "libpspp/str.h"
40 #include "libpspp/taint.h"
41 #include "libpspp/i18n.h"
42
43 #include "gl/minmax.h"
44 #include "gl/xalloc.h"
45
46 struct dataset {
47   /* Cases are read from source,
48      their transformation variables are initialized,
49      pass through permanent_trns_chain (which transforms them into
50      the format described by permanent_dict),
51      are written to sink,
52      pass through temporary_trns_chain (which transforms them into
53      the format described by dict),
54      and are finally passed to the procedure. */
55   struct casereader *source;
56   struct caseinit *caseinit;
57   struct trns_chain *permanent_trns_chain;
58   struct dictionary *permanent_dict;
59   struct casewriter *sink;
60   struct trns_chain *temporary_trns_chain;
61   struct dictionary *dict;
62
63   /* If true, cases are discarded instead of being written to
64      sink. */
65   bool discard_output;
66
67   /* The transformation chain that the next transformation will be
68      added to. */
69   struct trns_chain *cur_trns_chain;
70
71   /* The case map used to compact a case, if necessary;
72      otherwise a null pointer. */
73   struct case_map *compactor;
74
75   /* Time at which proc was last invoked. */
76   time_t last_proc_invocation;
77
78   /* Cases just before ("lagging") the current one. */
79   int n_lag;                    /* Number of cases to lag. */
80   struct deque lag;             /* Deque of lagged cases. */
81   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
82
83   /* Procedure data. */
84   enum
85     {
86       PROC_COMMITTED,           /* No procedure in progress. */
87       PROC_OPEN,                /* proc_open called, casereader still open. */
88       PROC_CLOSED               /* casereader from proc_open destroyed,
89                                    but proc_commit not yet called. */
90     }
91   proc_state;
92   casenumber cases_written;     /* Cases output so far. */
93   bool ok;                      /* Error status. */
94   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
95
96   const struct dataset_callbacks *callbacks;
97   void *cb_data;
98
99   /* Default encoding for reading syntax files. */
100   char *syntax_encoding;
101 };
102
103 static void dataset_changed__ (struct dataset *);
104 static void dataset_transformations_changed__ (struct dataset *,
105                                                bool non_empty);
106
107 static void add_case_limit_trns (struct dataset *ds);
108 static void add_filter_trns (struct dataset *ds);
109
110 static void update_last_proc_invocation (struct dataset *ds);
111
112 static void
113 dict_callback (struct dictionary *d UNUSED, void *ds_)
114 {
115   struct dataset *ds = ds_;
116   dataset_changed__ (ds);
117 }
118 \f
119 /* Creates and returns a new dataset.  The dataset initially has an empty
120    dictionary and no data source. */
121 struct dataset *
122 dataset_create (void)
123 {
124   struct dataset *ds;
125
126   ds = xzalloc (sizeof *ds);
127   ds->dict = dict_create (get_default_encoding ());
128   dict_set_change_callback (ds->dict, dict_callback, ds);
129
130   ds->caseinit = caseinit_create ();
131   proc_cancel_all_transformations (ds);
132   ds->syntax_encoding = xstrdup ("Auto");
133   return ds;
134 }
135
136 /* Destroys DS. */
137 void
138 dataset_destroy (struct dataset *ds)
139 {
140   if (ds != NULL)
141     {
142       dataset_clear (ds);
143       dict_destroy (ds->dict);
144       caseinit_destroy (ds->caseinit);
145       trns_chain_destroy (ds->permanent_trns_chain);
146       dataset_transformations_changed__ (ds, false);
147       free (ds->syntax_encoding);
148       free (ds);
149     }
150 }
151
152 /* Discards the active dataset's dictionary, data, and transformations. */
153 void
154 dataset_clear (struct dataset *ds)
155 {
156   assert (ds->proc_state == PROC_COMMITTED);
157
158   dict_clear (ds->dict);
159   fh_set_default_handle (NULL);
160
161   ds->n_lag = 0;
162
163   casereader_destroy (ds->source);
164   ds->source = NULL;
165
166   proc_cancel_all_transformations (ds);
167 }
168
169 /* Returns the dictionary within DS.  This is always nonnull, although it
170    might not contain any variables. */
171 struct dictionary *
172 dataset_dict (const struct dataset *ds)
173 {
174   return ds->dict;
175 }
176
177 /* Replaces DS's dictionary by DICT, discarding any source and
178    transformations. */
179 void
180 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
181 {
182   assert (ds->proc_state == PROC_COMMITTED);
183   assert (ds->dict != dict);
184
185   dataset_clear (ds);
186
187   dict_destroy (ds->dict);
188   ds->dict = dict;
189   dict_set_change_callback (ds->dict, dict_callback, ds);
190 }
191
192 /* Returns the casereader that will be read when a procedure is executed on
193    DS.  This can be NULL if none has been set up yet. */
194 const struct casereader *
195 dataset_source (const struct dataset *ds)
196 {
197   return ds->source;
198 }
199
200 /* Returns true if DS has a data source, false otherwise. */
201 bool
202 dataset_has_source (const struct dataset *ds)
203 {
204   return dataset_source (ds) != NULL;
205 }
206
207 /* Replaces the active dataset's data by READER.  READER's cases must have an
208    appropriate format for DS's dictionary. */
209 bool
210 dataset_set_source (struct dataset *ds, struct casereader *reader)
211 {
212   casereader_destroy (ds->source);
213   ds->source = reader;
214
215   caseinit_clear (ds->caseinit);
216   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
217
218   return reader == NULL || !casereader_error (reader);
219 }
220
221 /* Returns the data source from DS and removes it from DS.  Returns a null
222    pointer if DS has no data source. */
223 struct casereader *
224 dataset_steal_source (struct dataset *ds)
225 {
226   struct casereader *reader = ds->source;
227   ds->source = NULL;
228
229   return reader;
230 }
231
232 void
233 dataset_set_callbacks (struct dataset *ds,
234                        const struct dataset_callbacks *callbacks,
235                        void *cb_data)
236 {
237   ds->callbacks = callbacks;
238   ds->cb_data = cb_data;
239 }
240
241 void
242 dataset_set_default_syntax_encoding (struct dataset *ds, const char *encoding)
243 {
244   free (ds->syntax_encoding);
245   ds->syntax_encoding = xstrdup (encoding);
246 }
247
248 const char *
249 dataset_get_default_syntax_encoding (const struct dataset *ds)
250 {
251   return ds->syntax_encoding;
252 }
253 \f
254 /* Returns the last time the data was read. */
255 time_t
256 time_of_last_procedure (struct dataset *ds)
257 {
258   if (ds->last_proc_invocation == 0)
259     update_last_proc_invocation (ds);
260   return ds->last_proc_invocation;
261 }
262 \f
263 /* Regular procedure. */
264
265 /* Executes any pending transformations, if necessary.
266    This is not identical to the EXECUTE command in that it won't
267    always read the source data.  This can be important when the
268    source data is given inline within BEGIN DATA...END FILE. */
269 bool
270 proc_execute (struct dataset *ds)
271 {
272   bool ok;
273
274   if ((ds->temporary_trns_chain == NULL
275        || trns_chain_is_empty (ds->temporary_trns_chain))
276       && trns_chain_is_empty (ds->permanent_trns_chain))
277     {
278       ds->n_lag = 0;
279       ds->discard_output = false;
280       dict_set_case_limit (ds->dict, 0);
281       dict_clear_vectors (ds->dict);
282       return true;
283     }
284
285   ok = casereader_destroy (proc_open (ds));
286   return proc_commit (ds) && ok;
287 }
288
289 static const struct casereader_class proc_casereader_class;
290
291 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
292    cases filtered out with FILTER BY will not be included in the casereader
293    (which is usually desirable).  If FILTER is false, all cases will be
294    included regardless of FILTER BY settings.
295
296    proc_commit must be called when done. */
297 struct casereader *
298 proc_open_filtering (struct dataset *ds, bool filter)
299 {
300   struct casereader *reader;
301
302   assert (ds->source != NULL);
303   assert (ds->proc_state == PROC_COMMITTED);
304
305   update_last_proc_invocation (ds);
306
307   caseinit_mark_for_init (ds->caseinit, ds->dict);
308
309   /* Finish up the collection of transformations. */
310   add_case_limit_trns (ds);
311   if (filter)
312     add_filter_trns (ds);
313   trns_chain_finalize (ds->cur_trns_chain);
314
315   /* Make permanent_dict refer to the dictionary right before
316      data reaches the sink. */
317   if (ds->permanent_dict == NULL)
318     ds->permanent_dict = ds->dict;
319
320   /* Prepare sink. */
321   if (!ds->discard_output)
322     {
323       struct dictionary *pd = ds->permanent_dict;
324       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
325       if (compacted_value_cnt < dict_get_next_value_idx (pd))
326         {
327           struct caseproto *compacted_proto;
328           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
329           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
330           ds->sink = autopaging_writer_create (compacted_proto);
331           caseproto_unref (compacted_proto);
332         }
333       else
334         {
335           ds->compactor = NULL;
336           ds->sink = autopaging_writer_create (dict_get_proto (pd));
337         }
338     }
339   else
340     {
341       ds->compactor = NULL;
342       ds->sink = NULL;
343     }
344
345   /* Allocate memory for lagged cases. */
346   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
347
348   ds->proc_state = PROC_OPEN;
349   ds->cases_written = 0;
350   ds->ok = true;
351
352   /* FIXME: use taint in dataset in place of `ok'? */
353   /* FIXME: for trivial cases we can just return a clone of
354      ds->source? */
355
356   /* Create casereader and insert a shim on top.  The shim allows us to
357      arbitrarily extend the casereader's lifetime, by slurping the cases into
358      the shim's buffer in proc_commit().  That is especially useful when output
359      table_items are generated directly from the procedure casereader (e.g. by
360      the LIST procedure) when we are using an output driver that keeps a
361      reference to the output items passed to it (e.g. the GUI output driver in
362      PSPPIRE). */
363   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
364                                          CASENUMBER_MAX,
365                                          &proc_casereader_class, ds);
366   ds->shim = casereader_shim_insert (reader);
367   return reader;
368 }
369
370 /* Opens dataset DS for reading cases with proc_read.
371    proc_commit must be called when done. */
372 struct casereader *
373 proc_open (struct dataset *ds)
374 {
375   return proc_open_filtering (ds, true);
376 }
377
378 /* Returns true if a procedure is in progress, that is, if
379    proc_open has been called but proc_commit has not. */
380 bool
381 proc_is_open (const struct dataset *ds)
382 {
383   return ds->proc_state != PROC_COMMITTED;
384 }
385
386 /* "read" function for procedure casereader. */
387 static struct ccase *
388 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
389 {
390   struct dataset *ds = ds_;
391   enum trns_result retval = TRNS_DROP_CASE;
392   struct ccase *c;
393
394   assert (ds->proc_state == PROC_OPEN);
395   for (; ; case_unref (c))
396     {
397       casenumber case_nr;
398
399       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
400       if (retval == TRNS_ERROR)
401         ds->ok = false;
402       if (!ds->ok)
403         return NULL;
404
405       /* Read a case from source. */
406       c = casereader_read (ds->source);
407       if (c == NULL)
408         return NULL;
409       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
410       caseinit_init_vars (ds->caseinit, c);
411
412       /* Execute permanent transformations.  */
413       case_nr = ds->cases_written + 1;
414       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
415                                    &c, case_nr);
416       caseinit_update_left_vars (ds->caseinit, c);
417       if (retval != TRNS_CONTINUE)
418         continue;
419
420       /* Write case to collection of lagged cases. */
421       if (ds->n_lag > 0)
422         {
423           while (deque_count (&ds->lag) >= ds->n_lag)
424             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
425           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
426         }
427
428       /* Write case to replacement dataset. */
429       ds->cases_written++;
430       if (ds->sink != NULL)
431         casewriter_write (ds->sink,
432                           case_map_execute (ds->compactor, case_ref (c)));
433
434       /* Execute temporary transformations. */
435       if (ds->temporary_trns_chain != NULL)
436         {
437           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
438                                        &c, ds->cases_written);
439           if (retval != TRNS_CONTINUE)
440             continue;
441         }
442
443       return c;
444     }
445 }
446
447 /* "destroy" function for procedure casereader. */
448 static void
449 proc_casereader_destroy (struct casereader *reader, void *ds_)
450 {
451   struct dataset *ds = ds_;
452   struct ccase *c;
453
454   /* We are always the subreader for a casereader_buffer, so if we're being
455      destroyed then it's because the casereader_buffer has read all the cases
456      that it ever will. */
457   ds->shim = NULL;
458
459   /* Make sure transformations happen for every input case, in
460      case they have side effects, and ensure that the replacement
461      active dataset gets all the cases it should. */
462   while ((c = casereader_read (reader)) != NULL)
463     case_unref (c);
464
465   ds->proc_state = PROC_CLOSED;
466   ds->ok = casereader_destroy (ds->source) && ds->ok;
467   ds->source = NULL;
468   dataset_set_source (ds, NULL);
469 }
470
471 /* Must return false if the source casereader, a transformation,
472    or the sink casewriter signaled an error.  (If a temporary
473    transformation signals an error, then the return value is
474    false, but the replacement active dataset may still be
475    untainted.) */
476 bool
477 proc_commit (struct dataset *ds)
478 {
479   if (ds->shim != NULL)
480     casereader_shim_slurp (ds->shim);
481
482   assert (ds->proc_state == PROC_CLOSED);
483   ds->proc_state = PROC_COMMITTED;
484
485   dataset_changed__ (ds);
486
487   /* Free memory for lagged cases. */
488   while (!deque_is_empty (&ds->lag))
489     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
490   free (ds->lag_cases);
491
492   /* Dictionary from before TEMPORARY becomes permanent. */
493   proc_cancel_temporary_transformations (ds);
494
495   if (!ds->discard_output)
496     {
497       /* Finish compacting. */
498       if (ds->compactor != NULL)
499         {
500           case_map_destroy (ds->compactor);
501           ds->compactor = NULL;
502
503           dict_delete_scratch_vars (ds->dict);
504           dict_compact_values (ds->dict);
505         }
506
507       /* Old data sink becomes new data source. */
508       if (ds->sink != NULL)
509         ds->source = casewriter_make_reader (ds->sink);
510     }
511   else
512     {
513       ds->source = NULL;
514       ds->discard_output = false;
515     }
516   ds->sink = NULL;
517
518   caseinit_clear (ds->caseinit);
519   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
520
521   dict_clear_vectors (ds->dict);
522   ds->permanent_dict = NULL;
523   return proc_cancel_all_transformations (ds) && ds->ok;
524 }
525
526 /* Casereader class for procedure execution. */
527 static const struct casereader_class proc_casereader_class =
528   {
529     proc_casereader_read,
530     proc_casereader_destroy,
531     NULL,
532     NULL,
533   };
534
535 /* Updates last_proc_invocation. */
536 static void
537 update_last_proc_invocation (struct dataset *ds)
538 {
539   ds->last_proc_invocation = time (NULL);
540 }
541 \f
542 /* Returns a pointer to the lagged case from N_BEFORE cases before the
543    current one, or NULL if there haven't been that many cases yet. */
544 const struct ccase *
545 lagged_case (const struct dataset *ds, int n_before)
546 {
547   assert (n_before >= 1);
548   assert (n_before <= ds->n_lag);
549
550   if (n_before <= deque_count (&ds->lag))
551     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
552   else
553     return NULL;
554 }
555 \f
556 /* Returns the current set of permanent transformations,
557    and clears the permanent transformations.
558    For use by INPUT PROGRAM. */
559 struct trns_chain *
560 proc_capture_transformations (struct dataset *ds)
561 {
562   struct trns_chain *chain;
563
564   assert (ds->temporary_trns_chain == NULL);
565   chain = ds->permanent_trns_chain;
566   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
567   dataset_transformations_changed__ (ds, false);
568
569   return chain;
570 }
571
572 /* Adds a transformation that processes a case with PROC and
573    frees itself with FREE to the current set of transformations.
574    The functions are passed AUX as auxiliary data. */
575 void
576 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
577 {
578   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
579   dataset_transformations_changed__ (ds, true);
580 }
581
582 /* Adds a transformation that processes a case with PROC and
583    frees itself with FREE to the current set of transformations.
584    When parsing of the block of transformations is complete,
585    FINALIZE will be called.
586    The functions are passed AUX as auxiliary data. */
587 void
588 add_transformation_with_finalizer (struct dataset *ds,
589                                    trns_finalize_func *finalize,
590                                    trns_proc_func *proc,
591                                    trns_free_func *free, void *aux)
592 {
593   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
594   dataset_transformations_changed__ (ds, true);
595 }
596
597 /* Returns the index of the next transformation.
598    This value can be returned by a transformation procedure
599    function to indicate a "jump" to that transformation. */
600 size_t
601 next_transformation (const struct dataset *ds)
602 {
603   return trns_chain_next (ds->cur_trns_chain);
604 }
605
606 /* Returns true if the next call to add_transformation() will add
607    a temporary transformation, false if it will add a permanent
608    transformation. */
609 bool
610 proc_in_temporary_transformations (const struct dataset *ds)
611 {
612   return ds->temporary_trns_chain != NULL;
613 }
614
615 /* Marks the start of temporary transformations.
616    Further calls to add_transformation() will add temporary
617    transformations. */
618 void
619 proc_start_temporary_transformations (struct dataset *ds)
620 {
621   if (!proc_in_temporary_transformations (ds))
622     {
623       add_case_limit_trns (ds);
624
625       ds->permanent_dict = dict_clone (ds->dict);
626
627       trns_chain_finalize (ds->permanent_trns_chain);
628       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
629       dataset_transformations_changed__ (ds, true);
630     }
631 }
632
633 /* Converts all the temporary transformations, if any, to
634    permanent transformations.  Further transformations will be
635    permanent.
636    Returns true if anything changed, false otherwise. */
637 bool
638 proc_make_temporary_transformations_permanent (struct dataset *ds)
639 {
640   if (proc_in_temporary_transformations (ds))
641     {
642       trns_chain_finalize (ds->temporary_trns_chain);
643       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
644       ds->temporary_trns_chain = NULL;
645
646       dict_destroy (ds->permanent_dict);
647       ds->permanent_dict = NULL;
648
649       return true;
650     }
651   else
652     return false;
653 }
654
655 /* Cancels all temporary transformations, if any.  Further
656    transformations will be permanent.
657    Returns true if anything changed, false otherwise. */
658 bool
659 proc_cancel_temporary_transformations (struct dataset *ds)
660 {
661   if (proc_in_temporary_transformations (ds))
662     {
663       dict_destroy (ds->dict);
664       ds->dict = ds->permanent_dict;
665       ds->permanent_dict = NULL;
666
667       trns_chain_destroy (ds->temporary_trns_chain);
668       ds->temporary_trns_chain = NULL;
669       dataset_transformations_changed__ (
670         ds, !trns_chain_is_empty (ds->permanent_trns_chain));
671       return true;
672     }
673   else
674     return false;
675 }
676
677 /* Cancels all transformations, if any.
678    Returns true if successful, false on I/O error. */
679 bool
680 proc_cancel_all_transformations (struct dataset *ds)
681 {
682   bool ok;
683   assert (ds->proc_state == PROC_COMMITTED);
684   ok = trns_chain_destroy (ds->permanent_trns_chain);
685   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
686   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
687   ds->temporary_trns_chain = NULL;
688   dataset_transformations_changed__ (ds, false);
689
690   return ok;
691 }
692 \f
693 /* Causes output from the next procedure to be discarded, instead
694    of being preserved for use as input for the next procedure. */
695 void
696 proc_discard_output (struct dataset *ds)
697 {
698   ds->discard_output = true;
699 }
700
701
702 /* Checks whether DS has a corrupted active dataset.  If so,
703    discards it and returns false.  If not, returns true without
704    doing anything. */
705 bool
706 dataset_end_of_command (struct dataset *ds)
707 {
708   if (ds->source != NULL)
709     {
710       if (casereader_error (ds->source))
711         {
712           dataset_clear (ds);
713           return false;
714         }
715       else
716         {
717           const struct taint *taint = casereader_get_taint (ds->source);
718           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
719           assert (!taint_has_tainted_successor (taint));
720         }
721     }
722   return true;
723 }
724 \f
725 static trns_proc_func case_limit_trns_proc;
726 static trns_free_func case_limit_trns_free;
727
728 /* Adds a transformation that limits the number of cases that may
729    pass through, if DS->DICT has a case limit. */
730 static void
731 add_case_limit_trns (struct dataset *ds)
732 {
733   casenumber case_limit = dict_get_case_limit (ds->dict);
734   if (case_limit != 0)
735     {
736       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
737       *cases_remaining = case_limit;
738       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
739                           cases_remaining);
740       dict_set_case_limit (ds->dict, 0);
741     }
742 }
743
744 /* Limits the maximum number of cases processed to
745    *CASES_REMAINING. */
746 static int
747 case_limit_trns_proc (void *cases_remaining_,
748                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
749 {
750   size_t *cases_remaining = cases_remaining_;
751   if (*cases_remaining > 0)
752     {
753       (*cases_remaining)--;
754       return TRNS_CONTINUE;
755     }
756   else
757     return TRNS_DROP_CASE;
758 }
759
760 /* Frees the data associated with a case limit transformation. */
761 static bool
762 case_limit_trns_free (void *cases_remaining_)
763 {
764   size_t *cases_remaining = cases_remaining_;
765   free (cases_remaining);
766   return true;
767 }
768 \f
769 static trns_proc_func filter_trns_proc;
770
771 /* Adds a temporary transformation to filter data according to
772    the variable specified on FILTER, if any. */
773 static void
774 add_filter_trns (struct dataset *ds)
775 {
776   struct variable *filter_var = dict_get_filter (ds->dict);
777   if (filter_var != NULL)
778     {
779       proc_start_temporary_transformations (ds);
780       add_transformation (ds, filter_trns_proc, NULL, filter_var);
781     }
782 }
783
784 /* FILTER transformation. */
785 static int
786 filter_trns_proc (void *filter_var_,
787                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
788
789 {
790   struct variable *filter_var = filter_var_;
791   double f = case_num (*c, filter_var);
792   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
793           ? TRNS_CONTINUE : TRNS_DROP_CASE);
794 }
795
796
797 void
798 dataset_need_lag (struct dataset *ds, int n_before)
799 {
800   ds->n_lag = MAX (ds->n_lag, n_before);
801 }
802 \f
803 static void
804 dataset_changed__ (struct dataset *ds)
805 {
806   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
807     ds->callbacks->changed (ds->cb_data);
808 }
809
810 static void
811 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
812 {
813   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
814     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
815 }