7a9b432132c09158ce849147801f4a17bc4f64a4
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <data/fastfile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/deque.h>
39 #include <libpspp/misc.h>
40 #include <libpspp/str.h>
41
42 struct dataset {
43
44   /* An abstract factory which creates casefiles */
45   struct casefile_factory *cf_factory;
46
47   /* Callback which occurs when a procedure provides a new source for
48      the dataset */
49   replace_source_callback *replace_source ;
50
51   /* Callback which occurs whenever the DICT is replaced by a new one */
52   replace_dictionary_callback *replace_dict;
53
54   /* Cases are read from proc_source,
55      pass through permanent_trns_chain (which transforms them into
56      the format described by permanent_dict),
57      are written to proc_sink,
58      pass through temporary_trns_chain (which transforms them into
59      the format described by dict),
60      and are finally passed to the procedure. */
61   struct case_source *proc_source;
62   struct trns_chain *permanent_trns_chain;
63   struct dictionary *permanent_dict;
64   struct case_sink *proc_sink;
65   struct trns_chain *temporary_trns_chain;
66   struct dictionary *dict;
67
68   /* The transformation chain that the next transformation will be
69      added to. */
70   struct trns_chain *cur_trns_chain;
71
72   /* The compactor used to compact a case, if necessary;
73      otherwise a null pointer. */
74   struct dict_compactor *compactor;
75
76   /* Time at which proc was last invoked. */
77   time_t last_proc_invocation;
78
79   /* Cases just before ("lagging") the current one. */
80   int n_lag;                    /* Number of cases to lag. */
81   struct deque lag;             /* Deque of lagged cases. */
82   struct ccase *lag_cases;      /* Lagged cases managed by deque. */
83
84   /* Procedure data. */
85   bool is_open;               /* Procedure open? */
86   struct ccase trns_case;     /* Case used for transformations. */
87   struct ccase sink_case;     /* Case written to sink, if
88                                  compacting is necessary. */
89   size_t cases_written;       /* Cases output so far. */
90   bool ok;
91 }; /* struct dataset */
92
93
94 static void add_case_limit_trns (struct dataset *ds);
95 static void add_filter_trns (struct dataset *ds);
96
97 static bool internal_procedure (struct dataset *ds, case_func *,
98                                 end_func *,
99                                 void *aux);
100 static void update_last_proc_invocation (struct dataset *ds);
101 static void create_trns_case (struct ccase *, struct dictionary *);
102 static void open_active_file (struct dataset *ds);
103 static void clear_case (const struct dataset *ds, struct ccase *c);
104 static bool close_active_file (struct dataset *ds);
105 \f
106 /* Public functions. */
107
108 /* Returns the last time the data was read. */
109 time_t
110 time_of_last_procedure (struct dataset *ds)
111 {
112   if (ds->last_proc_invocation == 0)
113     update_last_proc_invocation (ds);
114   return ds->last_proc_invocation;
115 }
116 \f
117 /* Regular procedure. */
118
119
120
121 /* Reads the data from the input program and writes it to a new
122    active file.  For each case we read from the input program, we
123    do the following:
124
125    1. Execute permanent transformations.  If these drop the case,
126       start the next case from step 1.
127
128    2. Write case to replacement active file.
129
130    3. Execute temporary transformations.  If these drop the case,
131       start the next case from step 1.
132
133    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
134
135    Returns true if successful, false if an I/O error occurred. */
136 bool
137 procedure (struct dataset *ds, case_func *cf, void *aux)
138 {
139   update_last_proc_invocation (ds);
140
141   /* Optimize the trivial case where we're not going to do
142      anything with the data, by not reading the data at all. */
143   if (cf == NULL
144       && case_source_is_class (ds->proc_source, &storage_source_class)
145       && ds->proc_sink == NULL
146       && (ds->temporary_trns_chain == NULL
147           || trns_chain_is_empty (ds->temporary_trns_chain))
148       && trns_chain_is_empty (ds->permanent_trns_chain))
149     {
150       ds->n_lag = 0;
151       dict_set_case_limit (ds->dict, 0);
152       dict_clear_vectors (ds->dict);
153       return true;
154     }
155
156   return internal_procedure (ds, cf, NULL, aux);
157 }
158 \f
159 /* Multipass procedure. */
160
161 struct multipass_aux_data
162   {
163     struct casefile *casefile;
164
165     bool (*proc_func) (const struct casefile *, void *aux);
166     void *aux;
167   };
168
169 /* Case processing function for multipass_procedure(). */
170 static bool
171 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
172 {
173   struct multipass_aux_data *aux_data = aux_data_;
174   return casefile_append (aux_data->casefile, c);
175 }
176
177 /* End-of-file function for multipass_procedure(). */
178 static bool
179 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
180 {
181   struct multipass_aux_data *aux_data = aux_data_;
182   return (aux_data->proc_func == NULL
183           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
184 }
185
186 /* Procedure that allows multiple passes over the input data.
187    The entire active file is passed to PROC_FUNC, with the given
188    AUX as auxiliary data, as a unit. */
189 bool
190 multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
191 {
192   struct multipass_aux_data aux_data;
193   bool ok;
194
195   aux_data.casefile =
196     ds->cf_factory->create_casefile (ds->cf_factory,
197                                      dict_get_next_value_idx (ds->dict));
198
199   aux_data.proc_func = proc_func;
200   aux_data.aux = aux;
201
202   ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
203   ok = !casefile_error (aux_data.casefile) && ok;
204
205   casefile_destroy (aux_data.casefile);
206
207   return ok;
208 }
209 \f
210
211 /* Procedure implementation. */
212
213 /* Executes a procedure.
214    Passes each case to CASE_FUNC.
215    Calls END_FUNC after the last case.
216    Returns true if successful, false if an I/O error occurred (or
217    if CASE_FUNC or END_FUNC ever returned false). */
218 static bool
219 internal_procedure (struct dataset *ds, case_func *proc,
220                     end_func *end,
221                     void *aux)
222 {
223   struct ccase *c;
224   bool ok = true;
225
226   proc_open (ds);
227   while (ok && proc_read (ds, &c))
228     if (proc != NULL)
229       ok = proc (c, aux, ds) && ok;
230   if (end != NULL)
231     ok = end (aux, ds) && ok;
232
233   if ( proc_close (ds) && ok )
234     {
235
236       return true;
237     }
238
239   return false;
240 }
241
242 /* Opens dataset DS for reading cases with proc_read.
243    proc_close must be called when done. */
244 void
245 proc_open (struct dataset *ds)
246 {
247   assert (ds->proc_source != NULL);
248   assert (!ds->is_open);
249
250   update_last_proc_invocation (ds);
251
252   open_active_file (ds);
253
254   ds->is_open = true;
255   create_trns_case (&ds->trns_case, ds->dict);
256   case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
257   ds->cases_written = 0;
258   ds->ok = true;
259 }
260
261 /* Reads the next case from dataset DS, which must have been
262    opened for reading with proc_open.
263    Returns true if successful, in which case a pointer to the
264    case is stored in *C.
265    Return false at end of file or if a read error occurs.  In
266    this case a null pointer is stored in *C. */
267 bool
268 proc_read (struct dataset *ds, struct ccase **c)
269 {
270   enum trns_result retval = TRNS_DROP_CASE;
271
272   assert (ds->is_open);
273   *c = NULL;
274   for (;;)
275     {
276       size_t case_nr;
277
278       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
279       if (retval == TRNS_ERROR)
280         ds->ok = false;
281       if (!ds->ok)
282         return false;
283
284       /* Read a case from proc_source. */
285       clear_case (ds, &ds->trns_case);
286       if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
287         return false;
288
289       /* Execute permanent transformations.  */
290       case_nr = ds->cases_written + 1;
291       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
292                                    &ds->trns_case, &case_nr);
293       if (retval != TRNS_CONTINUE)
294         continue;
295
296       /* Write case to collection of lagged cases. */
297       if (ds->n_lag > 0) 
298         {
299           while (deque_count (&ds->lag) >= ds->n_lag)
300             case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
301           case_clone (&ds->lag_cases[deque_push_front (&ds->lag)],
302                       &ds->trns_case);
303         }
304
305       /* Write case to replacement active file. */
306       ds->cases_written++;
307       if (ds->proc_sink->class->write != NULL)
308         {
309           if (ds->compactor != NULL)
310             {
311               dict_compactor_compact (ds->compactor, &ds->sink_case,
312                                       &ds->trns_case);
313               ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
314             }
315           else
316             ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
317         }
318
319       /* Execute temporary transformations. */
320       if (ds->temporary_trns_chain != NULL)
321         {
322           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
323                                        &ds->trns_case, &ds->cases_written);
324           if (retval != TRNS_CONTINUE)
325             continue;
326         }
327
328       *c = &ds->trns_case;
329       return true;
330     }
331 }
332
333 /* Closes dataset DS for reading.
334    Returns true if successful, false if an I/O error occurred
335    while reading or closing the data set.
336    If DS has not been opened, returns true without doing
337    anything else. */
338 bool
339 proc_close (struct dataset *ds)
340 {
341   if (!ds->is_open)
342     return true;
343
344   /* Drain any remaining cases. */
345   while (ds->ok)
346     {
347       struct ccase *c;
348       if (!proc_read (ds, &c))
349         break;
350     }
351   ds->ok = free_case_source (ds->proc_source) && ds->ok;
352   proc_set_source (ds, NULL);
353
354   case_destroy (&ds->sink_case);
355   case_destroy (&ds->trns_case);
356
357   ds->ok = close_active_file (ds) && ds->ok;
358   ds->is_open = false;
359
360   return ds->ok;
361 }
362
363 /* Updates last_proc_invocation. */
364 static void
365 update_last_proc_invocation (struct dataset *ds)
366 {
367   ds->last_proc_invocation = time (NULL);
368 }
369
370 /* Creates and returns a case, initializing it from the vectors
371    that say which `value's need to be initialized just once, and
372    which ones need to be re-initialized before every case. */
373 static void
374 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
375 {
376   size_t var_cnt = dict_get_var_cnt (dict);
377   size_t i;
378
379   case_create (trns_case, dict_get_next_value_idx (dict));
380   for (i = 0; i < var_cnt; i++)
381     {
382       struct variable *v = dict_get_var (dict, i);
383       union value *value = case_data_rw (trns_case, v);
384
385       if (var_is_numeric (v))
386         value->f = var_get_leave (v) ? 0.0 : SYSMIS;
387       else
388         memset (value->s, ' ', var_get_width (v));
389     }
390 }
391
392 /* Makes all preparations for reading from the data source and writing
393    to the data sink. */
394 static void
395 open_active_file (struct dataset *ds)
396 {
397   add_case_limit_trns (ds);
398   add_filter_trns (ds);
399
400   /* Finalize transformations. */
401   trns_chain_finalize (ds->cur_trns_chain);
402
403   /* Make permanent_dict refer to the dictionary right before
404      data reaches the sink. */
405   if (ds->permanent_dict == NULL)
406     ds->permanent_dict = ds->dict;
407
408   /* Figure out whether to compact. */
409   ds->compactor =
410     (dict_compacting_would_shrink (ds->permanent_dict)
411      ? dict_make_compactor (ds->permanent_dict)
412      : NULL);
413
414   /* Prepare sink. */
415   if (ds->proc_sink == NULL)
416     ds->proc_sink = create_case_sink (&storage_sink_class,
417                                       ds->permanent_dict,
418                                       ds->cf_factory,
419                                       NULL);
420   if (ds->proc_sink->class->open != NULL)
421     ds->proc_sink->class->open (ds->proc_sink);
422
423   /* Allocate memory for lagged cases. */
424   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
425 }
426
427 /* Clears the variables in C that need to be cleared between
428    processing cases.  */
429 static void
430 clear_case (const struct dataset *ds, struct ccase *c)
431 {
432   size_t var_cnt = dict_get_var_cnt (ds->dict);
433   size_t i;
434
435   for (i = 0; i < var_cnt; i++)
436     {
437       struct variable *v = dict_get_var (ds->dict, i);
438       if (!var_get_leave (v))
439         {
440           if (var_is_numeric (v))
441             case_data_rw (c, v)->f = SYSMIS;
442           else
443             memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
444         }
445     }
446 }
447
448 /* Closes the active file. */
449 static bool
450 close_active_file (struct dataset *ds)
451 {
452   /* Free memory for lagged cases. */
453   while (!deque_is_empty (&ds->lag))
454     case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
455   free (ds->lag_cases);
456
457   /* Dictionary from before TEMPORARY becomes permanent. */
458   proc_cancel_temporary_transformations (ds);
459
460   /* Finish compacting. */
461   if (ds->compactor != NULL)
462     {
463       dict_compactor_destroy (ds->compactor);
464       dict_compact_values (ds->dict);
465       ds->compactor = NULL;
466     }
467
468   /* Old data sink becomes new data source. */
469   if (ds->proc_sink->class->make_source != NULL)
470     proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
471   free_case_sink (ds->proc_sink);
472   ds->proc_sink = NULL;
473
474   dict_clear_vectors (ds->dict);
475   ds->permanent_dict = NULL;
476   return proc_cancel_all_transformations (ds);
477 }
478 \f
479 /* Returns a pointer to the lagged case from N_BEFORE cases before the
480    current one, or NULL if there haven't been that many cases yet. */
481 struct ccase *
482 lagged_case (const struct dataset *ds, int n_before)
483 {
484   assert (n_before >= 1);
485   assert (n_before <= ds->n_lag);
486
487   if (n_before <= deque_count (&ds->lag))
488     return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
489   else
490     return NULL;
491 }
492 \f
493 /* Procedure that separates the data into SPLIT FILE groups. */
494
495 /* Represents auxiliary data for handling SPLIT FILE. */
496 struct split_aux_data
497   {
498     struct dataset *dataset;    /* The dataset */
499     struct ccase prev_case;     /* Data in previous case. */
500
501     /* Callback functions. */
502     begin_func *begin;
503     case_func *proc;
504     end_func *end;
505     void *func_aux;
506   };
507
508 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
509 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
510 static bool split_procedure_end_func (void *, const struct dataset *);
511
512 /* Like procedure(), but it automatically breaks the case stream
513    into SPLIT FILE break groups.  Before each group of cases with
514    identical SPLIT FILE variable values, BEGIN_FUNC is called
515    with the first case in the group.
516    Then PROC_FUNC is called for each case in the group (including
517    the first).
518    END_FUNC is called when the group is finished.  FUNC_AUX is
519    passed to each of the functions as auxiliary data.
520
521    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
522    and END_FUNC will be called at all.
523
524    If SPLIT FILE is not in effect, then there is one break group
525    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
526    will be called once.
527
528    Returns true if successful, false if an I/O error occurred. */
529 bool
530 procedure_with_splits (struct dataset *ds,
531                        begin_func begin,
532                        case_func *proc,
533                        end_func *end,
534                        void *func_aux)
535 {
536   struct split_aux_data split_aux;
537   bool ok;
538
539   case_nullify (&split_aux.prev_case);
540   split_aux.begin = begin;
541   split_aux.proc = proc;
542   split_aux.end = end;
543   split_aux.func_aux = func_aux;
544   split_aux.dataset = ds;
545
546   ok = internal_procedure (ds, split_procedure_case_func,
547                            split_procedure_end_func, &split_aux);
548
549   case_destroy (&split_aux.prev_case);
550
551   return ok;
552 }
553
554 /* Case callback used by procedure_with_splits(). */
555 static bool
556 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
557 {
558   struct split_aux_data *split_aux = split_aux_;
559
560   /* Start a new series if needed. */
561   if (case_is_null (&split_aux->prev_case)
562       || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
563     {
564       if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
565         split_aux->end (split_aux->func_aux, ds);
566
567       case_destroy (&split_aux->prev_case);
568       case_clone (&split_aux->prev_case, c);
569
570       if (split_aux->begin != NULL)
571         split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
572     }
573
574   return (split_aux->proc == NULL
575           || split_aux->proc (c, split_aux->func_aux, ds));
576 }
577
578 /* End-of-file callback used by procedure_with_splits(). */
579 static bool
580 split_procedure_end_func (void *split_aux_, const struct dataset *ds)
581 {
582   struct split_aux_data *split_aux = split_aux_;
583
584   if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
585     split_aux->end (split_aux->func_aux, ds);
586   return true;
587 }
588
589 /* Compares the SPLIT FILE variables in cases A and B and returns
590    nonzero only if they differ. */
591 static int
592 equal_splits (const struct ccase *a, const struct ccase *b,
593               const struct dataset *ds)
594 {
595   return case_compare (a, b,
596                        dict_get_split_vars (ds->dict),
597                        dict_get_split_cnt (ds->dict)) == 0;
598 }
599 \f
600 /* Multipass procedure that separates the data into SPLIT FILE
601    groups. */
602
603 /* Represents auxiliary data for handling SPLIT FILE in a
604    multipass procedure. */
605 struct multipass_split_aux_data
606   {
607     struct dataset *dataset;    /* The dataset of the split */
608     struct ccase prev_case;     /* Data in previous case. */
609     struct casefile *casefile;  /* Accumulates data for a split. */
610     split_func *split;          /* Function to call with the accumulated
611                                    data. */
612     void *func_aux;             /* Auxiliary data. */
613   };
614
615 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
616 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
617 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
618
619 /* Returns true if successful, false if an I/O error occurred. */
620 bool
621 multipass_procedure_with_splits (struct dataset *ds,
622                                  split_func  *split,
623                                  void *func_aux)
624 {
625   struct multipass_split_aux_data aux;
626   bool ok;
627
628   case_nullify (&aux.prev_case);
629   aux.casefile = NULL;
630   aux.split = split;
631   aux.func_aux = func_aux;
632   aux.dataset = ds;
633
634   ok = internal_procedure (ds, multipass_split_case_func,
635                            multipass_split_end_func, &aux);
636   case_destroy (&aux.prev_case);
637
638   return ok;
639 }
640
641 /* Case callback used by multipass_procedure_with_splits(). */
642 static bool
643 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
644 {
645   struct multipass_split_aux_data *aux = aux_;
646   bool ok = true;
647
648   /* Start a new series if needed. */
649   if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
650     {
651       /* Record split values. */
652       case_destroy (&aux->prev_case);
653       case_clone (&aux->prev_case, c);
654
655       /* Pass any cases to split_func. */
656       if (aux->casefile != NULL)
657         ok = multipass_split_output (aux, ds);
658
659       /* Start a new casefile. */
660       aux->casefile =
661         ds->cf_factory->create_casefile (ds->cf_factory,
662                                          dict_get_next_value_idx (ds->dict));
663     }
664
665   return casefile_append (aux->casefile, c) && ok;
666 }
667
668 /* End-of-file callback used by multipass_procedure_with_splits(). */
669 static bool
670 multipass_split_end_func (void *aux_, const struct dataset *ds)
671 {
672   struct multipass_split_aux_data *aux = aux_;
673   return (aux->casefile == NULL || multipass_split_output (aux, ds));
674 }
675
676 static bool
677 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
678 {
679   bool ok;
680
681   assert (aux->casefile != NULL);
682   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
683   casefile_destroy (aux->casefile);
684   aux->casefile = NULL;
685
686   return ok;
687 }
688 \f
689 /* Discards all the current state in preparation for a data-input
690    command like DATA LIST or GET. */
691 void
692 discard_variables (struct dataset *ds)
693 {
694   dict_clear (ds->dict);
695   fh_set_default_handle (NULL);
696
697   ds->n_lag = 0;
698
699   free_case_source (ds->proc_source);
700   proc_set_source (ds, NULL);
701
702   proc_cancel_all_transformations (ds);
703 }
704 \f
705 /* Returns the current set of permanent transformations,
706    and clears the permanent transformations.
707    For use by INPUT PROGRAM. */
708 struct trns_chain *
709 proc_capture_transformations (struct dataset *ds)
710 {
711   struct trns_chain *chain;
712
713   assert (ds->temporary_trns_chain == NULL);
714   chain = ds->permanent_trns_chain;
715   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
716   return chain;
717 }
718
719 /* Adds a transformation that processes a case with PROC and
720    frees itself with FREE to the current set of transformations.
721    The functions are passed AUX as auxiliary data. */
722 void
723 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
724 {
725   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
726 }
727
728 /* Adds a transformation that processes a case with PROC and
729    frees itself with FREE to the current set of transformations.
730    When parsing of the block of transformations is complete,
731    FINALIZE will be called.
732    The functions are passed AUX as auxiliary data. */
733 void
734 add_transformation_with_finalizer (struct dataset *ds,
735                                    trns_finalize_func *finalize,
736                                    trns_proc_func *proc,
737                                    trns_free_func *free, void *aux)
738 {
739   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
740 }
741
742 /* Returns the index of the next transformation.
743    This value can be returned by a transformation procedure
744    function to indicate a "jump" to that transformation. */
745 size_t
746 next_transformation (const struct dataset *ds)
747 {
748   return trns_chain_next (ds->cur_trns_chain);
749 }
750
751 /* Returns true if the next call to add_transformation() will add
752    a temporary transformation, false if it will add a permanent
753    transformation. */
754 bool
755 proc_in_temporary_transformations (const struct dataset *ds)
756 {
757   return ds->temporary_trns_chain != NULL;
758 }
759
760 /* Marks the start of temporary transformations.
761    Further calls to add_transformation() will add temporary
762    transformations. */
763 void
764 proc_start_temporary_transformations (struct dataset *ds)
765 {
766   if (!proc_in_temporary_transformations (ds))
767     {
768       add_case_limit_trns (ds);
769
770       ds->permanent_dict = dict_clone (ds->dict);
771
772       trns_chain_finalize (ds->permanent_trns_chain);
773       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
774     }
775 }
776
777 /* Converts all the temporary transformations, if any, to
778    permanent transformations.  Further transformations will be
779    permanent.
780    Returns true if anything changed, false otherwise. */
781 bool
782 proc_make_temporary_transformations_permanent (struct dataset *ds)
783 {
784   if (proc_in_temporary_transformations (ds))
785     {
786       trns_chain_finalize (ds->temporary_trns_chain);
787       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
788       ds->temporary_trns_chain = NULL;
789
790       dict_destroy (ds->permanent_dict);
791       ds->permanent_dict = NULL;
792
793       return true;
794     }
795   else
796     return false;
797 }
798
799 /* Cancels all temporary transformations, if any.  Further
800    transformations will be permanent.
801    Returns true if anything changed, false otherwise. */
802 bool
803 proc_cancel_temporary_transformations (struct dataset *ds)
804 {
805   if (proc_in_temporary_transformations (ds))
806     {
807       dataset_set_dict (ds, ds->permanent_dict);
808       ds->permanent_dict = NULL;
809
810       trns_chain_destroy (ds->temporary_trns_chain);
811       ds->temporary_trns_chain = NULL;
812
813       return true;
814     }
815   else
816     return false;
817 }
818
819 /* Cancels all transformations, if any.
820    Returns true if successful, false on I/O error. */
821 bool
822 proc_cancel_all_transformations (struct dataset *ds)
823 {
824   bool ok;
825   ok = trns_chain_destroy (ds->permanent_trns_chain);
826   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
827   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
828   ds->temporary_trns_chain = NULL;
829   return ok;
830 }
831 \f
832 /* Initializes procedure handling. */
833 struct dataset *
834 create_dataset (struct casefile_factory *fact,
835                 replace_source_callback *rps,
836                 replace_dictionary_callback *rds
837                 )
838 {
839   struct dataset *ds = xzalloc (sizeof(*ds));
840   ds->dict = dict_create ();
841   ds->cf_factory = fact;
842   ds->replace_source = rps;
843   ds->replace_dict = rds;
844   proc_cancel_all_transformations (ds);
845   return ds;
846 }
847
848 /* Finishes up procedure handling. */
849 void
850 destroy_dataset (struct dataset *ds)
851 {
852   discard_variables (ds);
853   dict_destroy (ds->dict);
854   trns_chain_destroy (ds->permanent_trns_chain);
855   free (ds);
856 }
857
858 /* Sets SINK as the destination for procedure output from the
859    next procedure. */
860 void
861 proc_set_sink (struct dataset *ds, struct case_sink *sink)
862 {
863   assert (ds->proc_sink == NULL);
864   ds->proc_sink = sink;
865 }
866
867 /* Sets SOURCE as the source for procedure input for the next
868    procedure. */
869 void
870 proc_set_source (struct dataset *ds, struct case_source *source)
871 {
872   ds->proc_source = source;
873
874   if ( ds->replace_source )
875     ds->replace_source (ds->proc_source);
876 }
877
878 /* Returns true if a source for the next procedure has been
879    configured, false otherwise. */
880 bool
881 proc_has_source (const struct dataset *ds)
882 {
883   return ds->proc_source != NULL;
884 }
885
886 /* Returns the output from the previous procedure.
887    For use only immediately after executing a procedure.
888    The returned casefile is owned by the caller; it will not be
889    automatically used for the next procedure's input. */
890 struct casefile *
891 proc_capture_output (struct dataset *ds)
892 {
893   struct casefile *casefile;
894
895   /* Try to make sure that this function is called immediately
896      after procedure() or a similar function. */
897   assert (ds->proc_source != NULL);
898   assert (case_source_is_class (ds->proc_source, &storage_source_class));
899   assert (trns_chain_is_empty (ds->permanent_trns_chain));
900   assert (!proc_in_temporary_transformations (ds));
901
902   casefile = storage_source_decapsulate (ds->proc_source);
903   proc_set_source (ds, NULL);
904
905   return casefile;
906 }
907 \f
908 static trns_proc_func case_limit_trns_proc;
909 static trns_free_func case_limit_trns_free;
910
911 /* Adds a transformation that limits the number of cases that may
912    pass through, if DS->DICT has a case limit. */
913 static void
914 add_case_limit_trns (struct dataset *ds)
915 {
916   size_t case_limit = dict_get_case_limit (ds->dict);
917   if (case_limit != 0)
918     {
919       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
920       *cases_remaining = case_limit;
921       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
922                           cases_remaining);
923       dict_set_case_limit (ds->dict, 0);
924     }
925 }
926
927 /* Limits the maximum number of cases processed to
928    *CASES_REMAINING. */
929 static int
930 case_limit_trns_proc (void *cases_remaining_,
931                       struct ccase *c UNUSED, casenumber case_nr UNUSED)
932 {
933   size_t *cases_remaining = cases_remaining_;
934   if (*cases_remaining > 0)
935     {
936       (*cases_remaining)--;
937       return TRNS_CONTINUE;
938     }
939   else
940     return TRNS_DROP_CASE;
941 }
942
943 /* Frees the data associated with a case limit transformation. */
944 static bool
945 case_limit_trns_free (void *cases_remaining_)
946 {
947   size_t *cases_remaining = cases_remaining_;
948   free (cases_remaining);
949   return true;
950 }
951 \f
952 static trns_proc_func filter_trns_proc;
953
954 /* Adds a temporary transformation to filter data according to
955    the variable specified on FILTER, if any. */
956 static void
957 add_filter_trns (struct dataset *ds)
958 {
959   struct variable *filter_var = dict_get_filter (ds->dict);
960   if (filter_var != NULL)
961     {
962       proc_start_temporary_transformations (ds);
963       add_transformation (ds, filter_trns_proc, NULL, filter_var);
964     }
965 }
966
967 /* FILTER transformation. */
968 static int
969 filter_trns_proc (void *filter_var_,
970                   struct ccase *c UNUSED, casenumber case_nr UNUSED)
971
972 {
973   struct variable *filter_var = filter_var_;
974   double f = case_num (c, filter_var);
975   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
976           ? TRNS_CONTINUE : TRNS_DROP_CASE);
977 }
978
979
980 struct dictionary *
981 dataset_dict (const struct dataset *ds)
982 {
983   return ds->dict;
984 }
985
986
987 /* Set or replace dataset DS's dictionary with DICT.
988    The old dictionary is destroyed */
989 void
990 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
991 {
992   struct dictionary *old_dict = ds->dict;
993
994   dict_copy_callbacks (dict, ds->dict);
995   ds->dict = dict;
996
997   if ( ds->replace_dict )
998     ds->replace_dict (dict);
999
1000   dict_destroy (old_dict);
1001 }
1002
1003 void 
1004 dataset_need_lag (struct dataset *ds, int n_before)
1005 {
1006   ds->n_lag = MAX (ds->n_lag, n_before);
1007 }
1008
1009 struct casefile_factory *
1010 dataset_get_casefile_factory (const struct dataset *ds)
1011 {
1012   return ds->cf_factory;
1013 }
1014