Continue reforming procedure execution. Change internal_procedure()
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/value-labels.h>
38 #include <data/variable.h>
39 #include <language/expressions/public.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/str.h>
44 #include <output/manager.h>
45 #include <output/table.h>
46
47 #include "gettext.h"
48 #define _(msgid) gettext (msgid)
49
50 /*
51    Virtual File Manager (vfm):
52
53    vfm is used to process data files.  It uses the model that
54    data is read from one stream (the data source), processed,
55    then written to another (the data sink).  The data source is
56    then deleted and the data sink becomes the data source for the
57    next procedure. */
58
59 /* Procedure execution data. */
60 struct write_case_data
61   {
62     /* Function to call for each case. */
63     bool (*case_func) (struct ccase *, void *); /* Function. */
64     void *aux;                                 /* Auxiliary data. */ 
65
66     struct ccase trns_case;     /* Case used for transformations. */
67     struct ccase sink_case;     /* Case written to sink, if
68                                    compaction is necessary. */
69     size_t cases_written;       /* Cases output so far. */
70   };
71
72 /* Cases are read from vfm_source,
73    pass through permanent_trns_chain (which transforms them into
74    the format described by permanent_dict),
75    are written to vfm_sink,
76    pass through temporary_trns_chain (which transforms them into
77    the format described by default_dict),
78    and are finally passed to the procedure. */
79 static struct case_source *vfm_source;
80 static struct trns_chain *permanent_trns_chain;
81 static struct dictionary *permanent_dict;
82 static struct case_sink *vfm_sink;
83 static struct trns_chain *temporary_trns_chain;
84 struct dictionary *default_dict;
85
86 /* The transformation chain that the next transformation will be
87    added to. */
88 static struct trns_chain *cur_trns_chain;
89
90 /* The compactor used to compact a case, if necessary;
91    otherwise a null pointer. */
92 static struct dict_compactor *compactor;
93
94 /* Time at which vfm was last invoked. */
95 static time_t last_vfm_invocation;
96
97 /* Lag queue. */
98 int n_lag;                      /* Number of cases to lag. */
99 static int lag_count;           /* Number of cases in lag_queue so far. */
100 static int lag_head;            /* Index where next case will be added. */
101 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
102
103 static void add_case_limit_trns (void);
104 static void add_filter_trns (void);
105 static void add_process_if_trns (void);
106
107 static bool internal_procedure (bool (*case_func) (struct ccase *, void *),
108                                 bool (*end_func) (void *),
109                                 void *aux);
110 static void update_last_vfm_invocation (void);
111 static void create_trns_case (struct ccase *, struct dictionary *);
112 static void open_active_file (void);
113 static bool write_case (struct write_case_data *wc_data);
114 static void lag_case (const struct ccase *c);
115 static void clear_case (struct ccase *c);
116 static bool close_active_file (void);
117 \f
118 /* Public functions. */
119
120 /* Returns the last time the data was read. */
121 time_t
122 time_of_last_procedure (void) 
123 {
124   if (last_vfm_invocation == 0)
125     update_last_vfm_invocation ();
126   return last_vfm_invocation;
127 }
128 \f
129 /* Regular procedure. */
130
131 /* Reads the data from the input program and writes it to a new
132    active file.  For each case we read from the input program, we
133    do the following:
134
135    1. Execute permanent transformations.  If these drop the case,
136       start the next case from step 1.
137
138    2. Write case to replacement active file.
139    
140    3. Execute temporary transformations.  If these drop the case,
141       start the next case from step 1.
142       
143    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
144
145    Returns true if successful, false if an I/O error occurred. */
146 bool
147 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
148 {
149   return internal_procedure (proc_func, NULL, aux);
150 }
151 \f
152 /* Multipass procedure. */
153
154 struct multipass_aux_data 
155   {
156     struct casefile *casefile;
157     
158     bool (*proc_func) (const struct casefile *, void *aux);
159     void *aux;
160   };
161
162 /* Case processing function for multipass_procedure(). */
163 static bool
164 multipass_case_func (struct ccase *c, void *aux_data_) 
165 {
166   struct multipass_aux_data *aux_data = aux_data_;
167   return casefile_append (aux_data->casefile, c);
168 }
169
170 /* End-of-file function for multipass_procedure(). */
171 static bool
172 multipass_end_func (void *aux_data_) 
173 {
174   struct multipass_aux_data *aux_data = aux_data_;
175   return (aux_data->proc_func == NULL
176           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
177 }
178
179 /* Procedure that allows multiple passes over the input data.
180    The entire active file is passed to PROC_FUNC, with the given
181    AUX as auxiliary data, as a unit. */
182 bool
183 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
184                      void *aux) 
185 {
186   struct multipass_aux_data aux_data;
187   bool ok;
188
189   aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
190   aux_data.proc_func = proc_func;
191   aux_data.aux = aux;
192
193   ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
194   ok = !casefile_error (aux_data.casefile) && ok;
195
196   casefile_destroy (aux_data.casefile);
197
198   return ok;
199 }
200 \f
201 /* Procedure implementation. */
202
203 /* Executes a procedure.
204    Passes each case to CASE_FUNC.
205    Calls END_FUNC after the last case.
206    Returns true if successful, false if an I/O error occurred (or
207    if CASE_FUNC or END_FUNC ever returned false). */
208 static bool
209 internal_procedure (bool (*case_func) (struct ccase *, void *),
210                     bool (*end_func) (void *),
211                     void *aux) 
212 {
213   struct write_case_data wc_data;
214   bool ok = true;
215
216   assert (vfm_source != NULL);
217
218   update_last_vfm_invocation ();
219
220   /* Optimize the trivial case where we're not going to do
221      anything with the data, by not reading the data at all. */
222   if (case_func == NULL && end_func == NULL
223       && case_source_is_class (vfm_source, &storage_source_class)
224       && vfm_sink == NULL
225       && (temporary_trns_chain == NULL
226           || trns_chain_is_empty (temporary_trns_chain))
227       && trns_chain_is_empty (permanent_trns_chain))
228     {
229       n_lag = 0;
230       expr_free (process_if_expr);
231       process_if_expr = NULL;
232       dict_set_case_limit (default_dict, 0);
233       dict_clear_vectors (default_dict);
234       return true;
235     }
236   
237   open_active_file ();
238   
239   wc_data.case_func = case_func;
240   wc_data.aux = aux;
241   create_trns_case (&wc_data.trns_case, default_dict);
242   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
243   wc_data.cases_written = 0;
244
245   ok = vfm_source->class->read (vfm_source,
246                                 &wc_data.trns_case,
247                                 write_case, &wc_data) && ok;
248   if (end_func != NULL)
249     ok = end_func (aux) && ok;
250
251   case_destroy (&wc_data.sink_case);
252   case_destroy (&wc_data.trns_case);
253
254   ok = close_active_file () && ok;
255
256   return ok;
257 }
258
259 /* Updates last_vfm_invocation. */
260 static void
261 update_last_vfm_invocation (void) 
262 {
263   last_vfm_invocation = time (NULL);
264 }
265
266 /* Creates and returns a case, initializing it from the vectors
267    that say which `value's need to be initialized just once, and
268    which ones need to be re-initialized before every case. */
269 static void
270 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
271 {
272   size_t var_cnt = dict_get_var_cnt (dict);
273   size_t i;
274
275   case_create (trns_case, dict_get_next_value_idx (dict));
276   for (i = 0; i < var_cnt; i++) 
277     {
278       struct variable *v = dict_get_var (dict, i);
279       union value *value = case_data_rw (trns_case, v->fv);
280
281       if (v->type == NUMERIC)
282         value->f = v->leave ? 0.0 : SYSMIS;
283       else
284         memset (value->s, ' ', v->width);
285     }
286 }
287
288 /* Makes all preparations for reading from the data source and writing
289    to the data sink. */
290 static void
291 open_active_file (void)
292 {
293   add_case_limit_trns ();
294   add_filter_trns ();
295   add_process_if_trns ();
296
297   /* Finalize transformations. */
298   trns_chain_finalize (cur_trns_chain);
299
300   /* Make permanent_dict refer to the dictionary right before
301      data reaches the sink. */
302   if (permanent_dict == NULL)
303     permanent_dict = default_dict;
304
305   /* Figure out compaction. */
306   compactor = (dict_needs_compaction (permanent_dict)
307                ? dict_make_compactor (permanent_dict)
308                : NULL);
309
310   /* Prepare sink. */
311   if (vfm_sink == NULL)
312     vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
313   if (vfm_sink->class->open != NULL)
314     vfm_sink->class->open (vfm_sink);
315
316   /* Allocate memory for lag queue. */
317   if (n_lag > 0)
318     {
319       int i;
320   
321       lag_count = 0;
322       lag_head = 0;
323       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
324       for (i = 0; i < n_lag; i++)
325         case_nullify (&lag_queue[i]);
326     }
327 }
328
329 /* Transforms trns_case and writes it to the replacement active
330    file if advisable.  Returns true if more cases can be
331    accepted, false otherwise.  Do not call this function again
332    after it has returned false once.  */
333 static bool
334 write_case (struct write_case_data *wc_data)
335 {
336   enum trns_result retval;
337   size_t case_nr;
338   
339   /* Execute permanent transformations.  */
340   case_nr = wc_data->cases_written + 1;
341   retval = trns_chain_execute (permanent_trns_chain,
342                                &wc_data->trns_case, &case_nr);
343   if (retval != TRNS_CONTINUE)
344     goto done;
345
346   /* Write case to LAG queue. */
347   if (n_lag)
348     lag_case (&wc_data->trns_case);
349
350   /* Write case to replacement active file. */
351   wc_data->cases_written++;
352   if (vfm_sink->class->write != NULL) 
353     {
354       if (compactor != NULL) 
355         {
356           dict_compactor_compact (compactor, &wc_data->sink_case,
357                                   &wc_data->trns_case);
358           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
359         }
360       else
361         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
362     }
363   
364   /* Execute temporary transformations. */
365   if (temporary_trns_chain != NULL) 
366     {
367       retval = trns_chain_execute (temporary_trns_chain,
368                                    &wc_data->trns_case,
369                                    &wc_data->cases_written);
370       if (retval != TRNS_CONTINUE)
371         goto done;
372     }
373
374   /* Pass case to procedure. */
375   if (wc_data->case_func != NULL)
376     if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
377       retval = TRNS_ERROR;
378
379  done:
380   clear_case (&wc_data->trns_case);
381   return retval != TRNS_ERROR;
382 }
383
384 /* Add C to the lag queue. */
385 static void
386 lag_case (const struct ccase *c)
387 {
388   if (lag_count < n_lag)
389     lag_count++;
390   case_destroy (&lag_queue[lag_head]);
391   case_clone (&lag_queue[lag_head], c);
392   if (++lag_head >= n_lag)
393     lag_head = 0;
394 }
395
396 /* Clears the variables in C that need to be cleared between
397    processing cases.  */
398 static void
399 clear_case (struct ccase *c)
400 {
401   size_t var_cnt = dict_get_var_cnt (default_dict);
402   size_t i;
403   
404   for (i = 0; i < var_cnt; i++) 
405     {
406       struct variable *v = dict_get_var (default_dict, i);
407       if (!v->leave) 
408         {
409           if (v->type == NUMERIC)
410             case_data_rw (c, v->fv)->f = SYSMIS;
411           else
412             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
413         } 
414     }
415 }
416
417 /* Closes the active file. */
418 static bool
419 close_active_file (void)
420 {
421   /* Free memory for lag queue, and turn off lagging. */
422   if (n_lag > 0)
423     {
424       int i;
425       
426       for (i = 0; i < n_lag; i++)
427         case_destroy (&lag_queue[i]);
428       free (lag_queue);
429       n_lag = 0;
430     }
431   
432   /* Dictionary from before TEMPORARY becomes permanent. */
433   proc_cancel_temporary_transformations ();
434
435   /* Finish compaction. */
436   if (compactor != NULL) 
437     {
438       dict_compactor_destroy (compactor);
439       dict_compact_values (default_dict);
440       compactor = NULL;
441     }
442     
443   /* Free data source. */
444   free_case_source (vfm_source);
445   vfm_source = NULL;
446
447   /* Old data sink becomes new data source. */
448   if (vfm_sink->class->make_source != NULL)
449     vfm_source = vfm_sink->class->make_source (vfm_sink);
450   free_case_sink (vfm_sink);
451   vfm_sink = NULL;
452
453   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
454      and get rid of all the transformations. */
455   dict_clear_vectors (default_dict);
456   permanent_dict = NULL;
457   return proc_cancel_all_transformations ();
458 }
459 \f
460 /* Returns a pointer to the lagged case from N_BEFORE cases before the
461    current one, or NULL if there haven't been that many cases yet. */
462 struct ccase *
463 lagged_case (int n_before)
464 {
465   assert (n_before >= 1 );
466   assert (n_before <= n_lag);
467
468   if (n_before <= lag_count)
469     {
470       int index = lag_head - n_before;
471       if (index < 0)
472         index += n_lag;
473       return &lag_queue[index];
474     }
475   else
476     return NULL;
477 }
478 \f
479 /* Procedure that separates the data into SPLIT FILE groups. */
480
481 /* Represents auxiliary data for handling SPLIT FILE. */
482 struct split_aux_data 
483   {
484     size_t case_count;          /* Number of cases so far. */
485     struct ccase prev_case;     /* Data in previous case. */
486
487     /* Functions to call... */
488     void (*begin_func) (void *);               /* ...before data. */
489     bool (*proc_func) (struct ccase *, void *); /* ...with data. */
490     void (*end_func) (void *);                 /* ...after data. */
491     void *func_aux;                            /* Auxiliary data. */ 
492   };
493
494 static int equal_splits (const struct ccase *, const struct ccase *);
495 static bool split_procedure_case_func (struct ccase *c, void *split_aux_);
496 static bool split_procedure_end_func (void *split_aux_);
497 static void dump_splits (struct ccase *);
498
499 /* Like procedure(), but it automatically breaks the case stream
500    into SPLIT FILE break groups.  Before each group of cases with
501    identical SPLIT FILE variable values, BEGIN_FUNC is called.
502    Then PROC_FUNC is called with each case in the group.  
503    END_FUNC is called when the group is finished.  FUNC_AUX is
504    passed to each of the functions as auxiliary data.
505
506    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
507    and END_FUNC will be called at all. 
508
509    If SPLIT FILE is not in effect, then there is one break group
510    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
511    will be called once.
512    
513    Returns true if successful, false if an I/O error occurred. */
514 bool
515 procedure_with_splits (void (*begin_func) (void *aux),
516                        bool (*proc_func) (struct ccase *, void *aux),
517                        void (*end_func) (void *aux),
518                        void *func_aux) 
519 {
520   struct split_aux_data split_aux;
521   bool ok;
522
523   split_aux.case_count = 0;
524   case_nullify (&split_aux.prev_case);
525   split_aux.begin_func = begin_func;
526   split_aux.proc_func = proc_func;
527   split_aux.end_func = end_func;
528   split_aux.func_aux = func_aux;
529
530   ok = internal_procedure (split_procedure_case_func,
531                            split_procedure_end_func, &split_aux);
532
533   case_destroy (&split_aux.prev_case);
534
535   return ok;
536 }
537
538 /* Case callback used by procedure_with_splits(). */
539 static bool
540 split_procedure_case_func (struct ccase *c, void *split_aux_) 
541 {
542   struct split_aux_data *split_aux = split_aux_;
543
544   /* Start a new series if needed. */
545   if (split_aux->case_count == 0
546       || !equal_splits (c, &split_aux->prev_case))
547     {
548       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
549         split_aux->end_func (split_aux->func_aux);
550
551       dump_splits (c);
552       case_destroy (&split_aux->prev_case);
553       case_clone (&split_aux->prev_case, c);
554
555       if (split_aux->begin_func != NULL)
556         split_aux->begin_func (split_aux->func_aux);
557     }
558
559   split_aux->case_count++;
560   return (split_aux->proc_func == NULL
561           || split_aux->proc_func (c, split_aux->func_aux));
562 }
563
564 /* End-of-file callback used by procedure_with_splits(). */
565 static bool
566 split_procedure_end_func (void *split_aux_) 
567 {
568   struct split_aux_data *split_aux = split_aux_;
569
570   if (split_aux->case_count > 0 && split_aux->end_func != NULL)
571     split_aux->end_func (split_aux->func_aux);
572   return true;
573 }
574
575 /* Compares the SPLIT FILE variables in cases A and B and returns
576    nonzero only if they differ. */
577 static int
578 equal_splits (const struct ccase *a, const struct ccase *b) 
579 {
580   return case_compare (a, b,
581                        dict_get_split_vars (default_dict),
582                        dict_get_split_cnt (default_dict)) == 0;
583 }
584
585 /* Dumps out the values of all the split variables for the case C. */
586 static void
587 dump_splits (struct ccase *c)
588 {
589   struct variable *const *split;
590   struct tab_table *t;
591   size_t split_cnt;
592   int i;
593
594   split_cnt = dict_get_split_cnt (default_dict);
595   if (split_cnt == 0)
596     return;
597
598   t = tab_create (3, split_cnt + 1, 0);
599   tab_dim (t, tab_natural_dimensions);
600   tab_vline (t, TAL_GAP, 1, 0, split_cnt);
601   tab_vline (t, TAL_GAP, 2, 0, split_cnt);
602   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
603   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
604   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
605   split = dict_get_split_vars (default_dict);
606   for (i = 0; i < split_cnt; i++)
607     {
608       struct variable *v = split[i];
609       char temp_buf[80];
610       const char *val_lab;
611
612       assert (v->type == NUMERIC || v->type == ALPHA);
613       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
614       
615       data_out (temp_buf, &v->print, case_data (c, v->fv));
616       
617       temp_buf[v->print.w] = 0;
618       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
619
620       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
621       if (val_lab)
622         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
623     }
624   tab_flags (t, SOMF_NO_TITLE);
625   tab_submit (t);
626 }
627 \f
628 /* Multipass procedure that separates the data into SPLIT FILE
629    groups. */
630
631 /* Represents auxiliary data for handling SPLIT FILE in a
632    multipass procedure. */
633 struct multipass_split_aux_data 
634   {
635     struct ccase prev_case;     /* Data in previous case. */
636     struct casefile *casefile;  /* Accumulates data for a split. */
637
638     /* Function to call with the accumulated data. */
639     bool (*split_func) (const struct casefile *, void *);
640     void *func_aux;                            /* Auxiliary data. */ 
641   };
642
643 static bool multipass_split_case_func (struct ccase *c, void *aux_);
644 static bool multipass_split_end_func (void *aux_);
645 static bool multipass_split_output (struct multipass_split_aux_data *);
646
647 /* Returns true if successful, false if an I/O error occurred. */
648 bool
649 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
650                                                      void *),
651                                  void *func_aux) 
652 {
653   struct multipass_split_aux_data aux;
654   bool ok;
655
656   case_nullify (&aux.prev_case);
657   aux.casefile = NULL;
658   aux.split_func = split_func;
659   aux.func_aux = func_aux;
660
661   ok = internal_procedure (multipass_split_case_func,
662                            multipass_split_end_func, &aux);
663   case_destroy (&aux.prev_case);
664
665   return ok;
666 }
667
668 /* Case callback used by multipass_procedure_with_splits(). */
669 static bool
670 multipass_split_case_func (struct ccase *c, void *aux_)
671 {
672   struct multipass_split_aux_data *aux = aux_;
673   bool ok = true;
674
675   /* Start a new series if needed. */
676   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
677     {
678       /* Pass any cases to split_func. */
679       if (aux->casefile != NULL)
680         ok = multipass_split_output (aux);
681
682       /* Start a new casefile. */
683       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
684
685       /* Record split values. */
686       dump_splits (c);
687       case_destroy (&aux->prev_case);
688       case_clone (&aux->prev_case, c);
689     }
690
691   return casefile_append (aux->casefile, c) && ok;
692 }
693
694 /* End-of-file callback used by multipass_procedure_with_splits(). */
695 static bool
696 multipass_split_end_func (void *aux_)
697 {
698   struct multipass_split_aux_data *aux = aux_;
699   return (aux->casefile == NULL || multipass_split_output (aux));
700 }
701
702 static bool
703 multipass_split_output (struct multipass_split_aux_data *aux)
704 {
705   bool ok;
706   
707   assert (aux->casefile != NULL);
708   ok = aux->split_func (aux->casefile, aux->func_aux);
709   casefile_destroy (aux->casefile);
710   aux->casefile = NULL;
711
712   return ok;
713 }
714 \f
715 /* Discards all the current state in preparation for a data-input
716    command like DATA LIST or GET. */
717 void
718 discard_variables (void)
719 {
720   dict_clear (default_dict);
721   fh_set_default_handle (NULL);
722
723   n_lag = 0;
724   
725   free_case_source (vfm_source);
726   vfm_source = NULL;
727
728   proc_cancel_all_transformations ();
729
730   expr_free (process_if_expr);
731   process_if_expr = NULL;
732
733   proc_cancel_temporary_transformations ();
734 }
735 \f
736 /* Returns the current set of permanent transformations,
737    and clears the permanent transformations.
738    For use by INPUT PROGRAM. */
739 struct trns_chain *
740 proc_capture_transformations (void) 
741 {
742   struct trns_chain *chain;
743   
744   assert (temporary_trns_chain == NULL);
745   chain = permanent_trns_chain;
746   cur_trns_chain = permanent_trns_chain = trns_chain_create ();
747   return chain;
748 }
749
750 /* Adds a transformation that processes a case with PROC and
751    frees itself with FREE to the current set of transformations.
752    The functions are passed AUX as auxiliary data. */
753 void
754 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
755 {
756   trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
757 }
758
759 /* Adds a transformation that processes a case with PROC and
760    frees itself with FREE to the current set of transformations.
761    When parsing of the block of transformations is complete,
762    FINALIZE will be called.
763    The functions are passed AUX as auxiliary data. */
764 void
765 add_transformation_with_finalizer (trns_finalize_func *finalize,
766                                    trns_proc_func *proc,
767                                    trns_free_func *free, void *aux)
768 {
769   trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
770 }
771
772 /* Returns the index of the next transformation.
773    This value can be returned by a transformation procedure
774    function to indicate a "jump" to that transformation. */
775 size_t
776 next_transformation (void) 
777 {
778   return trns_chain_next (cur_trns_chain);
779 }
780
781 /* Returns true if the next call to add_transformation() will add
782    a temporary transformation, false if it will add a permanent
783    transformation. */
784 bool
785 proc_in_temporary_transformations (void) 
786 {
787   return temporary_trns_chain != NULL;
788 }
789
790 /* Marks the start of temporary transformations.
791    Further calls to add_transformation() will add temporary
792    transformations. */
793 void
794 proc_start_temporary_transformations (void) 
795 {
796   if (!proc_in_temporary_transformations ())
797     {
798       add_case_limit_trns ();
799
800       permanent_dict = dict_clone (default_dict);
801       trns_chain_finalize (permanent_trns_chain);
802       temporary_trns_chain = cur_trns_chain = trns_chain_create ();
803     }
804 }
805
806 /* Converts all the temporary transformations, if any, to
807    permanent transformations.  Further transformations will be
808    permanent.
809    Returns true if anything changed, false otherwise. */
810 bool
811 proc_make_temporary_transformations_permanent (void) 
812 {
813   if (proc_in_temporary_transformations ()) 
814     {
815       trns_chain_finalize (temporary_trns_chain);
816       trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
817       temporary_trns_chain = NULL;
818
819       dict_destroy (permanent_dict);
820       permanent_dict = NULL;
821
822       return true;
823     }
824   else
825     return false;
826 }
827
828 /* Cancels all temporary transformations, if any.  Further
829    transformations will be permanent.
830    Returns true if anything changed, false otherwise. */
831 bool
832 proc_cancel_temporary_transformations (void) 
833 {
834   if (proc_in_temporary_transformations ()) 
835     {
836       dict_destroy (default_dict);
837       default_dict = permanent_dict;
838       permanent_dict = NULL;
839
840       trns_chain_destroy (temporary_trns_chain);
841       temporary_trns_chain = NULL;
842
843       return true;
844     }
845   else
846     return false;
847 }
848
849 /* Cancels all transformations, if any.
850    Returns true if successful, false on I/O error. */
851 bool
852 proc_cancel_all_transformations (void)
853 {
854   bool ok;
855   ok = trns_chain_destroy (permanent_trns_chain);
856   ok = trns_chain_destroy (temporary_trns_chain) && ok;
857   permanent_trns_chain = cur_trns_chain = trns_chain_create ();
858   temporary_trns_chain = NULL;
859   return ok;
860 }
861 \f
862 /* Initializes procedure handling. */
863 void
864 proc_init (void) 
865 {
866   default_dict = dict_create ();
867   proc_cancel_all_transformations ();
868 }
869
870 /* Finishes up procedure handling. */
871 void
872 proc_done (void)
873 {
874   discard_variables ();
875 }
876
877 /* Sets SINK as the destination for procedure output from the
878    next procedure. */
879 void
880 proc_set_sink (struct case_sink *sink) 
881 {
882   assert (vfm_sink == NULL);
883   vfm_sink = sink;
884 }
885
886 /* Sets SOURCE as the source for procedure input for the next
887    procedure. */
888 void
889 proc_set_source (struct case_source *source) 
890 {
891   assert (vfm_source == NULL);
892   vfm_source = source;
893 }
894
895 /* Returns true if a source for the next procedure has been
896    configured, false otherwise. */
897 bool
898 proc_has_source (void) 
899 {
900   return vfm_source != NULL;
901 }
902
903 /* Returns the output from the previous procedure.
904    For use only immediately after executing a procedure.
905    The returned casefile is owned by the caller; it will not be
906    automatically used for the next procedure's input. */
907 struct casefile *
908 proc_capture_output (void) 
909 {
910   struct casefile *casefile;
911
912   /* Try to make sure that this function is called immediately
913      after procedure() or a similar function. */
914   assert (vfm_source != NULL);
915   assert (case_source_is_class (vfm_source, &storage_source_class));
916   assert (trns_chain_is_empty (permanent_trns_chain));
917   assert (!proc_in_temporary_transformations ());
918
919   casefile = storage_source_decapsulate (vfm_source);
920   vfm_source = NULL;
921
922   return casefile;
923 }
924 \f
925 static trns_proc_func case_limit_trns_proc;
926 static trns_free_func case_limit_trns_free;
927
928 /* Adds a transformation that limits the number of cases that may
929    pass through, if default_dict has a case limit. */
930 static void
931 add_case_limit_trns (void) 
932 {
933   size_t case_limit = dict_get_case_limit (default_dict);
934   if (case_limit != 0)
935     {
936       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
937       *cases_remaining = case_limit;
938       add_transformation (case_limit_trns_proc, case_limit_trns_free,
939                           cases_remaining);
940       dict_set_case_limit (default_dict, 0);
941     }
942 }
943
944 /* Limits the maximum number of cases processed to
945    *CASES_REMAINING. */
946 static int
947 case_limit_trns_proc (void *cases_remaining_,
948                       struct ccase *c UNUSED, int case_nr UNUSED) 
949 {
950   size_t *cases_remaining = cases_remaining_;
951   if (*cases_remaining > 0) 
952     {
953       *cases_remaining--;
954       return TRNS_CONTINUE;
955     }
956   else
957     return TRNS_DROP_CASE;
958 }
959
960 /* Frees the data associated with a case limit transformation. */
961 static bool
962 case_limit_trns_free (void *cases_remaining_) 
963 {
964   size_t *cases_remaining = cases_remaining_;
965   free (cases_remaining);
966   return true;
967 }
968 \f
969 static trns_proc_func filter_trns_proc;
970
971 /* Adds a temporary transformation to filter data according to
972    the variable specified on FILTER, if any. */
973 static void
974 add_filter_trns (void) 
975 {
976   struct variable *filter_var = dict_get_filter (default_dict);
977   if (filter_var != NULL) 
978     {
979       proc_start_temporary_transformations ();
980       add_transformation (filter_trns_proc, NULL, filter_var);
981     }
982 }
983
984 /* FILTER transformation. */
985 static int
986 filter_trns_proc (void *filter_var_,
987                   struct ccase *c UNUSED, int case_nr UNUSED) 
988   
989 {
990   struct variable *filter_var = filter_var_;
991   double f = case_num (c, filter_var->fv);
992   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
993           ? TRNS_CONTINUE : TRNS_DROP_CASE);
994 }
995 \f
996 static trns_proc_func process_if_trns_proc;
997 static trns_free_func process_if_trns_free;
998
999 /* Adds a temporary transformation to filter data according to
1000    the expression specified on PROCESS IF, if any. */
1001 static void
1002 add_process_if_trns (void) 
1003 {
1004   if (process_if_expr != NULL) 
1005     {
1006       proc_start_temporary_transformations ();
1007       add_transformation (process_if_trns_proc, process_if_trns_free,
1008                           process_if_expr);
1009       process_if_expr = NULL;
1010     }
1011 }
1012
1013 /* PROCESS IF transformation. */
1014 static int
1015 process_if_trns_proc (void *expression_,
1016                       struct ccase *c UNUSED, int case_nr UNUSED) 
1017   
1018 {
1019   struct expression *expression = expression_;
1020   return (expr_evaluate_num (expression, c, case_nr) == 1.0
1021           ? TRNS_CONTINUE : TRNS_DROP_CASE);
1022 }
1023
1024 /* Frees a PROCESS IF transformation. */
1025 static bool
1026 process_if_trns_free (void *expression_) 
1027 {
1028   struct expression *expression = expression_;
1029   expr_free (expression);
1030   return true;
1031 }