Make it possible to pull cases from the active file with a
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <data/fastfile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40
41 struct dataset {
42   /* Cases are read from proc_source,
43      pass through permanent_trns_chain (which transforms them into
44      the format described by permanent_dict),
45      are written to proc_sink,
46      pass through temporary_trns_chain (which transforms them into
47      the format described by dict),
48      and are finally passed to the procedure. */
49   struct case_source *proc_source;
50   struct trns_chain *permanent_trns_chain;
51   struct dictionary *permanent_dict;
52   struct case_sink *proc_sink;
53   struct trns_chain *temporary_trns_chain;
54   struct dictionary *dict;
55
56   /* The transformation chain that the next transformation will be
57      added to. */
58   struct trns_chain *cur_trns_chain;
59
60   /* The compactor used to compact a case, if necessary;
61      otherwise a null pointer. */
62   struct dict_compactor *compactor;
63
64   /* Time at which proc was last invoked. */
65   time_t last_proc_invocation;
66
67   /* Lag queue. */
68   int n_lag;                    /* Number of cases to lag. */
69   int lag_count;                /* Number of cases in lag_queue so far. */
70   int lag_head;         /* Index where next case will be added. */
71   struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
72
73   /* Procedure data. */
74   bool is_open;               /* Procedure open? */
75   struct ccase trns_case;     /* Case used for transformations. */
76   struct ccase sink_case;     /* Case written to sink, if
77                                  compacting is necessary. */
78   size_t cases_written;       /* Cases output so far. */
79   bool ok;
80 }; /* struct dataset */
81
82
83 static void add_case_limit_trns (struct dataset *ds);
84 static void add_filter_trns (struct dataset *ds);
85
86 static bool internal_procedure (struct dataset *ds, case_func *,
87                                 end_func *,
88                                 void *aux);
89 static void update_last_proc_invocation (struct dataset *ds);
90 static void create_trns_case (struct ccase *, struct dictionary *);
91 static void open_active_file (struct dataset *ds);
92 static void lag_case (struct dataset *ds, const struct ccase *c);
93 static void clear_case (const struct dataset *ds, struct ccase *c);
94 static bool close_active_file (struct dataset *ds);
95 \f
96 /* Public functions. */
97
98 /* Returns the last time the data was read. */
99 time_t
100 time_of_last_procedure (struct dataset *ds) 
101 {
102   if (ds->last_proc_invocation == 0)
103     update_last_proc_invocation (ds);
104   return ds->last_proc_invocation;
105 }
106 \f
107 /* Regular procedure. */
108
109
110
111 /* Reads the data from the input program and writes it to a new
112    active file.  For each case we read from the input program, we
113    do the following:
114
115    1. Execute permanent transformations.  If these drop the case,
116       start the next case from step 1.
117
118    2. Write case to replacement active file.
119    
120    3. Execute temporary transformations.  If these drop the case,
121       start the next case from step 1.
122       
123    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
124
125    Returns true if successful, false if an I/O error occurred. */
126 bool
127 procedure (struct dataset *ds, case_func *cf, void *aux)
128 {
129   update_last_proc_invocation (ds);
130
131   /* Optimize the trivial case where we're not going to do
132      anything with the data, by not reading the data at all. */
133   if (cf == NULL
134       && case_source_is_class (ds->proc_source, &storage_source_class)
135       && ds->proc_sink == NULL
136       && (ds->temporary_trns_chain == NULL
137           || trns_chain_is_empty (ds->temporary_trns_chain))
138       && trns_chain_is_empty (ds->permanent_trns_chain))
139     {
140       ds->n_lag = 0;
141       dict_set_case_limit (ds->dict, 0);
142       dict_clear_vectors (ds->dict);
143       return true;
144     }
145
146   return internal_procedure (ds, cf, NULL, aux);
147 }
148 \f
149 /* Multipass procedure. */
150
151 struct multipass_aux_data 
152   {
153     struct casefile *casefile;
154     
155     bool (*proc_func) (const struct casefile *, void *aux);
156     void *aux;
157   };
158
159 /* Case processing function for multipass_procedure(). */
160 static bool
161 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) 
162 {
163   struct multipass_aux_data *aux_data = aux_data_;
164   return casefile_append (aux_data->casefile, c);
165 }
166
167 /* End-of-file function for multipass_procedure(). */
168 static bool
169 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) 
170 {
171   struct multipass_aux_data *aux_data = aux_data_;
172   return (aux_data->proc_func == NULL
173           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
174 }
175
176 /* Procedure that allows multiple passes over the input data.
177    The entire active file is passed to PROC_FUNC, with the given
178    AUX as auxiliary data, as a unit. */
179 bool
180 multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux) 
181 {
182   struct multipass_aux_data aux_data;
183   bool ok;
184
185   aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
186   aux_data.proc_func = proc_func;
187   aux_data.aux = aux;
188
189   ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
190   ok = !casefile_error (aux_data.casefile) && ok;
191
192   casefile_destroy (aux_data.casefile);
193
194   return ok;
195 }
196 \f
197 /* Procedure implementation. */
198
199 /* Executes a procedure.
200    Passes each case to CASE_FUNC.
201    Calls END_FUNC after the last case.
202    Returns true if successful, false if an I/O error occurred (or
203    if CASE_FUNC or END_FUNC ever returned false). */
204 static bool
205 internal_procedure (struct dataset *ds, case_func *proc,
206                     end_func *end,
207                     void *aux) 
208 {
209   struct ccase *c;
210   bool ok = true;
211   
212   proc_open (ds);
213   while (ok && proc_read (ds, &c))
214     if (proc != NULL)
215       ok = proc (c, aux, ds) && ok;
216   if (end != NULL)
217     ok = end (aux, ds) && ok;
218   return proc_close (ds) && ok;
219 }
220
221 /* Opens dataset DS for reading cases with proc_read.
222    proc_close must be called when done. */
223 void
224 proc_open (struct dataset *ds)
225 {
226   assert (ds->proc_source != NULL);
227   assert (!ds->is_open);
228
229   update_last_proc_invocation (ds);
230
231   open_active_file (ds);
232
233   ds->is_open = true;
234   create_trns_case (&ds->trns_case, ds->dict);
235   case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
236   ds->cases_written = 0;
237   ds->ok = true;
238 }
239
240 /* Reads the next case from dataset DS, which must have been
241    opened for reading with proc_open.
242    Returns true if successful, in which case a pointer to the
243    case is stored in *C.
244    Return false at end of file or if a read error occurs.  In
245    this case a null pointer is stored in *C. */
246 bool
247 proc_read (struct dataset *ds, struct ccase **c) 
248 {
249   enum trns_result retval = TRNS_DROP_CASE;
250
251   assert (ds->is_open);
252   *c = NULL;
253   for (;;) 
254     {
255       size_t case_nr;
256
257       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
258       if (retval == TRNS_ERROR)
259         ds->ok = false;
260       if (!ds->ok)
261         return false;
262
263       /* Read a case from proc_source. */
264       clear_case (ds, &ds->trns_case);
265       if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
266         return false;
267
268       /* Execute permanent transformations.  */
269       case_nr = ds->cases_written + 1;
270       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
271                                    &ds->trns_case, &case_nr);
272       if (retval != TRNS_CONTINUE)
273         continue;
274   
275       /* Write case to LAG queue. */
276       if (ds->n_lag)
277         lag_case (ds, &ds->trns_case);
278
279       /* Write case to replacement active file. */
280       ds->cases_written++;
281       if (ds->proc_sink->class->write != NULL) 
282         {
283           if (ds->compactor != NULL) 
284             {
285               dict_compactor_compact (ds->compactor, &ds->sink_case,
286                                       &ds->trns_case);
287               ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
288             }
289           else
290             ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
291         }
292   
293       /* Execute temporary transformations. */
294       if (ds->temporary_trns_chain != NULL) 
295         {
296           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
297                                        &ds->trns_case, &ds->cases_written);
298           if (retval != TRNS_CONTINUE)
299             continue;
300         }
301
302       *c = &ds->trns_case;
303       return true; 
304     }
305 }
306
307 /* Closes dataset DS for reading.
308    Returns true if successful, false if an I/O error occurred
309    while reading or closing the data set.
310    If DS has not been opened, returns true without doing
311    anything else. */
312 bool
313 proc_close (struct dataset *ds) 
314 {
315   if (!ds->is_open)
316     return true;
317
318   /* Drain any remaining cases. */
319   while (ds->ok) 
320     {
321       struct ccase *c;
322       if (!proc_read (ds, &c))
323         break; 
324     }
325   
326   ds->ok = free_case_source (ds->proc_source) && ds->ok;
327   ds->proc_source = NULL;
328
329   case_destroy (&ds->sink_case);
330   case_destroy (&ds->trns_case);
331
332   ds->ok = close_active_file (ds) && ds->ok;
333   ds->is_open = false;
334
335   return ds->ok;
336 }
337
338 /* Updates last_proc_invocation. */
339 static void
340 update_last_proc_invocation (struct dataset *ds) 
341 {
342   ds->last_proc_invocation = time (NULL);
343 }
344
345 /* Creates and returns a case, initializing it from the vectors
346    that say which `value's need to be initialized just once, and
347    which ones need to be re-initialized before every case. */
348 static void
349 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
350 {
351   size_t var_cnt = dict_get_var_cnt (dict);
352   size_t i;
353
354   case_create (trns_case, dict_get_next_value_idx (dict));
355   for (i = 0; i < var_cnt; i++) 
356     {
357       struct variable *v = dict_get_var (dict, i);
358       union value *value = case_data_rw (trns_case, v);
359
360       if (var_is_numeric (v))
361         value->f = var_get_leave (v) ? 0.0 : SYSMIS;
362       else
363         memset (value->s, ' ', var_get_width (v));
364     }
365 }
366
367 /* Makes all preparations for reading from the data source and writing
368    to the data sink. */
369 static void
370 open_active_file (struct dataset *ds)
371 {
372   add_case_limit_trns (ds);
373   add_filter_trns (ds);
374
375   /* Finalize transformations. */
376   trns_chain_finalize (ds->cur_trns_chain);
377
378   /* Make permanent_dict refer to the dictionary right before
379      data reaches the sink. */
380   if (ds->permanent_dict == NULL)
381     ds->permanent_dict = ds->dict;
382
383   /* Figure out whether to compact. */
384   ds->compactor = 
385     (dict_compacting_would_shrink (ds->permanent_dict)
386      ? dict_make_compactor (ds->permanent_dict)
387      : NULL);
388
389   /* Prepare sink. */
390   if (ds->proc_sink == NULL)
391     ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
392   if (ds->proc_sink->class->open != NULL)
393     ds->proc_sink->class->open (ds->proc_sink);
394
395   /* Allocate memory for lag queue. */
396   if (ds->n_lag > 0)
397     {
398       int i;
399   
400       ds->lag_count = 0;
401       ds->lag_head = 0;
402       ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
403       for (i = 0; i < ds->n_lag; i++)
404         case_nullify (&ds->lag_queue[i]);
405     }
406 }
407
408 /* Add C to the lag queue. */
409 static void
410 lag_case (struct dataset *ds, const struct ccase *c)
411 {
412   if (ds->lag_count < ds->n_lag)
413     ds->lag_count++;
414   case_destroy (&ds->lag_queue[ds->lag_head]);
415   case_clone (&ds->lag_queue[ds->lag_head], c);
416   if (++ds->lag_head >= ds->n_lag)
417     ds->lag_head = 0;
418 }
419
420 /* Clears the variables in C that need to be cleared between
421    processing cases.  */
422 static void
423 clear_case (const struct dataset *ds, struct ccase *c)
424 {
425   size_t var_cnt = dict_get_var_cnt (ds->dict);
426   size_t i;
427   
428   for (i = 0; i < var_cnt; i++) 
429     {
430       struct variable *v = dict_get_var (ds->dict, i);
431       if (!var_get_leave (v)) 
432         {
433           if (var_is_numeric (v))
434             case_data_rw (c, v)->f = SYSMIS; 
435           else
436             memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
437         } 
438     }
439 }
440
441 /* Closes the active file. */
442 static bool
443 close_active_file (struct dataset *ds)
444 {
445   /* Free memory for lag queue, and turn off lagging. */
446   if (ds->n_lag > 0)
447     {
448       int i;
449       
450       for (i = 0; i < ds->n_lag; i++)
451         case_destroy (&ds->lag_queue[i]);
452       free (ds->lag_queue);
453       ds->n_lag = 0;
454     }
455   
456   /* Dictionary from before TEMPORARY becomes permanent. */
457   proc_cancel_temporary_transformations (ds);
458
459   /* Finish compacting. */
460   if (ds->compactor != NULL) 
461     {
462       dict_compactor_destroy (ds->compactor);
463       dict_compact_values (ds->dict);
464       ds->compactor = NULL;
465     }
466     
467   /* Old data sink becomes new data source. */
468   if (ds->proc_sink->class->make_source != NULL)
469     ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
470   free_case_sink (ds->proc_sink);
471   ds->proc_sink = NULL;
472
473   dict_clear_vectors (ds->dict);
474   ds->permanent_dict = NULL;
475   return proc_cancel_all_transformations (ds);
476 }
477 \f
478 /* Returns a pointer to the lagged case from N_BEFORE cases before the
479    current one, or NULL if there haven't been that many cases yet. */
480 struct ccase *
481 lagged_case (const struct dataset *ds, int n_before)
482 {
483   assert (n_before >= 1 );
484   assert (n_before <= ds->n_lag);
485
486   if (n_before <= ds->lag_count)
487     {
488       int index = ds->lag_head - n_before;
489       if (index < 0)
490         index += ds->n_lag;
491       return &ds->lag_queue[index];
492     }
493   else
494     return NULL;
495 }
496 \f
497 /* Procedure that separates the data into SPLIT FILE groups. */
498
499 /* Represents auxiliary data for handling SPLIT FILE. */
500 struct split_aux_data 
501   {
502     struct dataset *dataset;    /* The dataset */
503     struct ccase prev_case;     /* Data in previous case. */
504
505     /* Callback functions. */
506     begin_func *begin; 
507     case_func *proc;
508     end_func *end;
509     void *func_aux;
510   };
511
512 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
513 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
514 static bool split_procedure_end_func (void *, const struct dataset *);
515
516 /* Like procedure(), but it automatically breaks the case stream
517    into SPLIT FILE break groups.  Before each group of cases with
518    identical SPLIT FILE variable values, BEGIN_FUNC is called
519    with the first case in the group.
520    Then PROC_FUNC is called for each case in the group (including
521    the first).
522    END_FUNC is called when the group is finished.  FUNC_AUX is
523    passed to each of the functions as auxiliary data.
524
525    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
526    and END_FUNC will be called at all. 
527
528    If SPLIT FILE is not in effect, then there is one break group
529    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
530    will be called once.
531    
532    Returns true if successful, false if an I/O error occurred. */
533 bool
534 procedure_with_splits (struct dataset *ds,
535                        begin_func begin, 
536                        case_func *proc,
537                        end_func *end,
538                        void *func_aux) 
539 {
540   struct split_aux_data split_aux;
541   bool ok;
542
543   case_nullify (&split_aux.prev_case);
544   split_aux.begin = begin;
545   split_aux.proc = proc;
546   split_aux.end = end;
547   split_aux.func_aux = func_aux;
548   split_aux.dataset = ds;
549
550   ok = internal_procedure (ds, split_procedure_case_func,
551                            split_procedure_end_func, &split_aux);
552
553   case_destroy (&split_aux.prev_case);
554
555   return ok;
556 }
557
558 /* Case callback used by procedure_with_splits(). */
559 static bool
560 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) 
561 {
562   struct split_aux_data *split_aux = split_aux_;
563
564   /* Start a new series if needed. */
565   if (case_is_null (&split_aux->prev_case)
566       || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
567     {
568       if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
569         split_aux->end (split_aux->func_aux, ds);
570
571       case_destroy (&split_aux->prev_case);
572       case_clone (&split_aux->prev_case, c);
573
574       if (split_aux->begin != NULL)
575         split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
576     }
577
578   return (split_aux->proc == NULL
579           || split_aux->proc (c, split_aux->func_aux, ds));
580 }
581
582 /* End-of-file callback used by procedure_with_splits(). */
583 static bool
584 split_procedure_end_func (void *split_aux_, const struct dataset *ds) 
585 {
586   struct split_aux_data *split_aux = split_aux_;
587
588   if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
589     split_aux->end (split_aux->func_aux, ds);
590   return true;
591 }
592
593 /* Compares the SPLIT FILE variables in cases A and B and returns
594    nonzero only if they differ. */
595 static int
596 equal_splits (const struct ccase *a, const struct ccase *b, 
597               const struct dataset *ds) 
598 {
599   return case_compare (a, b,
600                        dict_get_split_vars (ds->dict),
601                        dict_get_split_cnt (ds->dict)) == 0;
602 }
603 \f
604 /* Multipass procedure that separates the data into SPLIT FILE
605    groups. */
606
607 /* Represents auxiliary data for handling SPLIT FILE in a
608    multipass procedure. */
609 struct multipass_split_aux_data 
610   {
611     struct dataset *dataset;    /* The dataset of the split */
612     struct ccase prev_case;     /* Data in previous case. */
613     struct casefile *casefile;  /* Accumulates data for a split. */
614     split_func *split;          /* Function to call with the accumulated 
615                                    data. */
616     void *func_aux;             /* Auxiliary data. */ 
617   };
618
619 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
620 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
621 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
622
623 /* Returns true if successful, false if an I/O error occurred. */
624 bool
625 multipass_procedure_with_splits (struct dataset *ds, 
626                                  split_func  *split,
627                                  void *func_aux)
628 {
629   struct multipass_split_aux_data aux;
630   bool ok;
631
632   case_nullify (&aux.prev_case);
633   aux.casefile = NULL;
634   aux.split = split;
635   aux.func_aux = func_aux;
636   aux.dataset = ds;
637
638   ok = internal_procedure (ds, multipass_split_case_func,
639                            multipass_split_end_func, &aux);
640   case_destroy (&aux.prev_case);
641
642   return ok;
643 }
644
645 /* Case callback used by multipass_procedure_with_splits(). */
646 static bool
647 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
648 {
649   struct multipass_split_aux_data *aux = aux_;
650   bool ok = true;
651
652   /* Start a new series if needed. */
653   if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
654     {
655       /* Record split values. */
656       case_destroy (&aux->prev_case);
657       case_clone (&aux->prev_case, c);
658
659       /* Pass any cases to split_func. */
660       if (aux->casefile != NULL)
661         ok = multipass_split_output (aux, ds);
662
663       /* Start a new casefile. */
664       aux->casefile = 
665         fastfile_create (dict_get_next_value_idx (ds->dict));
666     }
667
668   return casefile_append (aux->casefile, c) && ok;
669 }
670
671 /* End-of-file callback used by multipass_procedure_with_splits(). */
672 static bool
673 multipass_split_end_func (void *aux_, const struct dataset *ds)
674 {
675   struct multipass_split_aux_data *aux = aux_;
676   return (aux->casefile == NULL || multipass_split_output (aux, ds));
677 }
678
679 static bool
680 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
681 {
682   bool ok;
683   
684   assert (aux->casefile != NULL);
685   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
686   casefile_destroy (aux->casefile);
687   aux->casefile = NULL;
688
689   return ok;
690 }
691 \f
692 /* Discards all the current state in preparation for a data-input
693    command like DATA LIST or GET. */
694 void
695 discard_variables (struct dataset *ds)
696 {
697   dict_clear (ds->dict);
698   fh_set_default_handle (NULL);
699
700   ds->n_lag = 0;
701   
702   free_case_source (ds->proc_source);
703   ds->proc_source = NULL;
704
705   proc_cancel_all_transformations (ds);
706 }
707 \f
708 /* Returns the current set of permanent transformations,
709    and clears the permanent transformations.
710    For use by INPUT PROGRAM. */
711 struct trns_chain *
712 proc_capture_transformations (struct dataset *ds) 
713 {
714   struct trns_chain *chain;
715   
716   assert (ds->temporary_trns_chain == NULL);
717   chain = ds->permanent_trns_chain;
718   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
719   return chain;
720 }
721
722 /* Adds a transformation that processes a case with PROC and
723    frees itself with FREE to the current set of transformations.
724    The functions are passed AUX as auxiliary data. */
725 void
726 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
727 {
728   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
729 }
730
731 /* Adds a transformation that processes a case with PROC and
732    frees itself with FREE to the current set of transformations.
733    When parsing of the block of transformations is complete,
734    FINALIZE will be called.
735    The functions are passed AUX as auxiliary data. */
736 void
737 add_transformation_with_finalizer (struct dataset *ds, 
738                                    trns_finalize_func *finalize,
739                                    trns_proc_func *proc,
740                                    trns_free_func *free, void *aux)
741 {
742   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
743 }
744
745 /* Returns the index of the next transformation.
746    This value can be returned by a transformation procedure
747    function to indicate a "jump" to that transformation. */
748 size_t
749 next_transformation (const struct dataset *ds) 
750 {
751   return trns_chain_next (ds->cur_trns_chain);
752 }
753
754 /* Returns true if the next call to add_transformation() will add
755    a temporary transformation, false if it will add a permanent
756    transformation. */
757 bool
758 proc_in_temporary_transformations (const struct dataset *ds) 
759 {
760   return ds->temporary_trns_chain != NULL;
761 }
762
763 /* Marks the start of temporary transformations.
764    Further calls to add_transformation() will add temporary
765    transformations. */
766 void
767 proc_start_temporary_transformations (struct dataset *ds) 
768 {
769   if (!proc_in_temporary_transformations (ds))
770     {
771       add_case_limit_trns (ds);
772
773       ds->permanent_dict = dict_clone (ds->dict);
774       trns_chain_finalize (ds->permanent_trns_chain);
775       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
776     }
777 }
778
779 /* Converts all the temporary transformations, if any, to
780    permanent transformations.  Further transformations will be
781    permanent.
782    Returns true if anything changed, false otherwise. */
783 bool
784 proc_make_temporary_transformations_permanent (struct dataset *ds) 
785 {
786   if (proc_in_temporary_transformations (ds)) 
787     {
788       trns_chain_finalize (ds->temporary_trns_chain);
789       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
790       ds->temporary_trns_chain = NULL;
791
792       dict_destroy (ds->permanent_dict);
793       ds->permanent_dict = NULL;
794
795       return true;
796     }
797   else
798     return false;
799 }
800
801 /* Cancels all temporary transformations, if any.  Further
802    transformations will be permanent.
803    Returns true if anything changed, false otherwise. */
804 bool
805 proc_cancel_temporary_transformations (struct dataset *ds) 
806 {
807   if (proc_in_temporary_transformations (ds)) 
808     {
809       dict_destroy (ds->dict);
810       ds->dict = ds->permanent_dict;
811       ds->permanent_dict = NULL;
812
813       trns_chain_destroy (ds->temporary_trns_chain);
814       ds->temporary_trns_chain = NULL;
815
816       return true;
817     }
818   else
819     return false;
820 }
821
822 /* Cancels all transformations, if any.
823    Returns true if successful, false on I/O error. */
824 bool
825 proc_cancel_all_transformations (struct dataset *ds)
826 {
827   bool ok;
828   ok = trns_chain_destroy (ds->permanent_trns_chain);
829   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
830   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
831   ds->temporary_trns_chain = NULL;
832   return ok;
833 }
834 \f
835 /* Initializes procedure handling. */
836 struct dataset *
837 create_dataset (void)
838 {
839   struct dataset *ds = xzalloc (sizeof(*ds));
840   ds->dict = dict_create ();
841   proc_cancel_all_transformations (ds);
842   return ds;
843 }
844
845 /* Finishes up procedure handling. */
846 void
847 destroy_dataset (struct dataset *ds)
848 {
849   discard_variables (ds);
850   dict_destroy (ds->dict);
851   trns_chain_destroy (ds->permanent_trns_chain);
852   free (ds);
853 }
854
855 /* Sets SINK as the destination for procedure output from the
856    next procedure. */
857 void
858 proc_set_sink (struct dataset *ds, struct case_sink *sink) 
859 {
860   assert (ds->proc_sink == NULL);
861   ds->proc_sink = sink;
862 }
863
864 /* Sets SOURCE as the source for procedure input for the next
865    procedure. */
866 void
867 proc_set_source (struct dataset *ds, struct case_source *source) 
868 {
869   assert (ds->proc_source == NULL);
870   ds->proc_source = source;
871 }
872
873 /* Returns true if a source for the next procedure has been
874    configured, false otherwise. */
875 bool
876 proc_has_source (const struct dataset *ds) 
877 {
878   return ds->proc_source != NULL;
879 }
880
881 /* Returns the output from the previous procedure.
882    For use only immediately after executing a procedure.
883    The returned casefile is owned by the caller; it will not be
884    automatically used for the next procedure's input. */
885 struct casefile *
886 proc_capture_output (struct dataset *ds) 
887 {
888   struct casefile *casefile;
889
890   /* Try to make sure that this function is called immediately
891      after procedure() or a similar function. */
892   assert (ds->proc_source != NULL);
893   assert (case_source_is_class (ds->proc_source, &storage_source_class));
894   assert (trns_chain_is_empty (ds->permanent_trns_chain));
895   assert (!proc_in_temporary_transformations (ds));
896
897   casefile = storage_source_decapsulate (ds->proc_source);
898   ds->proc_source = NULL;
899
900   return casefile;
901 }
902 \f
903 static trns_proc_func case_limit_trns_proc;
904 static trns_free_func case_limit_trns_free;
905
906 /* Adds a transformation that limits the number of cases that may
907    pass through, if DS->DICT has a case limit. */
908 static void
909 add_case_limit_trns (struct dataset *ds) 
910 {
911   size_t case_limit = dict_get_case_limit (ds->dict);
912   if (case_limit != 0)
913     {
914       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
915       *cases_remaining = case_limit;
916       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
917                           cases_remaining);
918       dict_set_case_limit (ds->dict, 0);
919     }
920 }
921
922 /* Limits the maximum number of cases processed to
923    *CASES_REMAINING. */
924 static int
925 case_limit_trns_proc (void *cases_remaining_,
926                       struct ccase *c UNUSED, casenumber case_nr UNUSED) 
927 {
928   size_t *cases_remaining = cases_remaining_;
929   if (*cases_remaining > 0) 
930     {
931       (*cases_remaining)--;
932       return TRNS_CONTINUE;
933     }
934   else
935     return TRNS_DROP_CASE;
936 }
937
938 /* Frees the data associated with a case limit transformation. */
939 static bool
940 case_limit_trns_free (void *cases_remaining_) 
941 {
942   size_t *cases_remaining = cases_remaining_;
943   free (cases_remaining);
944   return true;
945 }
946 \f
947 static trns_proc_func filter_trns_proc;
948
949 /* Adds a temporary transformation to filter data according to
950    the variable specified on FILTER, if any. */
951 static void
952 add_filter_trns (struct dataset *ds) 
953 {
954   struct variable *filter_var = dict_get_filter (ds->dict);
955   if (filter_var != NULL) 
956     {
957       proc_start_temporary_transformations (ds);
958       add_transformation (ds, filter_trns_proc, NULL, filter_var);
959     }
960 }
961
962 /* FILTER transformation. */
963 static int
964 filter_trns_proc (void *filter_var_,
965                   struct ccase *c UNUSED, casenumber case_nr UNUSED) 
966   
967 {
968   struct variable *filter_var = filter_var_;
969   double f = case_num (c, filter_var);
970   return (f != 0.0 && !var_is_num_missing (filter_var, f)
971           ? TRNS_CONTINUE : TRNS_DROP_CASE);
972 }
973
974
975 struct dictionary *
976 dataset_dict (const struct dataset *ds)
977 {
978   return ds->dict;
979 }
980
981
982 void 
983 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
984 {
985   ds->dict = dict;
986 }
987
988 int 
989 dataset_n_lag (const struct dataset *ds)
990 {
991   return ds->n_lag;
992 }
993
994 void 
995 dataset_set_n_lag (struct dataset *ds, int n_lag)
996 {
997   ds->n_lag = n_lag;
998 }
999
1000