Get rid of unused struct member.
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/value-labels.h>
38 #include <data/variable.h>
39 #include <language/expressions/public.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/str.h>
44 #include <output/manager.h>
45 #include <output/table.h>
46
47 #include "gettext.h"
48 #define _(msgid) gettext (msgid)
49
50 /*
51    Virtual File Manager (vfm):
52
53    vfm is used to process data files.  It uses the model that
54    data is read from one stream (the data source), processed,
55    then written to another (the data sink).  The data source is
56    then deleted and the data sink becomes the data source for the
57    next procedure. */
58
59 /* Procedure execution data. */
60 struct write_case_data
61   {
62     /* Function to call for each case. */
63     bool (*proc_func) (struct ccase *, void *); /* Function. */
64     void *aux;                                 /* Auxiliary data. */ 
65
66     struct ccase trns_case;     /* Case used for transformations. */
67     struct ccase sink_case;     /* Case written to sink, if
68                                    compaction is necessary. */
69     size_t cases_written;       /* Cases output so far. */
70   };
71
72 /* Cases are read from vfm_source,
73    pass through permanent_trns_chain (which transforms them into
74    the format described by permanent_dict),
75    are written to vfm_sink,
76    pass through temporary_trns_chain (which transforms them into
77    the format described by default_dict),
78    and are finally passed to the procedure. */
79 static struct case_source *vfm_source;
80 static struct trns_chain *permanent_trns_chain;
81 static struct dictionary *permanent_dict;
82 static struct case_sink *vfm_sink;
83 static struct trns_chain *temporary_trns_chain;
84 struct dictionary *default_dict;
85
86 /* The transformation chain that the next transformation will be
87    added to. */
88 static struct trns_chain *cur_trns_chain;
89
90 /* The compactor used to compact a case, if necessary;
91    otherwise a null pointer. */
92 static struct dict_compactor *compactor;
93
94 /* Time at which vfm was last invoked. */
95 static time_t last_vfm_invocation;
96
97 /* Lag queue. */
98 int n_lag;                      /* Number of cases to lag. */
99 static int lag_count;           /* Number of cases in lag_queue so far. */
100 static int lag_head;            /* Index where next case will be added. */
101 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
102
103 static void add_case_limit_trns (void);
104 static void add_filter_trns (void);
105 static void add_process_if_trns (void);
106
107 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
108                                 void *aux);
109 static void update_last_vfm_invocation (void);
110 static void create_trns_case (struct ccase *, struct dictionary *);
111 static void open_active_file (void);
112 static bool write_case (struct write_case_data *wc_data);
113 static void lag_case (const struct ccase *c);
114 static void clear_case (struct ccase *c);
115 static bool close_active_file (void);
116 \f
117 /* Public functions. */
118
119 /* Returns the last time the data was read. */
120 time_t
121 time_of_last_procedure (void) 
122 {
123   if (last_vfm_invocation == 0)
124     update_last_vfm_invocation ();
125   return last_vfm_invocation;
126 }
127
128 /* Reads the data from the input program and writes it to a new
129    active file.  For each case we read from the input program, we
130    do the following:
131
132    1. Execute permanent transformations.  If these drop the case,
133       start the next case from step 1.
134
135    2. Write case to replacement active file.
136    
137    3. Execute temporary transformations.  If these drop the case,
138       start the next case from step 1.
139       
140    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
141
142    Returns true if successful, false if an I/O error occurred. */
143 bool
144 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
145 {
146   if (proc_func == NULL
147       && case_source_is_class (vfm_source, &storage_source_class)
148       && vfm_sink == NULL
149       && temporary_trns_chain == NULL
150       && trns_chain_is_empty (permanent_trns_chain))
151     {
152       expr_free (process_if_expr);
153       process_if_expr = NULL;
154       dict_set_case_limit (default_dict, 0);
155       dict_clear_vectors (default_dict);
156
157       update_last_vfm_invocation ();
158       return true;
159     }
160   else 
161     {
162       bool ok;
163       
164       open_active_file ();
165       ok = internal_procedure (proc_func, aux);
166       ok = close_active_file () && ok;
167
168       return ok;
169     }
170 }
171
172 /* Callback function for multipass_procedure(). */
173 static bool
174 multipass_callback (struct ccase *c, void *cf_) 
175 {
176   struct casefile *cf = cf_;
177   return casefile_append (cf, c);
178 }
179
180 /* Procedure that allows multiple passes over the input data.
181    The entire active file is passed to PROC_FUNC, with the given
182    AUX as auxiliary data, as a unit. */
183 bool
184 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
185                      void *aux) 
186 {
187   if (case_source_is_class (vfm_source, &storage_source_class)
188       && vfm_sink == NULL
189       && temporary_trns_chain == NULL
190       && trns_chain_is_empty (permanent_trns_chain))
191     {
192       proc_func (storage_source_get_casefile (vfm_source), aux);
193
194       expr_free (process_if_expr);
195       process_if_expr = NULL;
196       dict_set_case_limit (default_dict, 0);
197       dict_clear_vectors (default_dict);
198
199       update_last_vfm_invocation ();
200       return true;
201     }
202   else 
203     {
204       struct casefile *cf;
205       bool ok;
206
207       assert (proc_func != NULL);
208
209       cf = casefile_create (dict_get_next_value_idx (default_dict));
210
211       open_active_file ();
212       ok = internal_procedure (multipass_callback, cf);
213       ok = proc_func (cf, aux) && ok;
214       ok = close_active_file () && ok;
215
216       casefile_destroy (cf);
217
218       return ok;
219     }
220 }
221
222 /* Executes a procedure, as procedure(), except that the caller
223    is responsible for calling open_active_file() and
224    close_active_file().
225    Returns true if successful, false if an I/O error occurred. */
226 static bool
227 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux) 
228 {
229   struct write_case_data wc_data;
230   bool ok;
231
232   wc_data.proc_func = proc_func;
233   wc_data.aux = aux;
234   create_trns_case (&wc_data.trns_case, default_dict);
235   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
236   wc_data.cases_written = 0;
237
238   update_last_vfm_invocation ();
239
240   ok = (vfm_source == NULL
241         || vfm_source->class->read (vfm_source,
242                                     &wc_data.trns_case,
243                                     write_case, &wc_data));
244
245   case_destroy (&wc_data.sink_case);
246   case_destroy (&wc_data.trns_case);
247
248   return ok;
249 }
250
251 /* Updates last_vfm_invocation. */
252 static void
253 update_last_vfm_invocation (void) 
254 {
255   last_vfm_invocation = time (NULL);
256 }
257
258 /* Creates and returns a case, initializing it from the vectors
259    that say which `value's need to be initialized just once, and
260    which ones need to be re-initialized before every case. */
261 static void
262 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
263 {
264   size_t var_cnt = dict_get_var_cnt (dict);
265   size_t i;
266
267   case_create (trns_case, dict_get_next_value_idx (dict));
268   for (i = 0; i < var_cnt; i++) 
269     {
270       struct variable *v = dict_get_var (dict, i);
271       union value *value = case_data_rw (trns_case, v->fv);
272
273       if (v->type == NUMERIC)
274         value->f = v->leave ? 0.0 : SYSMIS;
275       else
276         memset (value->s, ' ', v->width);
277     }
278 }
279
280 /* Makes all preparations for reading from the data source and writing
281    to the data sink. */
282 static void
283 open_active_file (void)
284 {
285   add_case_limit_trns ();
286   add_filter_trns ();
287   add_process_if_trns ();
288
289   /* Finalize transformations. */
290   trns_chain_finalize (cur_trns_chain);
291
292   /* Make permanent_dict refer to the dictionary right before
293      data reaches the sink. */
294   if (permanent_dict == NULL)
295     permanent_dict = default_dict;
296
297   /* Figure out compaction. */
298   compactor = (dict_needs_compaction (permanent_dict)
299                ? dict_make_compactor (permanent_dict)
300                : NULL);
301
302   /* Prepare sink. */
303   if (vfm_sink == NULL)
304     vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
305   if (vfm_sink->class->open != NULL)
306     vfm_sink->class->open (vfm_sink);
307
308   /* Allocate memory for lag queue. */
309   if (n_lag > 0)
310     {
311       int i;
312   
313       lag_count = 0;
314       lag_head = 0;
315       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
316       for (i = 0; i < n_lag; i++)
317         case_nullify (&lag_queue[i]);
318     }
319 }
320
321 /* Transforms trns_case and writes it to the replacement active
322    file if advisable.  Returns true if more cases can be
323    accepted, false otherwise.  Do not call this function again
324    after it has returned false once.  */
325 static bool
326 write_case (struct write_case_data *wc_data)
327 {
328   enum trns_result retval;
329   size_t case_nr;
330   
331   /* Execute permanent transformations.  */
332   case_nr = wc_data->cases_written + 1;
333   retval = trns_chain_execute (permanent_trns_chain,
334                                &wc_data->trns_case, &case_nr);
335   if (retval != TRNS_CONTINUE)
336     goto done;
337
338   /* Write case to LAG queue. */
339   if (n_lag)
340     lag_case (&wc_data->trns_case);
341
342   /* Write case to replacement active file. */
343   wc_data->cases_written++;
344   if (vfm_sink->class->write != NULL) 
345     {
346       if (compactor != NULL) 
347         {
348           dict_compactor_compact (compactor, &wc_data->sink_case,
349                                   &wc_data->trns_case);
350           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
351         }
352       else
353         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
354     }
355   
356   /* Execute temporary transformations. */
357   if (temporary_trns_chain != NULL) 
358     {
359       retval = trns_chain_execute (temporary_trns_chain,
360                                    &wc_data->trns_case,
361                                    &wc_data->cases_written);
362       if (retval != TRNS_CONTINUE)
363         goto done;
364     }
365
366   /* Pass case to procedure. */
367   if (wc_data->proc_func != NULL)
368     if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
369       retval = TRNS_ERROR;
370
371  done:
372   clear_case (&wc_data->trns_case);
373   return retval != TRNS_ERROR;
374 }
375
376 /* Add C to the lag queue. */
377 static void
378 lag_case (const struct ccase *c)
379 {
380   if (lag_count < n_lag)
381     lag_count++;
382   case_destroy (&lag_queue[lag_head]);
383   case_clone (&lag_queue[lag_head], c);
384   if (++lag_head >= n_lag)
385     lag_head = 0;
386 }
387
388 /* Clears the variables in C that need to be cleared between
389    processing cases.  */
390 static void
391 clear_case (struct ccase *c)
392 {
393   size_t var_cnt = dict_get_var_cnt (default_dict);
394   size_t i;
395   
396   for (i = 0; i < var_cnt; i++) 
397     {
398       struct variable *v = dict_get_var (default_dict, i);
399       if (!v->leave) 
400         {
401           if (v->type == NUMERIC)
402             case_data_rw (c, v->fv)->f = SYSMIS;
403           else
404             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
405         } 
406     }
407 }
408
409 /* Closes the active file. */
410 static bool
411 close_active_file (void)
412 {
413   /* Free memory for lag queue, and turn off lagging. */
414   if (n_lag > 0)
415     {
416       int i;
417       
418       for (i = 0; i < n_lag; i++)
419         case_destroy (&lag_queue[i]);
420       free (lag_queue);
421       n_lag = 0;
422     }
423   
424   /* Dictionary from before TEMPORARY becomes permanent. */
425   proc_cancel_temporary_transformations ();
426
427   /* Finish compaction. */
428   if (compactor != NULL) 
429     {
430       dict_compactor_destroy (compactor);
431       dict_compact_values (default_dict); 
432     }
433     
434   /* Free data source. */
435   free_case_source (vfm_source);
436   vfm_source = NULL;
437
438   /* Old data sink becomes new data source. */
439   if (vfm_sink->class->make_source != NULL)
440     vfm_source = vfm_sink->class->make_source (vfm_sink);
441   free_case_sink (vfm_sink);
442   vfm_sink = NULL;
443
444   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
445      and get rid of all the transformations. */
446   dict_clear_vectors (default_dict);
447   permanent_dict = NULL;
448   return proc_cancel_all_transformations ();
449 }
450 \f
451 /* Returns a pointer to the lagged case from N_BEFORE cases before the
452    current one, or NULL if there haven't been that many cases yet. */
453 struct ccase *
454 lagged_case (int n_before)
455 {
456   assert (n_before >= 1 );
457   assert (n_before <= n_lag);
458
459   if (n_before <= lag_count)
460     {
461       int index = lag_head - n_before;
462       if (index < 0)
463         index += n_lag;
464       return &lag_queue[index];
465     }
466   else
467     return NULL;
468 }
469 \f
470 /* Represents auxiliary data for handling SPLIT FILE. */
471 struct split_aux_data 
472   {
473     size_t case_count;          /* Number of cases so far. */
474     struct ccase prev_case;     /* Data in previous case. */
475
476     /* Functions to call... */
477     void (*begin_func) (void *);               /* ...before data. */
478     bool (*proc_func) (struct ccase *, void *); /* ...with data. */
479     void (*end_func) (void *);                 /* ...after data. */
480     void *func_aux;                            /* Auxiliary data. */ 
481   };
482
483 static int equal_splits (const struct ccase *, const struct ccase *);
484 static bool procedure_with_splits_callback (struct ccase *, void *);
485 static void dump_splits (struct ccase *);
486
487 /* Like procedure(), but it automatically breaks the case stream
488    into SPLIT FILE break groups.  Before each group of cases with
489    identical SPLIT FILE variable values, BEGIN_FUNC is called.
490    Then PROC_FUNC is called with each case in the group.  
491    END_FUNC is called when the group is finished.  FUNC_AUX is
492    passed to each of the functions as auxiliary data.
493
494    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
495    and END_FUNC will be called at all. 
496
497    If SPLIT FILE is not in effect, then there is one break group
498    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
499    will be called once.
500    
501    Returns true if successful, false if an I/O error occurred. */
502 bool
503 procedure_with_splits (void (*begin_func) (void *aux),
504                        bool (*proc_func) (struct ccase *, void *aux),
505                        void (*end_func) (void *aux),
506                        void *func_aux) 
507 {
508   struct split_aux_data split_aux;
509   bool ok;
510
511   split_aux.case_count = 0;
512   case_nullify (&split_aux.prev_case);
513   split_aux.begin_func = begin_func;
514   split_aux.proc_func = proc_func;
515   split_aux.end_func = end_func;
516   split_aux.func_aux = func_aux;
517
518   open_active_file ();
519   ok = internal_procedure (procedure_with_splits_callback, &split_aux);
520   if (split_aux.case_count > 0 && end_func != NULL)
521     end_func (func_aux);
522   if (!close_active_file ())
523     ok = false;
524
525   case_destroy (&split_aux.prev_case);
526
527   return ok;
528 }
529
530 /* procedure() callback used by procedure_with_splits(). */
531 static bool
532 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
533 {
534   struct split_aux_data *split_aux = split_aux_;
535
536   /* Start a new series if needed. */
537   if (split_aux->case_count == 0
538       || !equal_splits (c, &split_aux->prev_case))
539     {
540       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
541         split_aux->end_func (split_aux->func_aux);
542
543       dump_splits (c);
544       case_destroy (&split_aux->prev_case);
545       case_clone (&split_aux->prev_case, c);
546
547       if (split_aux->begin_func != NULL)
548         split_aux->begin_func (split_aux->func_aux);
549     }
550
551   split_aux->case_count++;
552   if (split_aux->proc_func != NULL)
553     return split_aux->proc_func (c, split_aux->func_aux);
554   else
555     return true;
556 }
557
558 /* Compares the SPLIT FILE variables in cases A and B and returns
559    nonzero only if they differ. */
560 static int
561 equal_splits (const struct ccase *a, const struct ccase *b) 
562 {
563   return case_compare (a, b,
564                        dict_get_split_vars (default_dict),
565                        dict_get_split_cnt (default_dict)) == 0;
566 }
567
568 /* Dumps out the values of all the split variables for the case C. */
569 static void
570 dump_splits (struct ccase *c)
571 {
572   struct variable *const *split;
573   struct tab_table *t;
574   size_t split_cnt;
575   int i;
576
577   split_cnt = dict_get_split_cnt (default_dict);
578   if (split_cnt == 0)
579     return;
580
581   t = tab_create (3, split_cnt + 1, 0);
582   tab_dim (t, tab_natural_dimensions);
583   tab_vline (t, TAL_GAP, 1, 0, split_cnt);
584   tab_vline (t, TAL_GAP, 2, 0, split_cnt);
585   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
586   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
587   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
588   split = dict_get_split_vars (default_dict);
589   for (i = 0; i < split_cnt; i++)
590     {
591       struct variable *v = split[i];
592       char temp_buf[80];
593       const char *val_lab;
594
595       assert (v->type == NUMERIC || v->type == ALPHA);
596       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
597       
598       data_out (temp_buf, &v->print, case_data (c, v->fv));
599       
600       temp_buf[v->print.w] = 0;
601       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
602
603       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
604       if (val_lab)
605         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
606     }
607   tab_flags (t, SOMF_NO_TITLE);
608   tab_submit (t);
609 }
610 \f
611 /* Represents auxiliary data for handling SPLIT FILE in a
612    multipass procedure. */
613 struct multipass_split_aux_data 
614   {
615     struct ccase prev_case;     /* Data in previous case. */
616     struct casefile *casefile;  /* Accumulates data for a split. */
617
618     /* Function to call with the accumulated data. */
619     bool (*split_func) (const struct casefile *, void *);
620     void *func_aux;                            /* Auxiliary data. */ 
621   };
622
623 static bool multipass_split_callback (struct ccase *c, void *aux_);
624 static bool multipass_split_output (struct multipass_split_aux_data *);
625
626 /* Returns true if successful, false if an I/O error occurred. */
627 bool
628 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
629                                                      void *),
630                                  void *func_aux) 
631 {
632   struct multipass_split_aux_data aux;
633   bool ok;
634
635   assert (split_func != NULL);
636
637   open_active_file ();
638
639   case_nullify (&aux.prev_case);
640   aux.casefile = NULL;
641   aux.split_func = split_func;
642   aux.func_aux = func_aux;
643
644   ok = internal_procedure (multipass_split_callback, &aux);
645   if (aux.casefile != NULL)
646     ok = multipass_split_output (&aux) && ok;
647   case_destroy (&aux.prev_case);
648
649   if (!close_active_file ())
650     ok = false;
651
652   return ok;
653 }
654
655 /* procedure() callback used by multipass_procedure_with_splits(). */
656 static bool
657 multipass_split_callback (struct ccase *c, void *aux_)
658 {
659   struct multipass_split_aux_data *aux = aux_;
660   bool ok = true;
661
662   /* Start a new series if needed. */
663   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
664     {
665       /* Pass any cases to split_func. */
666       if (aux->casefile != NULL)
667         ok = multipass_split_output (aux);
668
669       /* Start a new casefile. */
670       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
671
672       /* Record split values. */
673       dump_splits (c);
674       case_destroy (&aux->prev_case);
675       case_clone (&aux->prev_case, c);
676     }
677
678   return casefile_append (aux->casefile, c) && ok;
679 }
680
681 static bool
682 multipass_split_output (struct multipass_split_aux_data *aux)
683 {
684   bool ok;
685   
686   assert (aux->casefile != NULL);
687   ok = aux->split_func (aux->casefile, aux->func_aux);
688   casefile_destroy (aux->casefile);
689   aux->casefile = NULL;
690
691   return ok;
692 }
693
694 /* Discards all the current state in preparation for a data-input
695    command like DATA LIST or GET. */
696 void
697 discard_variables (void)
698 {
699   dict_clear (default_dict);
700   fh_set_default_handle (NULL);
701
702   n_lag = 0;
703   
704   if (vfm_source != NULL)
705     {
706       free_case_source (vfm_source);
707       vfm_source = NULL;
708     }
709
710   proc_cancel_all_transformations ();
711
712   expr_free (process_if_expr);
713   process_if_expr = NULL;
714
715   proc_cancel_temporary_transformations ();
716 }
717 \f
718 /* Returns the current set of permanent transformations,
719    and clears the permanent transformations.
720    For use by INPUT PROGRAM. */
721 struct trns_chain *
722 proc_capture_transformations (void) 
723 {
724   struct trns_chain *chain;
725   
726   assert (temporary_trns_chain == NULL);
727   chain = permanent_trns_chain;
728   cur_trns_chain = permanent_trns_chain = trns_chain_create ();
729   return chain;
730 }
731
732 /* Adds a transformation that processes a case with PROC and
733    frees itself with FREE to the current set of transformations.
734    The functions are passed AUX as auxiliary data. */
735 void
736 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
737 {
738   trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
739 }
740
741 /* Adds a transformation that processes a case with PROC and
742    frees itself with FREE to the current set of transformations.
743    When parsing of the block of transformations is complete,
744    FINALIZE will be called.
745    The functions are passed AUX as auxiliary data. */
746 void
747 add_transformation_with_finalizer (trns_finalize_func *finalize,
748                                    trns_proc_func *proc,
749                                    trns_free_func *free, void *aux)
750 {
751   trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
752 }
753
754 /* Returns the index of the next transformation.
755    This value can be returned by a transformation procedure
756    function to indicate a "jump" to that transformation. */
757 size_t
758 next_transformation (void) 
759 {
760   return trns_chain_next (cur_trns_chain);
761 }
762
763 /* Returns true if the next call to add_transformation() will add
764    a temporary transformation, false if it will add a permanent
765    transformation. */
766 bool
767 proc_in_temporary_transformations (void) 
768 {
769   return temporary_trns_chain != NULL;
770 }
771
772 /* Marks the start of temporary transformations.
773    Further calls to add_transformation() will add temporary
774    transformations. */
775 void
776 proc_start_temporary_transformations (void) 
777 {
778   if (!proc_in_temporary_transformations ())
779     {
780       add_case_limit_trns ();
781
782       permanent_dict = dict_clone (default_dict);
783       trns_chain_finalize (permanent_trns_chain);
784       temporary_trns_chain = cur_trns_chain = trns_chain_create ();
785     }
786 }
787
788 /* Converts all the temporary transformations, if any, to
789    permanent transformations.  Further transformations will be
790    permanent.
791    Returns true if anything changed, false otherwise. */
792 bool
793 proc_make_temporary_transformations_permanent (void) 
794 {
795   if (proc_in_temporary_transformations ()) 
796     {
797       trns_chain_finalize (temporary_trns_chain);
798       trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
799       temporary_trns_chain = NULL;
800
801       dict_destroy (permanent_dict);
802       permanent_dict = NULL;
803
804       return true;
805     }
806   else
807     return false;
808 }
809
810 /* Cancels all temporary transformations, if any.  Further
811    transformations will be permanent.
812    Returns true if anything changed, false otherwise. */
813 bool
814 proc_cancel_temporary_transformations (void) 
815 {
816   if (proc_in_temporary_transformations ()) 
817     {
818       dict_destroy (default_dict);
819       default_dict = permanent_dict;
820       permanent_dict = NULL;
821
822       trns_chain_destroy (temporary_trns_chain);
823       temporary_trns_chain = NULL;
824
825       return true;
826     }
827   else
828     return false;
829 }
830
831 /* Cancels all transformations, if any.
832    Returns true if successful, false on I/O error. */
833 bool
834 proc_cancel_all_transformations (void)
835 {
836   bool ok;
837   ok = trns_chain_destroy (permanent_trns_chain);
838   ok = trns_chain_destroy (temporary_trns_chain) && ok;
839   permanent_trns_chain = cur_trns_chain = trns_chain_create ();
840   temporary_trns_chain = NULL;
841   return ok;
842 }
843 \f
844 /* Initializes procedure handling. */
845 void
846 proc_init (void) 
847 {
848   default_dict = dict_create ();
849   proc_cancel_all_transformations ();
850 }
851
852 /* Finishes up procedure handling. */
853 void
854 proc_done (void)
855 {
856   discard_variables ();
857 }
858
859 /* Sets SINK as the destination for procedure output from the
860    next procedure. */
861 void
862 proc_set_sink (struct case_sink *sink) 
863 {
864   assert (vfm_sink == NULL);
865   vfm_sink = sink;
866 }
867
868 /* Sets SOURCE as the source for procedure input for the next
869    procedure. */
870 void
871 proc_set_source (struct case_source *source) 
872 {
873   assert (vfm_source == NULL);
874   vfm_source = source;
875 }
876
877 /* Returns true if a source for the next procedure has been
878    configured, false otherwise. */
879 bool
880 proc_has_source (void) 
881 {
882   return vfm_source != NULL;
883 }
884
885 /* Returns the output from the previous procedure.
886    For use only immediately after executing a procedure.
887    The returned casefile is owned by the caller; it will not be
888    automatically used for the next procedure's input. */
889 struct casefile *
890 proc_capture_output (void) 
891 {
892   struct casefile *casefile;
893
894   /* Try to make sure that this function is called immediately
895      after procedure() or a similar function. */
896   assert (vfm_source != NULL);
897   assert (case_source_is_class (vfm_source, &storage_source_class));
898   assert (trns_chain_is_empty (permanent_trns_chain));
899   assert (!proc_in_temporary_transformations ());
900
901   casefile = storage_source_decapsulate (vfm_source);
902   vfm_source = NULL;
903
904   return casefile;
905 }
906 \f
907 static trns_proc_func case_limit_trns_proc;
908 static trns_free_func case_limit_trns_free;
909
910 /* Adds a transformation that limits the number of cases that may
911    pass through, if default_dict has a case limit. */
912 static void
913 add_case_limit_trns (void) 
914 {
915   size_t case_limit = dict_get_case_limit (default_dict);
916   if (case_limit != 0)
917     {
918       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
919       *cases_remaining = case_limit;
920       add_transformation (case_limit_trns_proc, case_limit_trns_free,
921                           cases_remaining);
922       dict_set_case_limit (default_dict, 0);
923     }
924 }
925
926 /* Limits the maximum number of cases processed to
927    *CASES_REMAINING. */
928 static int
929 case_limit_trns_proc (void *cases_remaining_,
930                       struct ccase *c UNUSED, int case_nr UNUSED) 
931 {
932   size_t *cases_remaining = cases_remaining_;
933   if (*cases_remaining > 0) 
934     {
935       *cases_remaining--;
936       return TRNS_CONTINUE;
937     }
938   else
939     return TRNS_DROP_CASE;
940 }
941
942 /* Frees the data associated with a case limit transformation. */
943 static bool
944 case_limit_trns_free (void *cases_remaining_) 
945 {
946   size_t *cases_remaining = cases_remaining_;
947   free (cases_remaining);
948   return true;
949 }
950 \f
951 static trns_proc_func filter_trns_proc;
952
953 /* Adds a temporary transformation to filter data according to
954    the variable specified on FILTER, if any. */
955 static void
956 add_filter_trns (void) 
957 {
958   struct variable *filter_var = dict_get_filter (default_dict);
959   if (filter_var != NULL) 
960     {
961       proc_start_temporary_transformations ();
962       add_transformation (filter_trns_proc, NULL, filter_var);
963     }
964 }
965
966 /* FILTER transformation. */
967 static int
968 filter_trns_proc (void *filter_var_,
969                   struct ccase *c UNUSED, int case_nr UNUSED) 
970   
971 {
972   struct variable *filter_var = filter_var_;
973   double f = case_num (c, filter_var->fv);
974   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
975           ? TRNS_CONTINUE : TRNS_DROP_CASE);
976 }
977 \f
978 static trns_proc_func process_if_trns_proc;
979 static trns_free_func process_if_trns_free;
980
981 /* Adds a temporary transformation to filter data according to
982    the expression specified on PROCESS IF, if any. */
983 static void
984 add_process_if_trns (void) 
985 {
986   if (process_if_expr != NULL) 
987     {
988       proc_start_temporary_transformations ();
989       add_transformation (process_if_trns_proc, process_if_trns_free,
990                           process_if_expr);
991       process_if_expr = NULL;
992     }
993 }
994
995 /* PROCESS IF transformation. */
996 static int
997 process_if_trns_proc (void *expression_,
998                       struct ccase *c UNUSED, int case_nr UNUSED) 
999   
1000 {
1001   struct expression *expression = expression_;
1002   return (expr_evaluate_num (expression, c, case_nr) == 1.0
1003           ? TRNS_CONTINUE : TRNS_DROP_CASE);
1004 }
1005
1006 /* Frees a PROCESS IF transformation. */
1007 static bool
1008 process_if_trns_free (void *expression_) 
1009 {
1010   struct expression *expression = expression_;
1011   expr_free (expression);
1012   return true;
1013 }