Change "union value" to dynamically allocate long strings.
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23
24 #include <data/case.h>
25 #include <data/case-map.h>
26 #include <data/caseinit.h>
27 #include <data/casereader.h>
28 #include <data/casereader-provider.h>
29 #include <data/casewriter.h>
30 #include <data/dictionary.h>
31 #include <data/file-handle-def.h>
32 #include <data/procedure.h>
33 #include <data/transformations.h>
34 #include <data/variable.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
39 #include <libpspp/i18n.h>
40
41 #include "minmax.h"
42 #include "xalloc.h"
43
44 struct dataset {
45   /* Cases are read from source,
46      their transformation variables are initialized,
47      pass through permanent_trns_chain (which transforms them into
48      the format described by permanent_dict),
49      are written to sink,
50      pass through temporary_trns_chain (which transforms them into
51      the format described by dict),
52      and are finally passed to the procedure. */
53   struct casereader *source;
54   struct caseinit *caseinit;
55   struct trns_chain *permanent_trns_chain;
56   struct dictionary *permanent_dict;
57   struct casewriter *sink;
58   struct trns_chain *temporary_trns_chain;
59   struct dictionary *dict;
60
61   /* Callback which occurs whenever the transformation chain(s) have
62      been modified */
63   transformation_change_callback_func *xform_callback;
64   void *xform_callback_aux;
65
66   /* If true, cases are discarded instead of being written to
67      sink. */
68   bool discard_output;
69
70   /* The transformation chain that the next transformation will be
71      added to. */
72   struct trns_chain *cur_trns_chain;
73
74   /* The case map used to compact a case, if necessary;
75      otherwise a null pointer. */
76   struct case_map *compactor;
77
78   /* Time at which proc was last invoked. */
79   time_t last_proc_invocation;
80
81   /* Cases just before ("lagging") the current one. */
82   int n_lag;                    /* Number of cases to lag. */
83   struct deque lag;             /* Deque of lagged cases. */
84   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
85
86   /* Procedure data. */
87   enum
88     {
89       PROC_COMMITTED,           /* No procedure in progress. */
90       PROC_OPEN,                /* proc_open called, casereader still open. */
91       PROC_CLOSED               /* casereader from proc_open destroyed,
92                                    but proc_commit not yet called. */
93     }
94   proc_state;
95   casenumber cases_written;       /* Cases output so far. */
96   bool ok;                    /* Error status. */
97
98   void (*callback) (void *); /* Callback for when the dataset changes */
99   void *cb_data;
100
101 }; /* struct dataset */
102
103
104 static void add_case_limit_trns (struct dataset *ds);
105 static void add_filter_trns (struct dataset *ds);
106
107 static void update_last_proc_invocation (struct dataset *ds);
108
109 static void
110 dataset_set_unsaved (const struct dataset *ds)
111 {
112   if (ds->callback) ds->callback (ds->cb_data);
113 }
114
115 \f
116 /* Public functions. */
117
118 void
119 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
120 {
121   ds->callback = cb;
122   ds->cb_data = cb_data;
123 }
124
125
126 /* Returns the last time the data was read. */
127 time_t
128 time_of_last_procedure (struct dataset *ds)
129 {
130   if (ds->last_proc_invocation == 0)
131     update_last_proc_invocation (ds);
132   return ds->last_proc_invocation;
133 }
134 \f
135 /* Regular procedure. */
136
137 /* Executes any pending transformations, if necessary.
138    This is not identical to the EXECUTE command in that it won't
139    always read the source data.  This can be important when the
140    source data is given inline within BEGIN DATA...END FILE. */
141 bool
142 proc_execute (struct dataset *ds)
143 {
144   bool ok;
145
146   if ((ds->temporary_trns_chain == NULL
147        || trns_chain_is_empty (ds->temporary_trns_chain))
148       && trns_chain_is_empty (ds->permanent_trns_chain))
149     {
150       ds->n_lag = 0;
151       ds->discard_output = false;
152       dict_set_case_limit (ds->dict, 0);
153       dict_clear_vectors (ds->dict);
154       return true;
155     }
156
157   ok = casereader_destroy (proc_open (ds));
158   return proc_commit (ds) && ok;
159 }
160
161 static const struct casereader_class proc_casereader_class;
162
163 /* Opens dataset DS for reading cases with proc_read.
164    proc_commit must be called when done. */
165 struct casereader *
166 proc_open (struct dataset *ds)
167 {
168   assert (ds->source != NULL);
169   assert (ds->proc_state == PROC_COMMITTED);
170
171   update_last_proc_invocation (ds);
172
173   caseinit_mark_for_init (ds->caseinit, ds->dict);
174
175   /* Finish up the collection of transformations. */
176   add_case_limit_trns (ds);
177   add_filter_trns (ds);
178   trns_chain_finalize (ds->cur_trns_chain);
179
180   /* Make permanent_dict refer to the dictionary right before
181      data reaches the sink. */
182   if (ds->permanent_dict == NULL)
183     ds->permanent_dict = ds->dict;
184
185   /* Prepare sink. */
186   if (!ds->discard_output)
187     {
188       struct dictionary *pd = ds->permanent_dict;
189       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
190       if (compacted_value_cnt < dict_get_next_value_idx (pd))
191         {
192           struct caseproto *compacted_proto;
193           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
194           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
195           ds->sink = autopaging_writer_create (compacted_proto);
196           caseproto_unref (compacted_proto);
197         }
198       else
199         {
200           ds->compactor = NULL;
201           ds->sink = autopaging_writer_create (dict_get_proto (pd));
202         }
203     }
204   else
205     {
206       ds->compactor = NULL;
207       ds->sink = NULL;
208     }
209
210   /* Allocate memory for lagged cases. */
211   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
212
213   ds->proc_state = PROC_OPEN;
214   ds->cases_written = 0;
215   ds->ok = true;
216
217   /* FIXME: use taint in dataset in place of `ok'? */
218   /* FIXME: for trivial cases we can just return a clone of
219      ds->source? */
220   return casereader_create_sequential (NULL, dict_get_proto (ds->dict),
221                                        CASENUMBER_MAX,
222                                        &proc_casereader_class, ds);
223 }
224
225 /* Returns true if a procedure is in progress, that is, if
226    proc_open has been called but proc_commit has not. */
227 bool
228 proc_is_open (const struct dataset *ds)
229 {
230   return ds->proc_state != PROC_COMMITTED;
231 }
232
233 /* "read" function for procedure casereader. */
234 static struct ccase *
235 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
236 {
237   struct dataset *ds = ds_;
238   enum trns_result retval = TRNS_DROP_CASE;
239   struct ccase *c;
240
241   assert (ds->proc_state == PROC_OPEN);
242   for (; ; case_unref (c))
243     {
244       casenumber case_nr;
245
246       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
247       if (retval == TRNS_ERROR)
248         ds->ok = false;
249       if (!ds->ok)
250         return NULL;
251
252       /* Read a case from source. */
253       c = casereader_read (ds->source);
254       if (c == NULL)
255         return NULL;
256       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
257       caseinit_init_vars (ds->caseinit, c);
258
259       /* Execute permanent transformations.  */
260       case_nr = ds->cases_written + 1;
261       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
262                                    &c, case_nr);
263       caseinit_update_left_vars (ds->caseinit, c);
264       if (retval != TRNS_CONTINUE)
265         continue;
266
267       /* Write case to collection of lagged cases. */
268       if (ds->n_lag > 0)
269         {
270           while (deque_count (&ds->lag) >= ds->n_lag)
271             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
272           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
273         }
274
275       /* Write case to replacement active file. */
276       ds->cases_written++;
277       if (ds->sink != NULL)
278         casewriter_write (ds->sink,
279                           case_map_execute (ds->compactor, case_ref (c)));
280
281       /* Execute temporary transformations. */
282       if (ds->temporary_trns_chain != NULL)
283         {
284           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
285                                        &c, ds->cases_written);
286           if (retval != TRNS_CONTINUE)
287             continue;
288         }
289
290       return c;
291     }
292 }
293
294 /* "destroy" function for procedure casereader. */
295 static void
296 proc_casereader_destroy (struct casereader *reader, void *ds_)
297 {
298   struct dataset *ds = ds_;
299   struct ccase *c;
300
301   /* Make sure transformations happen for every input case, in
302      case they have side effects, and ensure that the replacement
303      active file gets all the cases it should. */
304   while ((c = casereader_read (reader)) != NULL)
305     case_unref (c);
306
307   ds->proc_state = PROC_CLOSED;
308   ds->ok = casereader_destroy (ds->source) && ds->ok;
309   ds->source = NULL;
310   proc_set_active_file_data (ds, NULL);
311 }
312
313 /* Must return false if the source casereader, a transformation,
314    or the sink casewriter signaled an error.  (If a temporary
315    transformation signals an error, then the return value is
316    false, but the replacement active file may still be
317    untainted.) */
318 bool
319 proc_commit (struct dataset *ds)
320 {
321   assert (ds->proc_state == PROC_CLOSED);
322   ds->proc_state = PROC_COMMITTED;
323
324   dataset_set_unsaved (ds);
325
326   /* Free memory for lagged cases. */
327   while (!deque_is_empty (&ds->lag))
328     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
329   free (ds->lag_cases);
330
331   /* Dictionary from before TEMPORARY becomes permanent. */
332   proc_cancel_temporary_transformations (ds);
333
334   if (!ds->discard_output)
335     {
336       /* Finish compacting. */
337       if (ds->compactor != NULL)
338         {
339           case_map_destroy (ds->compactor);
340           ds->compactor = NULL;
341
342           dict_delete_scratch_vars (ds->dict);
343           dict_compact_values (ds->dict);
344         }
345
346       /* Old data sink becomes new data source. */
347       if (ds->sink != NULL)
348         ds->source = casewriter_make_reader (ds->sink);
349     }
350   else
351     {
352       ds->source = NULL;
353       ds->discard_output = false;
354     }
355   ds->sink = NULL;
356
357   caseinit_clear (ds->caseinit);
358   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
359
360   dict_clear_vectors (ds->dict);
361   ds->permanent_dict = NULL;
362   return proc_cancel_all_transformations (ds) && ds->ok;
363 }
364
365 /* Casereader class for procedure execution. */
366 static const struct casereader_class proc_casereader_class =
367   {
368     proc_casereader_read,
369     proc_casereader_destroy,
370     NULL,
371     NULL,
372   };
373
374 /* Updates last_proc_invocation. */
375 static void
376 update_last_proc_invocation (struct dataset *ds)
377 {
378   ds->last_proc_invocation = time (NULL);
379 }
380 \f
381 /* Returns a pointer to the lagged case from N_BEFORE cases before the
382    current one, or NULL if there haven't been that many cases yet. */
383 const struct ccase *
384 lagged_case (const struct dataset *ds, int n_before)
385 {
386   assert (n_before >= 1);
387   assert (n_before <= ds->n_lag);
388
389   if (n_before <= deque_count (&ds->lag))
390     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
391   else
392     return NULL;
393 }
394 \f
395 /* Returns the current set of permanent transformations,
396    and clears the permanent transformations.
397    For use by INPUT PROGRAM. */
398 struct trns_chain *
399 proc_capture_transformations (struct dataset *ds)
400 {
401   struct trns_chain *chain;
402
403   assert (ds->temporary_trns_chain == NULL);
404   chain = ds->permanent_trns_chain;
405   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
406
407   if ( ds->xform_callback)
408     ds->xform_callback (false, ds->xform_callback_aux);
409
410   return chain;
411 }
412
413 /* Adds a transformation that processes a case with PROC and
414    frees itself with FREE to the current set of transformations.
415    The functions are passed AUX as auxiliary data. */
416 void
417 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
418 {
419   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
420   if ( ds->xform_callback)
421     ds->xform_callback (true, ds->xform_callback_aux);
422 }
423
424 /* Adds a transformation that processes a case with PROC and
425    frees itself with FREE to the current set of transformations.
426    When parsing of the block of transformations is complete,
427    FINALIZE will be called.
428    The functions are passed AUX as auxiliary data. */
429 void
430 add_transformation_with_finalizer (struct dataset *ds,
431                                    trns_finalize_func *finalize,
432                                    trns_proc_func *proc,
433                                    trns_free_func *free, void *aux)
434 {
435   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
436
437   if ( ds->xform_callback)
438     ds->xform_callback (true, ds->xform_callback_aux);
439 }
440
441 /* Returns the index of the next transformation.
442    This value can be returned by a transformation procedure
443    function to indicate a "jump" to that transformation. */
444 size_t
445 next_transformation (const struct dataset *ds)
446 {
447   return trns_chain_next (ds->cur_trns_chain);
448 }
449
450 /* Returns true if the next call to add_transformation() will add
451    a temporary transformation, false if it will add a permanent
452    transformation. */
453 bool
454 proc_in_temporary_transformations (const struct dataset *ds)
455 {
456   return ds->temporary_trns_chain != NULL;
457 }
458
459 /* Marks the start of temporary transformations.
460    Further calls to add_transformation() will add temporary
461    transformations. */
462 void
463 proc_start_temporary_transformations (struct dataset *ds)
464 {
465   if (!proc_in_temporary_transformations (ds))
466     {
467       add_case_limit_trns (ds);
468
469       ds->permanent_dict = dict_clone (ds->dict);
470
471       trns_chain_finalize (ds->permanent_trns_chain);
472       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
473
474       if ( ds->xform_callback)
475         ds->xform_callback (true, ds->xform_callback_aux);
476     }
477 }
478
479 /* Converts all the temporary transformations, if any, to
480    permanent transformations.  Further transformations will be
481    permanent.
482    Returns true if anything changed, false otherwise. */
483 bool
484 proc_make_temporary_transformations_permanent (struct dataset *ds)
485 {
486   if (proc_in_temporary_transformations (ds))
487     {
488       trns_chain_finalize (ds->temporary_trns_chain);
489       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
490       ds->temporary_trns_chain = NULL;
491
492       dict_destroy (ds->permanent_dict);
493       ds->permanent_dict = NULL;
494
495       return true;
496     }
497   else
498     return false;
499 }
500
501 /* Cancels all temporary transformations, if any.  Further
502    transformations will be permanent.
503    Returns true if anything changed, false otherwise. */
504 bool
505 proc_cancel_temporary_transformations (struct dataset *ds)
506 {
507   if (proc_in_temporary_transformations (ds))
508     {
509       dict_destroy (ds->dict);
510       ds->dict = ds->permanent_dict;
511       ds->permanent_dict = NULL;
512
513       trns_chain_destroy (ds->temporary_trns_chain);
514       ds->temporary_trns_chain = NULL;
515
516       if ( ds->xform_callback)
517         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
518                             ds->xform_callback_aux);
519
520       return true;
521     }
522   else
523     return false;
524 }
525
526 /* Cancels all transformations, if any.
527    Returns true if successful, false on I/O error. */
528 bool
529 proc_cancel_all_transformations (struct dataset *ds)
530 {
531   bool ok;
532   assert (ds->proc_state == PROC_COMMITTED);
533   ok = trns_chain_destroy (ds->permanent_trns_chain);
534   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
535   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
536   ds->temporary_trns_chain = NULL;
537   if ( ds->xform_callback)
538     ds->xform_callback (false, ds->xform_callback_aux);
539
540   return ok;
541 }
542 \f
543
544 static void
545 dict_callback (struct dictionary *d UNUSED, void *ds_)
546 {
547   struct dataset *ds = ds_;
548   dataset_set_unsaved (ds);
549 }
550
551 /* Initializes procedure handling. */
552 struct dataset *
553 create_dataset (void)
554 {
555   struct dataset *ds = xzalloc (sizeof(*ds));
556   ds->dict = dict_create ();
557
558   dict_set_change_callback (ds->dict, dict_callback, ds);
559
560   dict_set_encoding (ds->dict, get_default_encoding ());
561
562   ds->caseinit = caseinit_create ();
563   proc_cancel_all_transformations (ds);
564   return ds;
565 }
566
567
568 void
569 dataset_add_transform_change_callback (struct dataset *ds,
570                                        transformation_change_callback_func *cb,
571                                        void *aux)
572 {
573   ds->xform_callback = cb;
574   ds->xform_callback_aux = aux;
575 }
576
577 /* Finishes up procedure handling. */
578 void
579 destroy_dataset (struct dataset *ds)
580 {
581   proc_discard_active_file (ds);
582   dict_destroy (ds->dict);
583   caseinit_destroy (ds->caseinit);
584   trns_chain_destroy (ds->permanent_trns_chain);
585
586   if ( ds->xform_callback)
587     ds->xform_callback (false, ds->xform_callback_aux);
588   free (ds);
589 }
590
591 /* Causes output from the next procedure to be discarded, instead
592    of being preserved for use as input for the next procedure. */
593 void
594 proc_discard_output (struct dataset *ds)
595 {
596   ds->discard_output = true;
597 }
598
599 /* Discards the active file dictionary, data, and
600    transformations. */
601 void
602 proc_discard_active_file (struct dataset *ds)
603 {
604   assert (ds->proc_state == PROC_COMMITTED);
605
606   dict_clear (ds->dict);
607   fh_set_default_handle (NULL);
608
609   ds->n_lag = 0;
610
611   casereader_destroy (ds->source);
612   ds->source = NULL;
613
614   proc_cancel_all_transformations (ds);
615 }
616
617 /* Sets SOURCE as the source for procedure input for the next
618    procedure. */
619 void
620 proc_set_active_file (struct dataset *ds,
621                       struct casereader *source,
622                       struct dictionary *dict)
623 {
624   assert (ds->proc_state == PROC_COMMITTED);
625   assert (ds->dict != dict);
626
627   proc_discard_active_file (ds);
628
629   dict_destroy (ds->dict);
630   ds->dict = dict;
631   dict_set_change_callback (ds->dict, dict_callback, ds);
632
633   proc_set_active_file_data (ds, source);
634 }
635
636 /* Replaces the active file's data by READER without replacing
637    the associated dictionary. */
638 bool
639 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
640 {
641   casereader_destroy (ds->source);
642   ds->source = reader;
643
644   caseinit_clear (ds->caseinit);
645   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
646
647   return reader == NULL || !casereader_error (reader);
648 }
649
650 /* Returns true if an active file data source is available, false
651    otherwise. */
652 bool
653 proc_has_active_file (const struct dataset *ds)
654 {
655   return ds->source != NULL;
656 }
657
658 /* Returns the active file data source from DS, or a null pointer
659    if DS has no data source, and removes it from DS. */
660 struct casereader *
661 proc_extract_active_file_data (struct dataset *ds)
662 {
663   struct casereader *reader = ds->source;
664   ds->source = NULL;
665
666   return reader;
667 }
668
669 /* Checks whether DS has a corrupted active file.  If so,
670    discards it and returns false.  If not, returns true without
671    doing anything. */
672 bool
673 dataset_end_of_command (struct dataset *ds)
674 {
675   if (ds->source != NULL)
676     {
677       if (casereader_error (ds->source))
678         {
679           proc_discard_active_file (ds);
680           return false;
681         }
682       else
683         {
684           const struct taint *taint = casereader_get_taint (ds->source);
685           taint_reset_successor_taint ((struct taint *) taint);
686           assert (!taint_has_tainted_successor (taint));
687         }
688     }
689   return true;
690 }
691 \f
692 static trns_proc_func case_limit_trns_proc;
693 static trns_free_func case_limit_trns_free;
694
695 /* Adds a transformation that limits the number of cases that may
696    pass through, if DS->DICT has a case limit. */
697 static void
698 add_case_limit_trns (struct dataset *ds)
699 {
700   casenumber case_limit = dict_get_case_limit (ds->dict);
701   if (case_limit != 0)
702     {
703       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
704       *cases_remaining = case_limit;
705       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
706                           cases_remaining);
707       dict_set_case_limit (ds->dict, 0);
708     }
709 }
710
711 /* Limits the maximum number of cases processed to
712    *CASES_REMAINING. */
713 static int
714 case_limit_trns_proc (void *cases_remaining_,
715                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
716 {
717   size_t *cases_remaining = cases_remaining_;
718   if (*cases_remaining > 0)
719     {
720       (*cases_remaining)--;
721       return TRNS_CONTINUE;
722     }
723   else
724     return TRNS_DROP_CASE;
725 }
726
727 /* Frees the data associated with a case limit transformation. */
728 static bool
729 case_limit_trns_free (void *cases_remaining_)
730 {
731   size_t *cases_remaining = cases_remaining_;
732   free (cases_remaining);
733   return true;
734 }
735 \f
736 static trns_proc_func filter_trns_proc;
737
738 /* Adds a temporary transformation to filter data according to
739    the variable specified on FILTER, if any. */
740 static void
741 add_filter_trns (struct dataset *ds)
742 {
743   struct variable *filter_var = dict_get_filter (ds->dict);
744   if (filter_var != NULL)
745     {
746       proc_start_temporary_transformations (ds);
747       add_transformation (ds, filter_trns_proc, NULL, filter_var);
748     }
749 }
750
751 /* FILTER transformation. */
752 static int
753 filter_trns_proc (void *filter_var_,
754                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
755
756 {
757   struct variable *filter_var = filter_var_;
758   double f = case_num (*c, filter_var);
759   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
760           ? TRNS_CONTINUE : TRNS_DROP_CASE);
761 }
762
763
764 struct dictionary *
765 dataset_dict (const struct dataset *ds)
766 {
767   return ds->dict;
768 }
769
770 const struct casereader *
771 dataset_source (const struct dataset *ds)
772 {
773   return ds->source;
774 }
775
776 void
777 dataset_need_lag (struct dataset *ds, int n_before)
778 {
779   ds->n_lag = MAX (ds->n_lag, n_before);
780 }