1729c2d0530307df762eb9f1d5963fe7e3bd0470
[pspp-builds.git] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/transformations.h"
36 #include "data/variable.h"
37 #include "libpspp/deque.h"
38 #include "libpspp/misc.h"
39 #include "libpspp/str.h"
40 #include "libpspp/taint.h"
41 #include "libpspp/i18n.h"
42
43 #include "gl/minmax.h"
44 #include "gl/xalloc.h"
45
46 struct dataset {
47   /* Cases are read from source,
48      their transformation variables are initialized,
49      pass through permanent_trns_chain (which transforms them into
50      the format described by permanent_dict),
51      are written to sink,
52      pass through temporary_trns_chain (which transforms them into
53      the format described by dict),
54      and are finally passed to the procedure. */
55   struct casereader *source;
56   struct caseinit *caseinit;
57   struct trns_chain *permanent_trns_chain;
58   struct dictionary *permanent_dict;
59   struct casewriter *sink;
60   struct trns_chain *temporary_trns_chain;
61   struct dictionary *dict;
62
63   /* Callback which occurs whenever the transformation chain(s) have
64      been modified */
65   transformation_change_callback_func *xform_callback;
66   void *xform_callback_aux;
67
68   /* If true, cases are discarded instead of being written to
69      sink. */
70   bool discard_output;
71
72   /* The transformation chain that the next transformation will be
73      added to. */
74   struct trns_chain *cur_trns_chain;
75
76   /* The case map used to compact a case, if necessary;
77      otherwise a null pointer. */
78   struct case_map *compactor;
79
80   /* Time at which proc was last invoked. */
81   time_t last_proc_invocation;
82
83   /* Cases just before ("lagging") the current one. */
84   int n_lag;                    /* Number of cases to lag. */
85   struct deque lag;             /* Deque of lagged cases. */
86   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
87
88   /* Procedure data. */
89   enum
90     {
91       PROC_COMMITTED,           /* No procedure in progress. */
92       PROC_OPEN,                /* proc_open called, casereader still open. */
93       PROC_CLOSED               /* casereader from proc_open destroyed,
94                                    but proc_commit not yet called. */
95     }
96   proc_state;
97   casenumber cases_written;     /* Cases output so far. */
98   bool ok;                      /* Error status. */
99   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
100
101   void (*callback) (void *); /* Callback for when the dataset changes */
102   void *cb_data;
103
104   /* Default encoding for reading syntax files. */
105   char *syntax_encoding;
106 };
107
108 static void add_case_limit_trns (struct dataset *ds);
109 static void add_filter_trns (struct dataset *ds);
110
111 static void update_last_proc_invocation (struct dataset *ds);
112
113 static void
114 dataset_set_unsaved (const struct dataset *ds)
115 {
116   if (ds->callback) ds->callback (ds->cb_data);
117 }
118
119 static void
120 dict_callback (struct dictionary *d UNUSED, void *ds_)
121 {
122   struct dataset *ds = ds_;
123   dataset_set_unsaved (ds);
124 }
125 \f
126 /* Creates and returns a new dataset.  The dataset initially has an empty
127    dictionary and no data source. */
128 struct dataset *
129 dataset_create (void)
130 {
131   struct dataset *ds;
132
133   ds = xzalloc (sizeof *ds);
134   ds->dict = dict_create ();
135   dict_set_change_callback (ds->dict, dict_callback, ds);
136   dict_set_encoding (ds->dict, get_default_encoding ());
137
138   ds->caseinit = caseinit_create ();
139   proc_cancel_all_transformations (ds);
140   ds->syntax_encoding = xstrdup ("Auto");
141   return ds;
142 }
143
144 /* Destroys DS. */
145 void
146 dataset_destroy (struct dataset *ds)
147 {
148   if (ds != NULL)
149     {
150       dataset_clear (ds);
151       dict_destroy (ds->dict);
152       caseinit_destroy (ds->caseinit);
153       trns_chain_destroy (ds->permanent_trns_chain);
154
155       if ( ds->xform_callback)
156         ds->xform_callback (false, ds->xform_callback_aux);
157       free (ds->syntax_encoding);
158       free (ds);
159     }
160 }
161
162 /* Discards the active file dictionary, data, and transformations. */
163 void
164 dataset_clear (struct dataset *ds)
165 {
166   assert (ds->proc_state == PROC_COMMITTED);
167
168   dict_clear (ds->dict);
169   fh_set_default_handle (NULL);
170
171   ds->n_lag = 0;
172
173   casereader_destroy (ds->source);
174   ds->source = NULL;
175
176   proc_cancel_all_transformations (ds);
177 }
178
179 /* Returns the dictionary within DS.  This is always nonnull, although it
180    might not contain any variables. */
181 struct dictionary *
182 dataset_dict (const struct dataset *ds)
183 {
184   return ds->dict;
185 }
186
187 /* Replaces DS's dictionary by DICT, discarding any source and
188    transformations. */
189 void
190 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
191 {
192   assert (ds->proc_state == PROC_COMMITTED);
193   assert (ds->dict != dict);
194
195   dataset_clear (ds);
196
197   dict_destroy (ds->dict);
198   ds->dict = dict;
199   dict_set_change_callback (ds->dict, dict_callback, ds);
200 }
201
202 /* Returns the casereader that will be read when a procedure is executed on
203    DS.  This can be NULL if none has been set up yet. */
204 const struct casereader *
205 dataset_source (const struct dataset *ds)
206 {
207   return ds->source;
208 }
209
210 /* Returns true if DS has a data source, false otherwise. */
211 bool
212 dataset_has_source (const struct dataset *ds)
213 {
214   return dataset_source (ds) != NULL;
215 }
216
217 /* Replaces the active file's data by READER.  READER's cases must have an
218    appropriate format for DS's dictionary. */
219 bool
220 dataset_set_source (struct dataset *ds, struct casereader *reader)
221 {
222   casereader_destroy (ds->source);
223   ds->source = reader;
224
225   caseinit_clear (ds->caseinit);
226   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
227
228   return reader == NULL || !casereader_error (reader);
229 }
230
231 /* Returns the data source from DS and removes it from DS.  Returns a null
232    pointer if DS has no data source. */
233 struct casereader *
234 dataset_steal_source (struct dataset *ds)
235 {
236   struct casereader *reader = ds->source;
237   ds->source = NULL;
238
239   return reader;
240 }
241 \f
242 void
243 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
244 {
245   ds->callback = cb;
246   ds->cb_data = cb_data;
247 }
248
249 void
250 dataset_set_default_syntax_encoding (struct dataset *ds, const char *encoding)
251 {
252   free (ds->syntax_encoding);
253   ds->syntax_encoding = xstrdup (encoding);
254 }
255
256 const char *
257 dataset_get_default_syntax_encoding (const struct dataset *ds)
258 {
259   return ds->syntax_encoding;
260 }
261
262 /* Returns the last time the data was read. */
263 time_t
264 time_of_last_procedure (struct dataset *ds)
265 {
266   if (ds->last_proc_invocation == 0)
267     update_last_proc_invocation (ds);
268   return ds->last_proc_invocation;
269 }
270 \f
271 /* Regular procedure. */
272
273 /* Executes any pending transformations, if necessary.
274    This is not identical to the EXECUTE command in that it won't
275    always read the source data.  This can be important when the
276    source data is given inline within BEGIN DATA...END FILE. */
277 bool
278 proc_execute (struct dataset *ds)
279 {
280   bool ok;
281
282   if ((ds->temporary_trns_chain == NULL
283        || trns_chain_is_empty (ds->temporary_trns_chain))
284       && trns_chain_is_empty (ds->permanent_trns_chain))
285     {
286       ds->n_lag = 0;
287       ds->discard_output = false;
288       dict_set_case_limit (ds->dict, 0);
289       dict_clear_vectors (ds->dict);
290       return true;
291     }
292
293   ok = casereader_destroy (proc_open (ds));
294   return proc_commit (ds) && ok;
295 }
296
297 static const struct casereader_class proc_casereader_class;
298
299 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
300    cases filtered out with FILTER BY will not be included in the casereader
301    (which is usually desirable).  If FILTER is false, all cases will be
302    included regardless of FILTER BY settings.
303
304    proc_commit must be called when done. */
305 struct casereader *
306 proc_open_filtering (struct dataset *ds, bool filter)
307 {
308   struct casereader *reader;
309
310   assert (ds->source != NULL);
311   assert (ds->proc_state == PROC_COMMITTED);
312
313   update_last_proc_invocation (ds);
314
315   caseinit_mark_for_init (ds->caseinit, ds->dict);
316
317   /* Finish up the collection of transformations. */
318   add_case_limit_trns (ds);
319   if (filter)
320     add_filter_trns (ds);
321   trns_chain_finalize (ds->cur_trns_chain);
322
323   /* Make permanent_dict refer to the dictionary right before
324      data reaches the sink. */
325   if (ds->permanent_dict == NULL)
326     ds->permanent_dict = ds->dict;
327
328   /* Prepare sink. */
329   if (!ds->discard_output)
330     {
331       struct dictionary *pd = ds->permanent_dict;
332       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
333       if (compacted_value_cnt < dict_get_next_value_idx (pd))
334         {
335           struct caseproto *compacted_proto;
336           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
337           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
338           ds->sink = autopaging_writer_create (compacted_proto);
339           caseproto_unref (compacted_proto);
340         }
341       else
342         {
343           ds->compactor = NULL;
344           ds->sink = autopaging_writer_create (dict_get_proto (pd));
345         }
346     }
347   else
348     {
349       ds->compactor = NULL;
350       ds->sink = NULL;
351     }
352
353   /* Allocate memory for lagged cases. */
354   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
355
356   ds->proc_state = PROC_OPEN;
357   ds->cases_written = 0;
358   ds->ok = true;
359
360   /* FIXME: use taint in dataset in place of `ok'? */
361   /* FIXME: for trivial cases we can just return a clone of
362      ds->source? */
363
364   /* Create casereader and insert a shim on top.  The shim allows us to
365      arbitrarily extend the casereader's lifetime, by slurping the cases into
366      the shim's buffer in proc_commit().  That is especially useful when output
367      table_items are generated directly from the procedure casereader (e.g. by
368      the LIST procedure) when we are using an output driver that keeps a
369      reference to the output items passed to it (e.g. the GUI output driver in
370      PSPPIRE). */
371   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
372                                          CASENUMBER_MAX,
373                                          &proc_casereader_class, ds);
374   ds->shim = casereader_shim_insert (reader);
375   return reader;
376 }
377
378 /* Opens dataset DS for reading cases with proc_read.
379    proc_commit must be called when done. */
380 struct casereader *
381 proc_open (struct dataset *ds)
382 {
383   return proc_open_filtering (ds, true);
384 }
385
386 /* Returns true if a procedure is in progress, that is, if
387    proc_open has been called but proc_commit has not. */
388 bool
389 proc_is_open (const struct dataset *ds)
390 {
391   return ds->proc_state != PROC_COMMITTED;
392 }
393
394 /* "read" function for procedure casereader. */
395 static struct ccase *
396 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
397 {
398   struct dataset *ds = ds_;
399   enum trns_result retval = TRNS_DROP_CASE;
400   struct ccase *c;
401
402   assert (ds->proc_state == PROC_OPEN);
403   for (; ; case_unref (c))
404     {
405       casenumber case_nr;
406
407       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
408       if (retval == TRNS_ERROR)
409         ds->ok = false;
410       if (!ds->ok)
411         return NULL;
412
413       /* Read a case from source. */
414       c = casereader_read (ds->source);
415       if (c == NULL)
416         return NULL;
417       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
418       caseinit_init_vars (ds->caseinit, c);
419
420       /* Execute permanent transformations.  */
421       case_nr = ds->cases_written + 1;
422       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
423                                    &c, case_nr);
424       caseinit_update_left_vars (ds->caseinit, c);
425       if (retval != TRNS_CONTINUE)
426         continue;
427
428       /* Write case to collection of lagged cases. */
429       if (ds->n_lag > 0)
430         {
431           while (deque_count (&ds->lag) >= ds->n_lag)
432             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
433           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
434         }
435
436       /* Write case to replacement active file. */
437       ds->cases_written++;
438       if (ds->sink != NULL)
439         casewriter_write (ds->sink,
440                           case_map_execute (ds->compactor, case_ref (c)));
441
442       /* Execute temporary transformations. */
443       if (ds->temporary_trns_chain != NULL)
444         {
445           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
446                                        &c, ds->cases_written);
447           if (retval != TRNS_CONTINUE)
448             continue;
449         }
450
451       return c;
452     }
453 }
454
455 /* "destroy" function for procedure casereader. */
456 static void
457 proc_casereader_destroy (struct casereader *reader, void *ds_)
458 {
459   struct dataset *ds = ds_;
460   struct ccase *c;
461
462   /* We are always the subreader for a casereader_buffer, so if we're being
463      destroyed then it's because the casereader_buffer has read all the cases
464      that it ever will. */
465   ds->shim = NULL;
466
467   /* Make sure transformations happen for every input case, in
468      case they have side effects, and ensure that the replacement
469      active file gets all the cases it should. */
470   while ((c = casereader_read (reader)) != NULL)
471     case_unref (c);
472
473   ds->proc_state = PROC_CLOSED;
474   ds->ok = casereader_destroy (ds->source) && ds->ok;
475   ds->source = NULL;
476   dataset_set_source (ds, NULL);
477 }
478
479 /* Must return false if the source casereader, a transformation,
480    or the sink casewriter signaled an error.  (If a temporary
481    transformation signals an error, then the return value is
482    false, but the replacement active file may still be
483    untainted.) */
484 bool
485 proc_commit (struct dataset *ds)
486 {
487   if (ds->shim != NULL)
488     casereader_shim_slurp (ds->shim);
489
490   assert (ds->proc_state == PROC_CLOSED);
491   ds->proc_state = PROC_COMMITTED;
492
493   dataset_set_unsaved (ds);
494
495   /* Free memory for lagged cases. */
496   while (!deque_is_empty (&ds->lag))
497     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
498   free (ds->lag_cases);
499
500   /* Dictionary from before TEMPORARY becomes permanent. */
501   proc_cancel_temporary_transformations (ds);
502
503   if (!ds->discard_output)
504     {
505       /* Finish compacting. */
506       if (ds->compactor != NULL)
507         {
508           case_map_destroy (ds->compactor);
509           ds->compactor = NULL;
510
511           dict_delete_scratch_vars (ds->dict);
512           dict_compact_values (ds->dict);
513         }
514
515       /* Old data sink becomes new data source. */
516       if (ds->sink != NULL)
517         ds->source = casewriter_make_reader (ds->sink);
518     }
519   else
520     {
521       ds->source = NULL;
522       ds->discard_output = false;
523     }
524   ds->sink = NULL;
525
526   caseinit_clear (ds->caseinit);
527   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
528
529   dict_clear_vectors (ds->dict);
530   ds->permanent_dict = NULL;
531   return proc_cancel_all_transformations (ds) && ds->ok;
532 }
533
534 /* Casereader class for procedure execution. */
535 static const struct casereader_class proc_casereader_class =
536   {
537     proc_casereader_read,
538     proc_casereader_destroy,
539     NULL,
540     NULL,
541   };
542
543 /* Updates last_proc_invocation. */
544 static void
545 update_last_proc_invocation (struct dataset *ds)
546 {
547   ds->last_proc_invocation = time (NULL);
548 }
549 \f
550 /* Returns a pointer to the lagged case from N_BEFORE cases before the
551    current one, or NULL if there haven't been that many cases yet. */
552 const struct ccase *
553 lagged_case (const struct dataset *ds, int n_before)
554 {
555   assert (n_before >= 1);
556   assert (n_before <= ds->n_lag);
557
558   if (n_before <= deque_count (&ds->lag))
559     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
560   else
561     return NULL;
562 }
563 \f
564 /* Returns the current set of permanent transformations,
565    and clears the permanent transformations.
566    For use by INPUT PROGRAM. */
567 struct trns_chain *
568 proc_capture_transformations (struct dataset *ds)
569 {
570   struct trns_chain *chain;
571
572   assert (ds->temporary_trns_chain == NULL);
573   chain = ds->permanent_trns_chain;
574   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
575
576   if ( ds->xform_callback)
577     ds->xform_callback (false, ds->xform_callback_aux);
578
579   return chain;
580 }
581
582 /* Adds a transformation that processes a case with PROC and
583    frees itself with FREE to the current set of transformations.
584    The functions are passed AUX as auxiliary data. */
585 void
586 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
587 {
588   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
589   if ( ds->xform_callback)
590     ds->xform_callback (true, ds->xform_callback_aux);
591 }
592
593 /* Adds a transformation that processes a case with PROC and
594    frees itself with FREE to the current set of transformations.
595    When parsing of the block of transformations is complete,
596    FINALIZE will be called.
597    The functions are passed AUX as auxiliary data. */
598 void
599 add_transformation_with_finalizer (struct dataset *ds,
600                                    trns_finalize_func *finalize,
601                                    trns_proc_func *proc,
602                                    trns_free_func *free, void *aux)
603 {
604   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
605
606   if ( ds->xform_callback)
607     ds->xform_callback (true, ds->xform_callback_aux);
608 }
609
610 /* Returns the index of the next transformation.
611    This value can be returned by a transformation procedure
612    function to indicate a "jump" to that transformation. */
613 size_t
614 next_transformation (const struct dataset *ds)
615 {
616   return trns_chain_next (ds->cur_trns_chain);
617 }
618
619 /* Returns true if the next call to add_transformation() will add
620    a temporary transformation, false if it will add a permanent
621    transformation. */
622 bool
623 proc_in_temporary_transformations (const struct dataset *ds)
624 {
625   return ds->temporary_trns_chain != NULL;
626 }
627
628 /* Marks the start of temporary transformations.
629    Further calls to add_transformation() will add temporary
630    transformations. */
631 void
632 proc_start_temporary_transformations (struct dataset *ds)
633 {
634   if (!proc_in_temporary_transformations (ds))
635     {
636       add_case_limit_trns (ds);
637
638       ds->permanent_dict = dict_clone (ds->dict);
639
640       trns_chain_finalize (ds->permanent_trns_chain);
641       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
642
643       if ( ds->xform_callback)
644         ds->xform_callback (true, ds->xform_callback_aux);
645     }
646 }
647
648 /* Converts all the temporary transformations, if any, to
649    permanent transformations.  Further transformations will be
650    permanent.
651    Returns true if anything changed, false otherwise. */
652 bool
653 proc_make_temporary_transformations_permanent (struct dataset *ds)
654 {
655   if (proc_in_temporary_transformations (ds))
656     {
657       trns_chain_finalize (ds->temporary_trns_chain);
658       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
659       ds->temporary_trns_chain = NULL;
660
661       dict_destroy (ds->permanent_dict);
662       ds->permanent_dict = NULL;
663
664       return true;
665     }
666   else
667     return false;
668 }
669
670 /* Cancels all temporary transformations, if any.  Further
671    transformations will be permanent.
672    Returns true if anything changed, false otherwise. */
673 bool
674 proc_cancel_temporary_transformations (struct dataset *ds)
675 {
676   if (proc_in_temporary_transformations (ds))
677     {
678       dict_destroy (ds->dict);
679       ds->dict = ds->permanent_dict;
680       ds->permanent_dict = NULL;
681
682       trns_chain_destroy (ds->temporary_trns_chain);
683       ds->temporary_trns_chain = NULL;
684
685       if ( ds->xform_callback)
686         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
687                             ds->xform_callback_aux);
688
689       return true;
690     }
691   else
692     return false;
693 }
694
695 /* Cancels all transformations, if any.
696    Returns true if successful, false on I/O error. */
697 bool
698 proc_cancel_all_transformations (struct dataset *ds)
699 {
700   bool ok;
701   assert (ds->proc_state == PROC_COMMITTED);
702   ok = trns_chain_destroy (ds->permanent_trns_chain);
703   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
704   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
705   ds->temporary_trns_chain = NULL;
706   if ( ds->xform_callback)
707     ds->xform_callback (false, ds->xform_callback_aux);
708
709   return ok;
710 }
711 \f
712
713 void
714 dataset_add_transform_change_callback (struct dataset *ds,
715                                        transformation_change_callback_func *cb,
716                                        void *aux)
717 {
718   ds->xform_callback = cb;
719   ds->xform_callback_aux = aux;
720 }
721
722 /* Causes output from the next procedure to be discarded, instead
723    of being preserved for use as input for the next procedure. */
724 void
725 proc_discard_output (struct dataset *ds)
726 {
727   ds->discard_output = true;
728 }
729
730
731 /* Checks whether DS has a corrupted active file.  If so,
732    discards it and returns false.  If not, returns true without
733    doing anything. */
734 bool
735 dataset_end_of_command (struct dataset *ds)
736 {
737   if (ds->source != NULL)
738     {
739       if (casereader_error (ds->source))
740         {
741           dataset_clear (ds);
742           return false;
743         }
744       else
745         {
746           const struct taint *taint = casereader_get_taint (ds->source);
747           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
748           assert (!taint_has_tainted_successor (taint));
749         }
750     }
751   return true;
752 }
753 \f
754 static trns_proc_func case_limit_trns_proc;
755 static trns_free_func case_limit_trns_free;
756
757 /* Adds a transformation that limits the number of cases that may
758    pass through, if DS->DICT has a case limit. */
759 static void
760 add_case_limit_trns (struct dataset *ds)
761 {
762   casenumber case_limit = dict_get_case_limit (ds->dict);
763   if (case_limit != 0)
764     {
765       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
766       *cases_remaining = case_limit;
767       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
768                           cases_remaining);
769       dict_set_case_limit (ds->dict, 0);
770     }
771 }
772
773 /* Limits the maximum number of cases processed to
774    *CASES_REMAINING. */
775 static int
776 case_limit_trns_proc (void *cases_remaining_,
777                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
778 {
779   size_t *cases_remaining = cases_remaining_;
780   if (*cases_remaining > 0)
781     {
782       (*cases_remaining)--;
783       return TRNS_CONTINUE;
784     }
785   else
786     return TRNS_DROP_CASE;
787 }
788
789 /* Frees the data associated with a case limit transformation. */
790 static bool
791 case_limit_trns_free (void *cases_remaining_)
792 {
793   size_t *cases_remaining = cases_remaining_;
794   free (cases_remaining);
795   return true;
796 }
797 \f
798 static trns_proc_func filter_trns_proc;
799
800 /* Adds a temporary transformation to filter data according to
801    the variable specified on FILTER, if any. */
802 static void
803 add_filter_trns (struct dataset *ds)
804 {
805   struct variable *filter_var = dict_get_filter (ds->dict);
806   if (filter_var != NULL)
807     {
808       proc_start_temporary_transformations (ds);
809       add_transformation (ds, filter_trns_proc, NULL, filter_var);
810     }
811 }
812
813 /* FILTER transformation. */
814 static int
815 filter_trns_proc (void *filter_var_,
816                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
817
818 {
819   struct variable *filter_var = filter_var_;
820   double f = case_num (*c, filter_var);
821   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
822           ? TRNS_CONTINUE : TRNS_DROP_CASE);
823 }
824
825
826 void
827 dataset_need_lag (struct dataset *ds, int n_before)
828 {
829   ds->n_lag = MAX (ds->n_lag, n_before);
830 }