a45f497afd1268012a77064a5d49b33839c974f8
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/procedure.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/transformations.h"
36 #include "data/variable.h"
37 #include "libpspp/deque.h"
38 #include "libpspp/misc.h"
39 #include "libpspp/str.h"
40 #include "libpspp/taint.h"
41 #include "libpspp/i18n.h"
42
43 #include "gl/minmax.h"
44 #include "gl/xalloc.h"
45
46 struct dataset {
47   /* Cases are read from source,
48      their transformation variables are initialized,
49      pass through permanent_trns_chain (which transforms them into
50      the format described by permanent_dict),
51      are written to sink,
52      pass through temporary_trns_chain (which transforms them into
53      the format described by dict),
54      and are finally passed to the procedure. */
55   struct casereader *source;
56   struct caseinit *caseinit;
57   struct trns_chain *permanent_trns_chain;
58   struct dictionary *permanent_dict;
59   struct casewriter *sink;
60   struct trns_chain *temporary_trns_chain;
61   struct dictionary *dict;
62
63   /* Callback which occurs whenever the transformation chain(s) have
64      been modified */
65   transformation_change_callback_func *xform_callback;
66   void *xform_callback_aux;
67
68   /* If true, cases are discarded instead of being written to
69      sink. */
70   bool discard_output;
71
72   /* The transformation chain that the next transformation will be
73      added to. */
74   struct trns_chain *cur_trns_chain;
75
76   /* The case map used to compact a case, if necessary;
77      otherwise a null pointer. */
78   struct case_map *compactor;
79
80   /* Time at which proc was last invoked. */
81   time_t last_proc_invocation;
82
83   /* Cases just before ("lagging") the current one. */
84   int n_lag;                    /* Number of cases to lag. */
85   struct deque lag;             /* Deque of lagged cases. */
86   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
87
88   /* Procedure data. */
89   enum
90     {
91       PROC_COMMITTED,           /* No procedure in progress. */
92       PROC_OPEN,                /* proc_open called, casereader still open. */
93       PROC_CLOSED               /* casereader from proc_open destroyed,
94                                    but proc_commit not yet called. */
95     }
96   proc_state;
97   casenumber cases_written;     /* Cases output so far. */
98   bool ok;                      /* Error status. */
99   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
100
101   void (*callback) (void *); /* Callback for when the dataset changes */
102   void *cb_data;
103
104   /* Default encoding for reading syntax files. */
105   char *syntax_encoding;
106 }; /* struct dataset */
107
108
109 static void add_case_limit_trns (struct dataset *ds);
110 static void add_filter_trns (struct dataset *ds);
111
112 static void update_last_proc_invocation (struct dataset *ds);
113
114 static void
115 dataset_set_unsaved (const struct dataset *ds)
116 {
117   if (ds->callback) ds->callback (ds->cb_data);
118 }
119
120 \f
121 /* Public functions. */
122
123 void
124 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
125 {
126   ds->callback = cb;
127   ds->cb_data = cb_data;
128 }
129
130 void
131 dataset_set_default_syntax_encoding (struct dataset *ds, const char *encoding)
132 {
133   free (ds->syntax_encoding);
134   ds->syntax_encoding = xstrdup (encoding);
135 }
136
137 const char *
138 dataset_get_default_syntax_encoding (const struct dataset *ds)
139 {
140   return ds->syntax_encoding;
141 }
142
143 /* Returns the last time the data was read. */
144 time_t
145 time_of_last_procedure (struct dataset *ds)
146 {
147   if (ds->last_proc_invocation == 0)
148     update_last_proc_invocation (ds);
149   return ds->last_proc_invocation;
150 }
151 \f
152 /* Regular procedure. */
153
154 /* Executes any pending transformations, if necessary.
155    This is not identical to the EXECUTE command in that it won't
156    always read the source data.  This can be important when the
157    source data is given inline within BEGIN DATA...END FILE. */
158 bool
159 proc_execute (struct dataset *ds)
160 {
161   bool ok;
162
163   if ((ds->temporary_trns_chain == NULL
164        || trns_chain_is_empty (ds->temporary_trns_chain))
165       && trns_chain_is_empty (ds->permanent_trns_chain))
166     {
167       ds->n_lag = 0;
168       ds->discard_output = false;
169       dict_set_case_limit (ds->dict, 0);
170       dict_clear_vectors (ds->dict);
171       return true;
172     }
173
174   ok = casereader_destroy (proc_open (ds));
175   return proc_commit (ds) && ok;
176 }
177
178 static const struct casereader_class proc_casereader_class;
179
180 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
181    cases filtered out with FILTER BY will not be included in the casereader
182    (which is usually desirable).  If FILTER is false, all cases will be
183    included regardless of FILTER BY settings.
184
185    proc_commit must be called when done. */
186 struct casereader *
187 proc_open_filtering (struct dataset *ds, bool filter)
188 {
189   struct casereader *reader;
190
191   assert (ds->source != NULL);
192   assert (ds->proc_state == PROC_COMMITTED);
193
194   update_last_proc_invocation (ds);
195
196   caseinit_mark_for_init (ds->caseinit, ds->dict);
197
198   /* Finish up the collection of transformations. */
199   add_case_limit_trns (ds);
200   if (filter)
201     add_filter_trns (ds);
202   trns_chain_finalize (ds->cur_trns_chain);
203
204   /* Make permanent_dict refer to the dictionary right before
205      data reaches the sink. */
206   if (ds->permanent_dict == NULL)
207     ds->permanent_dict = ds->dict;
208
209   /* Prepare sink. */
210   if (!ds->discard_output)
211     {
212       struct dictionary *pd = ds->permanent_dict;
213       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
214       if (compacted_value_cnt < dict_get_next_value_idx (pd))
215         {
216           struct caseproto *compacted_proto;
217           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
218           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
219           ds->sink = autopaging_writer_create (compacted_proto);
220           caseproto_unref (compacted_proto);
221         }
222       else
223         {
224           ds->compactor = NULL;
225           ds->sink = autopaging_writer_create (dict_get_proto (pd));
226         }
227     }
228   else
229     {
230       ds->compactor = NULL;
231       ds->sink = NULL;
232     }
233
234   /* Allocate memory for lagged cases. */
235   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
236
237   ds->proc_state = PROC_OPEN;
238   ds->cases_written = 0;
239   ds->ok = true;
240
241   /* FIXME: use taint in dataset in place of `ok'? */
242   /* FIXME: for trivial cases we can just return a clone of
243      ds->source? */
244
245   /* Create casereader and insert a shim on top.  The shim allows us to
246      arbitrarily extend the casereader's lifetime, by slurping the cases into
247      the shim's buffer in proc_commit().  That is especially useful when output
248      table_items are generated directly from the procedure casereader (e.g. by
249      the LIST procedure) when we are using an output driver that keeps a
250      reference to the output items passed to it (e.g. the GUI output driver in
251      PSPPIRE). */
252   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
253                                          CASENUMBER_MAX,
254                                          &proc_casereader_class, ds);
255   ds->shim = casereader_shim_insert (reader);
256   return reader;
257 }
258
259 /* Opens dataset DS for reading cases with proc_read.
260    proc_commit must be called when done. */
261 struct casereader *
262 proc_open (struct dataset *ds)
263 {
264   return proc_open_filtering (ds, true);
265 }
266
267 /* Returns true if a procedure is in progress, that is, if
268    proc_open has been called but proc_commit has not. */
269 bool
270 proc_is_open (const struct dataset *ds)
271 {
272   return ds->proc_state != PROC_COMMITTED;
273 }
274
275 /* "read" function for procedure casereader. */
276 static struct ccase *
277 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
278 {
279   struct dataset *ds = ds_;
280   enum trns_result retval = TRNS_DROP_CASE;
281   struct ccase *c;
282
283   assert (ds->proc_state == PROC_OPEN);
284   for (; ; case_unref (c))
285     {
286       casenumber case_nr;
287
288       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
289       if (retval == TRNS_ERROR)
290         ds->ok = false;
291       if (!ds->ok)
292         return NULL;
293
294       /* Read a case from source. */
295       c = casereader_read (ds->source);
296       if (c == NULL)
297         return NULL;
298       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
299       caseinit_init_vars (ds->caseinit, c);
300
301       /* Execute permanent transformations.  */
302       case_nr = ds->cases_written + 1;
303       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
304                                    &c, case_nr);
305       caseinit_update_left_vars (ds->caseinit, c);
306       if (retval != TRNS_CONTINUE)
307         continue;
308
309       /* Write case to collection of lagged cases. */
310       if (ds->n_lag > 0)
311         {
312           while (deque_count (&ds->lag) >= ds->n_lag)
313             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
314           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
315         }
316
317       /* Write case to replacement active file. */
318       ds->cases_written++;
319       if (ds->sink != NULL)
320         casewriter_write (ds->sink,
321                           case_map_execute (ds->compactor, case_ref (c)));
322
323       /* Execute temporary transformations. */
324       if (ds->temporary_trns_chain != NULL)
325         {
326           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
327                                        &c, ds->cases_written);
328           if (retval != TRNS_CONTINUE)
329             continue;
330         }
331
332       return c;
333     }
334 }
335
336 /* "destroy" function for procedure casereader. */
337 static void
338 proc_casereader_destroy (struct casereader *reader, void *ds_)
339 {
340   struct dataset *ds = ds_;
341   struct ccase *c;
342
343   /* We are always the subreader for a casereader_buffer, so if we're being
344      destroyed then it's because the casereader_buffer has read all the cases
345      that it ever will. */
346   ds->shim = NULL;
347
348   /* Make sure transformations happen for every input case, in
349      case they have side effects, and ensure that the replacement
350      active file gets all the cases it should. */
351   while ((c = casereader_read (reader)) != NULL)
352     case_unref (c);
353
354   ds->proc_state = PROC_CLOSED;
355   ds->ok = casereader_destroy (ds->source) && ds->ok;
356   ds->source = NULL;
357   proc_set_active_file_data (ds, NULL);
358 }
359
360 /* Must return false if the source casereader, a transformation,
361    or the sink casewriter signaled an error.  (If a temporary
362    transformation signals an error, then the return value is
363    false, but the replacement active file may still be
364    untainted.) */
365 bool
366 proc_commit (struct dataset *ds)
367 {
368   if (ds->shim != NULL)
369     casereader_shim_slurp (ds->shim);
370
371   assert (ds->proc_state == PROC_CLOSED);
372   ds->proc_state = PROC_COMMITTED;
373
374   dataset_set_unsaved (ds);
375
376   /* Free memory for lagged cases. */
377   while (!deque_is_empty (&ds->lag))
378     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
379   free (ds->lag_cases);
380
381   /* Dictionary from before TEMPORARY becomes permanent. */
382   proc_cancel_temporary_transformations (ds);
383
384   if (!ds->discard_output)
385     {
386       /* Finish compacting. */
387       if (ds->compactor != NULL)
388         {
389           case_map_destroy (ds->compactor);
390           ds->compactor = NULL;
391
392           dict_delete_scratch_vars (ds->dict);
393           dict_compact_values (ds->dict);
394         }
395
396       /* Old data sink becomes new data source. */
397       if (ds->sink != NULL)
398         ds->source = casewriter_make_reader (ds->sink);
399     }
400   else
401     {
402       ds->source = NULL;
403       ds->discard_output = false;
404     }
405   ds->sink = NULL;
406
407   caseinit_clear (ds->caseinit);
408   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
409
410   dict_clear_vectors (ds->dict);
411   ds->permanent_dict = NULL;
412   return proc_cancel_all_transformations (ds) && ds->ok;
413 }
414
415 /* Casereader class for procedure execution. */
416 static const struct casereader_class proc_casereader_class =
417   {
418     proc_casereader_read,
419     proc_casereader_destroy,
420     NULL,
421     NULL,
422   };
423
424 /* Updates last_proc_invocation. */
425 static void
426 update_last_proc_invocation (struct dataset *ds)
427 {
428   ds->last_proc_invocation = time (NULL);
429 }
430 \f
431 /* Returns a pointer to the lagged case from N_BEFORE cases before the
432    current one, or NULL if there haven't been that many cases yet. */
433 const struct ccase *
434 lagged_case (const struct dataset *ds, int n_before)
435 {
436   assert (n_before >= 1);
437   assert (n_before <= ds->n_lag);
438
439   if (n_before <= deque_count (&ds->lag))
440     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
441   else
442     return NULL;
443 }
444 \f
445 /* Returns the current set of permanent transformations,
446    and clears the permanent transformations.
447    For use by INPUT PROGRAM. */
448 struct trns_chain *
449 proc_capture_transformations (struct dataset *ds)
450 {
451   struct trns_chain *chain;
452
453   assert (ds->temporary_trns_chain == NULL);
454   chain = ds->permanent_trns_chain;
455   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
456
457   if ( ds->xform_callback)
458     ds->xform_callback (false, ds->xform_callback_aux);
459
460   return chain;
461 }
462
463 /* Adds a transformation that processes a case with PROC and
464    frees itself with FREE to the current set of transformations.
465    The functions are passed AUX as auxiliary data. */
466 void
467 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
468 {
469   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
470   if ( ds->xform_callback)
471     ds->xform_callback (true, ds->xform_callback_aux);
472 }
473
474 /* Adds a transformation that processes a case with PROC and
475    frees itself with FREE to the current set of transformations.
476    When parsing of the block of transformations is complete,
477    FINALIZE will be called.
478    The functions are passed AUX as auxiliary data. */
479 void
480 add_transformation_with_finalizer (struct dataset *ds,
481                                    trns_finalize_func *finalize,
482                                    trns_proc_func *proc,
483                                    trns_free_func *free, void *aux)
484 {
485   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
486
487   if ( ds->xform_callback)
488     ds->xform_callback (true, ds->xform_callback_aux);
489 }
490
491 /* Returns the index of the next transformation.
492    This value can be returned by a transformation procedure
493    function to indicate a "jump" to that transformation. */
494 size_t
495 next_transformation (const struct dataset *ds)
496 {
497   return trns_chain_next (ds->cur_trns_chain);
498 }
499
500 /* Returns true if the next call to add_transformation() will add
501    a temporary transformation, false if it will add a permanent
502    transformation. */
503 bool
504 proc_in_temporary_transformations (const struct dataset *ds)
505 {
506   return ds->temporary_trns_chain != NULL;
507 }
508
509 /* Marks the start of temporary transformations.
510    Further calls to add_transformation() will add temporary
511    transformations. */
512 void
513 proc_start_temporary_transformations (struct dataset *ds)
514 {
515   if (!proc_in_temporary_transformations (ds))
516     {
517       add_case_limit_trns (ds);
518
519       ds->permanent_dict = dict_clone (ds->dict);
520
521       trns_chain_finalize (ds->permanent_trns_chain);
522       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
523
524       if ( ds->xform_callback)
525         ds->xform_callback (true, ds->xform_callback_aux);
526     }
527 }
528
529 /* Converts all the temporary transformations, if any, to
530    permanent transformations.  Further transformations will be
531    permanent.
532    Returns true if anything changed, false otherwise. */
533 bool
534 proc_make_temporary_transformations_permanent (struct dataset *ds)
535 {
536   if (proc_in_temporary_transformations (ds))
537     {
538       trns_chain_finalize (ds->temporary_trns_chain);
539       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
540       ds->temporary_trns_chain = NULL;
541
542       dict_destroy (ds->permanent_dict);
543       ds->permanent_dict = NULL;
544
545       return true;
546     }
547   else
548     return false;
549 }
550
551 /* Cancels all temporary transformations, if any.  Further
552    transformations will be permanent.
553    Returns true if anything changed, false otherwise. */
554 bool
555 proc_cancel_temporary_transformations (struct dataset *ds)
556 {
557   if (proc_in_temporary_transformations (ds))
558     {
559       dict_destroy (ds->dict);
560       ds->dict = ds->permanent_dict;
561       ds->permanent_dict = NULL;
562
563       trns_chain_destroy (ds->temporary_trns_chain);
564       ds->temporary_trns_chain = NULL;
565
566       if ( ds->xform_callback)
567         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
568                             ds->xform_callback_aux);
569
570       return true;
571     }
572   else
573     return false;
574 }
575
576 /* Cancels all transformations, if any.
577    Returns true if successful, false on I/O error. */
578 bool
579 proc_cancel_all_transformations (struct dataset *ds)
580 {
581   bool ok;
582   assert (ds->proc_state == PROC_COMMITTED);
583   ok = trns_chain_destroy (ds->permanent_trns_chain);
584   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
585   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
586   ds->temporary_trns_chain = NULL;
587   if ( ds->xform_callback)
588     ds->xform_callback (false, ds->xform_callback_aux);
589
590   return ok;
591 }
592 \f
593
594 static void
595 dict_callback (struct dictionary *d UNUSED, void *ds_)
596 {
597   struct dataset *ds = ds_;
598   dataset_set_unsaved (ds);
599 }
600
601 /* Initializes procedure handling. */
602 struct dataset *
603 create_dataset (void)
604 {
605   struct dataset *ds = xzalloc (sizeof(*ds));
606   ds->dict = dict_create ();
607
608   dict_set_change_callback (ds->dict, dict_callback, ds);
609
610   dict_set_encoding (ds->dict, get_default_encoding ());
611
612   ds->caseinit = caseinit_create ();
613   proc_cancel_all_transformations (ds);
614
615   ds->syntax_encoding = xstrdup ("Auto");
616
617   return ds;
618 }
619
620
621 void
622 dataset_add_transform_change_callback (struct dataset *ds,
623                                        transformation_change_callback_func *cb,
624                                        void *aux)
625 {
626   ds->xform_callback = cb;
627   ds->xform_callback_aux = aux;
628 }
629
630 /* Finishes up procedure handling. */
631 void
632 destroy_dataset (struct dataset *ds)
633 {
634   proc_discard_active_file (ds);
635   dict_destroy (ds->dict);
636   caseinit_destroy (ds->caseinit);
637   trns_chain_destroy (ds->permanent_trns_chain);
638
639   if ( ds->xform_callback)
640     ds->xform_callback (false, ds->xform_callback_aux);
641
642   free (ds->syntax_encoding);
643   free (ds);
644 }
645
646 /* Causes output from the next procedure to be discarded, instead
647    of being preserved for use as input for the next procedure. */
648 void
649 proc_discard_output (struct dataset *ds)
650 {
651   ds->discard_output = true;
652 }
653
654 /* Discards the active file dictionary, data, and
655    transformations. */
656 void
657 proc_discard_active_file (struct dataset *ds)
658 {
659   assert (ds->proc_state == PROC_COMMITTED);
660
661   dict_clear (ds->dict);
662   fh_set_default_handle (NULL);
663
664   ds->n_lag = 0;
665
666   casereader_destroy (ds->source);
667   ds->source = NULL;
668
669   proc_cancel_all_transformations (ds);
670 }
671
672 /* Sets SOURCE as the source for procedure input for the next
673    procedure. */
674 void
675 proc_set_active_file (struct dataset *ds,
676                       struct casereader *source,
677                       struct dictionary *dict)
678 {
679   assert (ds->proc_state == PROC_COMMITTED);
680   assert (ds->dict != dict);
681
682   proc_discard_active_file (ds);
683
684   dict_destroy (ds->dict);
685   ds->dict = dict;
686   dict_set_change_callback (ds->dict, dict_callback, ds);
687
688   proc_set_active_file_data (ds, source);
689 }
690
691 /* Replaces the active file's data by READER without replacing
692    the associated dictionary. */
693 bool
694 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
695 {
696   casereader_destroy (ds->source);
697   ds->source = reader;
698
699   caseinit_clear (ds->caseinit);
700   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
701
702   return reader == NULL || !casereader_error (reader);
703 }
704
705 /* Returns true if an active file data source is available, false
706    otherwise. */
707 bool
708 proc_has_active_file (const struct dataset *ds)
709 {
710   return ds->source != NULL;
711 }
712
713 /* Returns the active file data source from DS, or a null pointer
714    if DS has no data source, and removes it from DS. */
715 struct casereader *
716 proc_extract_active_file_data (struct dataset *ds)
717 {
718   struct casereader *reader = ds->source;
719   ds->source = NULL;
720
721   return reader;
722 }
723
724 /* Checks whether DS has a corrupted active file.  If so,
725    discards it and returns false.  If not, returns true without
726    doing anything. */
727 bool
728 dataset_end_of_command (struct dataset *ds)
729 {
730   if (ds->source != NULL)
731     {
732       if (casereader_error (ds->source))
733         {
734           proc_discard_active_file (ds);
735           return false;
736         }
737       else
738         {
739           const struct taint *taint = casereader_get_taint (ds->source);
740           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
741           assert (!taint_has_tainted_successor (taint));
742         }
743     }
744   return true;
745 }
746 \f
747 static trns_proc_func case_limit_trns_proc;
748 static trns_free_func case_limit_trns_free;
749
750 /* Adds a transformation that limits the number of cases that may
751    pass through, if DS->DICT has a case limit. */
752 static void
753 add_case_limit_trns (struct dataset *ds)
754 {
755   casenumber case_limit = dict_get_case_limit (ds->dict);
756   if (case_limit != 0)
757     {
758       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
759       *cases_remaining = case_limit;
760       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
761                           cases_remaining);
762       dict_set_case_limit (ds->dict, 0);
763     }
764 }
765
766 /* Limits the maximum number of cases processed to
767    *CASES_REMAINING. */
768 static int
769 case_limit_trns_proc (void *cases_remaining_,
770                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
771 {
772   size_t *cases_remaining = cases_remaining_;
773   if (*cases_remaining > 0)
774     {
775       (*cases_remaining)--;
776       return TRNS_CONTINUE;
777     }
778   else
779     return TRNS_DROP_CASE;
780 }
781
782 /* Frees the data associated with a case limit transformation. */
783 static bool
784 case_limit_trns_free (void *cases_remaining_)
785 {
786   size_t *cases_remaining = cases_remaining_;
787   free (cases_remaining);
788   return true;
789 }
790 \f
791 static trns_proc_func filter_trns_proc;
792
793 /* Adds a temporary transformation to filter data according to
794    the variable specified on FILTER, if any. */
795 static void
796 add_filter_trns (struct dataset *ds)
797 {
798   struct variable *filter_var = dict_get_filter (ds->dict);
799   if (filter_var != NULL)
800     {
801       proc_start_temporary_transformations (ds);
802       add_transformation (ds, filter_trns_proc, NULL, filter_var);
803     }
804 }
805
806 /* FILTER transformation. */
807 static int
808 filter_trns_proc (void *filter_var_,
809                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
810
811 {
812   struct variable *filter_var = filter_var_;
813   double f = case_num (*c, filter_var);
814   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
815           ? TRNS_CONTINUE : TRNS_DROP_CASE);
816 }
817
818
819 struct dictionary *
820 dataset_dict (const struct dataset *ds)
821 {
822   return ds->dict;
823 }
824
825 const struct casereader *
826 dataset_source (const struct dataset *ds)
827 {
828   return ds->source;
829 }
830
831 void
832 dataset_need_lag (struct dataset *ds, int n_before)
833 {
834   ds->n_lag = MAX (ds->n_lag, n_before);
835 }