Continue reforming procedure execution. In this phase, move
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/value-labels.h>
38 #include <data/variable.h>
39 #include <language/expressions/public.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/str.h>
44 #include <output/manager.h>
45 #include <output/table.h>
46
47 #include "gettext.h"
48 #define _(msgid) gettext (msgid)
49
50 /*
51    Virtual File Manager (vfm):
52
53    vfm is used to process data files.  It uses the model that
54    data is read from one stream (the data source), processed,
55    then written to another (the data sink).  The data source is
56    then deleted and the data sink becomes the data source for the
57    next procedure. */
58
59 /* Procedure execution data. */
60 struct write_case_data
61   {
62     /* Function to call for each case. */
63     bool (*proc_func) (struct ccase *, void *); /* Function. */
64     void *aux;                                 /* Auxiliary data. */ 
65
66     struct ccase trns_case;     /* Case used for transformations. */
67     struct ccase sink_case;     /* Case written to sink, if
68                                    compaction is necessary. */
69     size_t cases_written;       /* Cases output so far. */
70     size_t cases_analyzed;      /* Cases passed to procedure so far. */
71   };
72
73 /* Cases are read from vfm_source,
74    pass through permanent_trns_chain (which transforms them into
75    the format described by permanent_dict),
76    are written to vfm_sink,
77    pass through temporary_trns_chain (which transforms them into
78    the format described by default_dict),
79    and are finally passed to the procedure. */
80 static struct case_source *vfm_source;
81 static struct trns_chain *permanent_trns_chain;
82 static struct dictionary *permanent_dict;
83 static struct case_sink *vfm_sink;
84 static struct trns_chain *temporary_trns_chain;
85 struct dictionary *default_dict;
86
87 /* The transformation chain that the next transformation will be
88    added to. */
89 static struct trns_chain *cur_trns_chain;
90
91 /* The compactor used to compact a case, if necessary;
92    otherwise a null pointer. */
93 static struct dict_compactor *compactor;
94
95 /* Time at which vfm was last invoked. */
96 static time_t last_vfm_invocation;
97
98 /* Lag queue. */
99 int n_lag;                      /* Number of cases to lag. */
100 static int lag_count;           /* Number of cases in lag_queue so far. */
101 static int lag_head;            /* Index where next case will be added. */
102 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
103
104 static void add_case_limit_trns (void);
105 static void add_filter_trns (void);
106 static void add_process_if_trns (void);
107
108 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
109                                 void *aux);
110 static void update_last_vfm_invocation (void);
111 static void create_trns_case (struct ccase *, struct dictionary *);
112 static void open_active_file (void);
113 static bool write_case (struct write_case_data *wc_data);
114 static void lag_case (const struct ccase *c);
115 static void clear_case (struct ccase *c);
116 static bool close_active_file (void);
117 \f
118 /* Public functions. */
119
120 /* Returns the last time the data was read. */
121 time_t
122 time_of_last_procedure (void) 
123 {
124   if (last_vfm_invocation == 0)
125     update_last_vfm_invocation ();
126   return last_vfm_invocation;
127 }
128
129 /* Reads the data from the input program and writes it to a new
130    active file.  For each case we read from the input program, we
131    do the following:
132
133    1. Execute permanent transformations.  If these drop the case,
134       start the next case from step 1.
135
136    2. Write case to replacement active file.
137    
138    3. Execute temporary transformations.  If these drop the case,
139       start the next case from step 1.
140       
141    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
142
143    Returns true if successful, false if an I/O error occurred. */
144 bool
145 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
146 {
147   if (proc_func == NULL
148       && case_source_is_class (vfm_source, &storage_source_class)
149       && vfm_sink == NULL
150       && temporary_trns_chain == NULL
151       && trns_chain_is_empty (permanent_trns_chain))
152     {
153       expr_free (process_if_expr);
154       process_if_expr = NULL;
155       dict_set_case_limit (default_dict, 0);
156       dict_clear_vectors (default_dict);
157
158       update_last_vfm_invocation ();
159       return true;
160     }
161   else 
162     {
163       bool ok;
164       
165       open_active_file ();
166       ok = internal_procedure (proc_func, aux);
167       ok = close_active_file () && ok;
168
169       return ok;
170     }
171 }
172
173 /* Callback function for multipass_procedure(). */
174 static bool
175 multipass_callback (struct ccase *c, void *cf_) 
176 {
177   struct casefile *cf = cf_;
178   return casefile_append (cf, c);
179 }
180
181 /* Procedure that allows multiple passes over the input data.
182    The entire active file is passed to PROC_FUNC, with the given
183    AUX as auxiliary data, as a unit. */
184 bool
185 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
186                      void *aux) 
187 {
188   if (case_source_is_class (vfm_source, &storage_source_class)
189       && vfm_sink == NULL
190       && temporary_trns_chain == NULL
191       && trns_chain_is_empty (permanent_trns_chain))
192     {
193       proc_func (storage_source_get_casefile (vfm_source), aux);
194
195       expr_free (process_if_expr);
196       process_if_expr = NULL;
197       dict_set_case_limit (default_dict, 0);
198       dict_clear_vectors (default_dict);
199
200       update_last_vfm_invocation ();
201       return true;
202     }
203   else 
204     {
205       struct casefile *cf;
206       bool ok;
207
208       assert (proc_func != NULL);
209
210       cf = casefile_create (dict_get_next_value_idx (default_dict));
211
212       open_active_file ();
213       ok = internal_procedure (multipass_callback, cf);
214       ok = proc_func (cf, aux) && ok;
215       ok = close_active_file () && ok;
216
217       casefile_destroy (cf);
218
219       return ok;
220     }
221 }
222
223 /* Executes a procedure, as procedure(), except that the caller
224    is responsible for calling open_active_file() and
225    close_active_file().
226    Returns true if successful, false if an I/O error occurred. */
227 static bool
228 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux) 
229 {
230   struct write_case_data wc_data;
231   bool ok;
232
233   wc_data.proc_func = proc_func;
234   wc_data.aux = aux;
235   create_trns_case (&wc_data.trns_case, default_dict);
236   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
237   wc_data.cases_written = 0;
238
239   update_last_vfm_invocation ();
240
241   ok = (vfm_source == NULL
242         || vfm_source->class->read (vfm_source,
243                                     &wc_data.trns_case,
244                                     write_case, &wc_data));
245
246   case_destroy (&wc_data.sink_case);
247   case_destroy (&wc_data.trns_case);
248
249   return ok;
250 }
251
252 /* Updates last_vfm_invocation. */
253 static void
254 update_last_vfm_invocation (void) 
255 {
256   last_vfm_invocation = time (NULL);
257 }
258
259 /* Creates and returns a case, initializing it from the vectors
260    that say which `value's need to be initialized just once, and
261    which ones need to be re-initialized before every case. */
262 static void
263 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
264 {
265   size_t var_cnt = dict_get_var_cnt (dict);
266   size_t i;
267
268   case_create (trns_case, dict_get_next_value_idx (dict));
269   for (i = 0; i < var_cnt; i++) 
270     {
271       struct variable *v = dict_get_var (dict, i);
272       union value *value = case_data_rw (trns_case, v->fv);
273
274       if (v->type == NUMERIC)
275         value->f = v->leave ? 0.0 : SYSMIS;
276       else
277         memset (value->s, ' ', v->width);
278     }
279 }
280
281 /* Makes all preparations for reading from the data source and writing
282    to the data sink. */
283 static void
284 open_active_file (void)
285 {
286   add_case_limit_trns ();
287   add_filter_trns ();
288   add_process_if_trns ();
289
290   /* Finalize transformations. */
291   trns_chain_finalize (cur_trns_chain);
292
293   /* Make permanent_dict refer to the dictionary right before
294      data reaches the sink. */
295   if (permanent_dict == NULL)
296     permanent_dict = default_dict;
297
298   /* Figure out compaction. */
299   compactor = (dict_needs_compaction (permanent_dict)
300                ? dict_make_compactor (permanent_dict)
301                : NULL);
302
303   /* Prepare sink. */
304   if (vfm_sink == NULL)
305     vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
306   if (vfm_sink->class->open != NULL)
307     vfm_sink->class->open (vfm_sink);
308
309   /* Allocate memory for lag queue. */
310   if (n_lag > 0)
311     {
312       int i;
313   
314       lag_count = 0;
315       lag_head = 0;
316       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
317       for (i = 0; i < n_lag; i++)
318         case_nullify (&lag_queue[i]);
319     }
320 }
321
322 /* Transforms trns_case and writes it to the replacement active
323    file if advisable.  Returns true if more cases can be
324    accepted, false otherwise.  Do not call this function again
325    after it has returned false once.  */
326 static bool
327 write_case (struct write_case_data *wc_data)
328 {
329   enum trns_result retval;
330   size_t case_nr;
331   
332   /* Execute permanent transformations.  */
333   case_nr = wc_data->cases_written + 1;
334   retval = trns_chain_execute (permanent_trns_chain,
335                                &wc_data->trns_case, &case_nr);
336   if (retval != TRNS_CONTINUE)
337     goto done;
338
339   /* Write case to LAG queue. */
340   if (n_lag)
341     lag_case (&wc_data->trns_case);
342
343   /* Write case to replacement active file. */
344   wc_data->cases_written++;
345   if (vfm_sink->class->write != NULL) 
346     {
347       if (compactor != NULL) 
348         {
349           dict_compactor_compact (compactor, &wc_data->sink_case,
350                                   &wc_data->trns_case);
351           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
352         }
353       else
354         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
355     }
356   
357   /* Execute temporary transformations. */
358   if (temporary_trns_chain != NULL) 
359     {
360       retval = trns_chain_execute (temporary_trns_chain,
361                                    &wc_data->trns_case,
362                                    &wc_data->cases_written);
363       if (retval != TRNS_CONTINUE)
364         goto done;
365     }
366
367   /* Pass case to procedure. */
368   wc_data->cases_analyzed++;
369   if (wc_data->proc_func != NULL)
370     if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
371       retval = TRNS_ERROR;
372
373  done:
374   clear_case (&wc_data->trns_case);
375   return retval != TRNS_ERROR;
376 }
377
378 /* Add C to the lag queue. */
379 static void
380 lag_case (const struct ccase *c)
381 {
382   if (lag_count < n_lag)
383     lag_count++;
384   case_destroy (&lag_queue[lag_head]);
385   case_clone (&lag_queue[lag_head], c);
386   if (++lag_head >= n_lag)
387     lag_head = 0;
388 }
389
390 /* Clears the variables in C that need to be cleared between
391    processing cases.  */
392 static void
393 clear_case (struct ccase *c)
394 {
395   size_t var_cnt = dict_get_var_cnt (default_dict);
396   size_t i;
397   
398   for (i = 0; i < var_cnt; i++) 
399     {
400       struct variable *v = dict_get_var (default_dict, i);
401       if (!v->leave) 
402         {
403           if (v->type == NUMERIC)
404             case_data_rw (c, v->fv)->f = SYSMIS;
405           else
406             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
407         } 
408     }
409 }
410
411 /* Closes the active file. */
412 static bool
413 close_active_file (void)
414 {
415   /* Free memory for lag queue, and turn off lagging. */
416   if (n_lag > 0)
417     {
418       int i;
419       
420       for (i = 0; i < n_lag; i++)
421         case_destroy (&lag_queue[i]);
422       free (lag_queue);
423       n_lag = 0;
424     }
425   
426   /* Dictionary from before TEMPORARY becomes permanent. */
427   proc_cancel_temporary_transformations ();
428
429   /* Finish compaction. */
430   if (compactor != NULL) 
431     {
432       dict_compactor_destroy (compactor);
433       dict_compact_values (default_dict); 
434     }
435     
436   /* Free data source. */
437   free_case_source (vfm_source);
438   vfm_source = NULL;
439
440   /* Old data sink becomes new data source. */
441   if (vfm_sink->class->make_source != NULL)
442     vfm_source = vfm_sink->class->make_source (vfm_sink);
443   free_case_sink (vfm_sink);
444   vfm_sink = NULL;
445
446   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
447      and get rid of all the transformations. */
448   dict_clear_vectors (default_dict);
449   permanent_dict = NULL;
450   return proc_cancel_all_transformations ();
451 }
452 \f
453 /* Returns a pointer to the lagged case from N_BEFORE cases before the
454    current one, or NULL if there haven't been that many cases yet. */
455 struct ccase *
456 lagged_case (int n_before)
457 {
458   assert (n_before >= 1 );
459   assert (n_before <= n_lag);
460
461   if (n_before <= lag_count)
462     {
463       int index = lag_head - n_before;
464       if (index < 0)
465         index += n_lag;
466       return &lag_queue[index];
467     }
468   else
469     return NULL;
470 }
471 \f
472 /* Represents auxiliary data for handling SPLIT FILE. */
473 struct split_aux_data 
474   {
475     size_t case_count;          /* Number of cases so far. */
476     struct ccase prev_case;     /* Data in previous case. */
477
478     /* Functions to call... */
479     void (*begin_func) (void *);               /* ...before data. */
480     bool (*proc_func) (struct ccase *, void *); /* ...with data. */
481     void (*end_func) (void *);                 /* ...after data. */
482     void *func_aux;                            /* Auxiliary data. */ 
483   };
484
485 static int equal_splits (const struct ccase *, const struct ccase *);
486 static bool procedure_with_splits_callback (struct ccase *, void *);
487 static void dump_splits (struct ccase *);
488
489 /* Like procedure(), but it automatically breaks the case stream
490    into SPLIT FILE break groups.  Before each group of cases with
491    identical SPLIT FILE variable values, BEGIN_FUNC is called.
492    Then PROC_FUNC is called with each case in the group.  
493    END_FUNC is called when the group is finished.  FUNC_AUX is
494    passed to each of the functions as auxiliary data.
495
496    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
497    and END_FUNC will be called at all. 
498
499    If SPLIT FILE is not in effect, then there is one break group
500    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
501    will be called once.
502    
503    Returns true if successful, false if an I/O error occurred. */
504 bool
505 procedure_with_splits (void (*begin_func) (void *aux),
506                        bool (*proc_func) (struct ccase *, void *aux),
507                        void (*end_func) (void *aux),
508                        void *func_aux) 
509 {
510   struct split_aux_data split_aux;
511   bool ok;
512
513   split_aux.case_count = 0;
514   case_nullify (&split_aux.prev_case);
515   split_aux.begin_func = begin_func;
516   split_aux.proc_func = proc_func;
517   split_aux.end_func = end_func;
518   split_aux.func_aux = func_aux;
519
520   open_active_file ();
521   ok = internal_procedure (procedure_with_splits_callback, &split_aux);
522   if (split_aux.case_count > 0 && end_func != NULL)
523     end_func (func_aux);
524   if (!close_active_file ())
525     ok = false;
526
527   case_destroy (&split_aux.prev_case);
528
529   return ok;
530 }
531
532 /* procedure() callback used by procedure_with_splits(). */
533 static bool
534 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
535 {
536   struct split_aux_data *split_aux = split_aux_;
537
538   /* Start a new series if needed. */
539   if (split_aux->case_count == 0
540       || !equal_splits (c, &split_aux->prev_case))
541     {
542       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
543         split_aux->end_func (split_aux->func_aux);
544
545       dump_splits (c);
546       case_destroy (&split_aux->prev_case);
547       case_clone (&split_aux->prev_case, c);
548
549       if (split_aux->begin_func != NULL)
550         split_aux->begin_func (split_aux->func_aux);
551     }
552
553   split_aux->case_count++;
554   if (split_aux->proc_func != NULL)
555     return split_aux->proc_func (c, split_aux->func_aux);
556   else
557     return true;
558 }
559
560 /* Compares the SPLIT FILE variables in cases A and B and returns
561    nonzero only if they differ. */
562 static int
563 equal_splits (const struct ccase *a, const struct ccase *b) 
564 {
565   return case_compare (a, b,
566                        dict_get_split_vars (default_dict),
567                        dict_get_split_cnt (default_dict)) == 0;
568 }
569
570 /* Dumps out the values of all the split variables for the case C. */
571 static void
572 dump_splits (struct ccase *c)
573 {
574   struct variable *const *split;
575   struct tab_table *t;
576   size_t split_cnt;
577   int i;
578
579   split_cnt = dict_get_split_cnt (default_dict);
580   if (split_cnt == 0)
581     return;
582
583   t = tab_create (3, split_cnt + 1, 0);
584   tab_dim (t, tab_natural_dimensions);
585   tab_vline (t, TAL_GAP, 1, 0, split_cnt);
586   tab_vline (t, TAL_GAP, 2, 0, split_cnt);
587   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
588   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
589   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
590   split = dict_get_split_vars (default_dict);
591   for (i = 0; i < split_cnt; i++)
592     {
593       struct variable *v = split[i];
594       char temp_buf[80];
595       const char *val_lab;
596
597       assert (v->type == NUMERIC || v->type == ALPHA);
598       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
599       
600       data_out (temp_buf, &v->print, case_data (c, v->fv));
601       
602       temp_buf[v->print.w] = 0;
603       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
604
605       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
606       if (val_lab)
607         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
608     }
609   tab_flags (t, SOMF_NO_TITLE);
610   tab_submit (t);
611 }
612 \f
613 /* Represents auxiliary data for handling SPLIT FILE in a
614    multipass procedure. */
615 struct multipass_split_aux_data 
616   {
617     struct ccase prev_case;     /* Data in previous case. */
618     struct casefile *casefile;  /* Accumulates data for a split. */
619
620     /* Function to call with the accumulated data. */
621     bool (*split_func) (const struct casefile *, void *);
622     void *func_aux;                            /* Auxiliary data. */ 
623   };
624
625 static bool multipass_split_callback (struct ccase *c, void *aux_);
626 static bool multipass_split_output (struct multipass_split_aux_data *);
627
628 /* Returns true if successful, false if an I/O error occurred. */
629 bool
630 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
631                                                      void *),
632                                  void *func_aux) 
633 {
634   struct multipass_split_aux_data aux;
635   bool ok;
636
637   assert (split_func != NULL);
638
639   open_active_file ();
640
641   case_nullify (&aux.prev_case);
642   aux.casefile = NULL;
643   aux.split_func = split_func;
644   aux.func_aux = func_aux;
645
646   ok = internal_procedure (multipass_split_callback, &aux);
647   if (aux.casefile != NULL)
648     ok = multipass_split_output (&aux) && ok;
649   case_destroy (&aux.prev_case);
650
651   if (!close_active_file ())
652     ok = false;
653
654   return ok;
655 }
656
657 /* procedure() callback used by multipass_procedure_with_splits(). */
658 static bool
659 multipass_split_callback (struct ccase *c, void *aux_)
660 {
661   struct multipass_split_aux_data *aux = aux_;
662   bool ok = true;
663
664   /* Start a new series if needed. */
665   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
666     {
667       /* Pass any cases to split_func. */
668       if (aux->casefile != NULL)
669         ok = multipass_split_output (aux);
670
671       /* Start a new casefile. */
672       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
673
674       /* Record split values. */
675       dump_splits (c);
676       case_destroy (&aux->prev_case);
677       case_clone (&aux->prev_case, c);
678     }
679
680   return casefile_append (aux->casefile, c) && ok;
681 }
682
683 static bool
684 multipass_split_output (struct multipass_split_aux_data *aux)
685 {
686   bool ok;
687   
688   assert (aux->casefile != NULL);
689   ok = aux->split_func (aux->casefile, aux->func_aux);
690   casefile_destroy (aux->casefile);
691   aux->casefile = NULL;
692
693   return ok;
694 }
695
696 /* Discards all the current state in preparation for a data-input
697    command like DATA LIST or GET. */
698 void
699 discard_variables (void)
700 {
701   dict_clear (default_dict);
702   fh_set_default_handle (NULL);
703
704   n_lag = 0;
705   
706   if (vfm_source != NULL)
707     {
708       free_case_source (vfm_source);
709       vfm_source = NULL;
710     }
711
712   proc_cancel_all_transformations ();
713
714   expr_free (process_if_expr);
715   process_if_expr = NULL;
716
717   proc_cancel_temporary_transformations ();
718 }
719 \f
720 /* Returns the current set of permanent transformations,
721    and clears the permanent transformations.
722    For use by INPUT PROGRAM. */
723 struct trns_chain *
724 proc_capture_transformations (void) 
725 {
726   struct trns_chain *chain;
727   
728   assert (temporary_trns_chain == NULL);
729   chain = permanent_trns_chain;
730   cur_trns_chain = permanent_trns_chain = trns_chain_create ();
731   return chain;
732 }
733
734 /* Adds a transformation that processes a case with PROC and
735    frees itself with FREE to the current set of transformations.
736    The functions are passed AUX as auxiliary data. */
737 void
738 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
739 {
740   trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
741 }
742
743 /* Adds a transformation that processes a case with PROC and
744    frees itself with FREE to the current set of transformations.
745    When parsing of the block of transformations is complete,
746    FINALIZE will be called.
747    The functions are passed AUX as auxiliary data. */
748 void
749 add_transformation_with_finalizer (trns_finalize_func *finalize,
750                                    trns_proc_func *proc,
751                                    trns_free_func *free, void *aux)
752 {
753   trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
754 }
755
756 /* Returns the index of the next transformation.
757    This value can be returned by a transformation procedure
758    function to indicate a "jump" to that transformation. */
759 size_t
760 next_transformation (void) 
761 {
762   return trns_chain_next (cur_trns_chain);
763 }
764
765 /* Returns true if the next call to add_transformation() will add
766    a temporary transformation, false if it will add a permanent
767    transformation. */
768 bool
769 proc_in_temporary_transformations (void) 
770 {
771   return temporary_trns_chain != NULL;
772 }
773
774 /* Marks the start of temporary transformations.
775    Further calls to add_transformation() will add temporary
776    transformations. */
777 void
778 proc_start_temporary_transformations (void) 
779 {
780   if (!proc_in_temporary_transformations ())
781     {
782       add_case_limit_trns ();
783
784       permanent_dict = dict_clone (default_dict);
785       trns_chain_finalize (permanent_trns_chain);
786       temporary_trns_chain = cur_trns_chain = trns_chain_create ();
787     }
788 }
789
790 /* Converts all the temporary transformations, if any, to
791    permanent transformations.  Further transformations will be
792    permanent.
793    Returns true if anything changed, false otherwise. */
794 bool
795 proc_make_temporary_transformations_permanent (void) 
796 {
797   if (proc_in_temporary_transformations ()) 
798     {
799       trns_chain_finalize (temporary_trns_chain);
800       trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
801       temporary_trns_chain = NULL;
802
803       dict_destroy (permanent_dict);
804       permanent_dict = NULL;
805
806       return true;
807     }
808   else
809     return false;
810 }
811
812 /* Cancels all temporary transformations, if any.  Further
813    transformations will be permanent.
814    Returns true if anything changed, false otherwise. */
815 bool
816 proc_cancel_temporary_transformations (void) 
817 {
818   if (proc_in_temporary_transformations ()) 
819     {
820       dict_destroy (default_dict);
821       default_dict = permanent_dict;
822       permanent_dict = NULL;
823
824       trns_chain_destroy (temporary_trns_chain);
825       temporary_trns_chain = NULL;
826
827       return true;
828     }
829   else
830     return false;
831 }
832
833 /* Cancels all transformations, if any.
834    Returns true if successful, false on I/O error. */
835 bool
836 proc_cancel_all_transformations (void)
837 {
838   bool ok;
839   ok = trns_chain_destroy (permanent_trns_chain);
840   ok = trns_chain_destroy (temporary_trns_chain) && ok;
841   permanent_trns_chain = cur_trns_chain = trns_chain_create ();
842   temporary_trns_chain = NULL;
843   return ok;
844 }
845 \f
846 /* Initializes procedure handling. */
847 void
848 proc_init (void) 
849 {
850   default_dict = dict_create ();
851   proc_cancel_all_transformations ();
852 }
853
854 /* Finishes up procedure handling. */
855 void
856 proc_done (void)
857 {
858   discard_variables ();
859 }
860
861 /* Sets SINK as the destination for procedure output from the
862    next procedure. */
863 void
864 proc_set_sink (struct case_sink *sink) 
865 {
866   assert (vfm_sink == NULL);
867   vfm_sink = sink;
868 }
869
870 /* Sets SOURCE as the source for procedure input for the next
871    procedure. */
872 void
873 proc_set_source (struct case_source *source) 
874 {
875   assert (vfm_source == NULL);
876   vfm_source = source;
877 }
878
879 /* Returns true if a source for the next procedure has been
880    configured, false otherwise. */
881 bool
882 proc_has_source (void) 
883 {
884   return vfm_source != NULL;
885 }
886
887 /* Returns the output from the previous procedure.
888    For use only immediately after executing a procedure.
889    The returned casefile is owned by the caller; it will not be
890    automatically used for the next procedure's input. */
891 struct casefile *
892 proc_capture_output (void) 
893 {
894   struct casefile *casefile;
895
896   /* Try to make sure that this function is called immediately
897      after procedure() or a similar function. */
898   assert (vfm_source != NULL);
899   assert (case_source_is_class (vfm_source, &storage_source_class));
900   assert (trns_chain_is_empty (permanent_trns_chain));
901   assert (!proc_in_temporary_transformations ());
902
903   casefile = storage_source_decapsulate (vfm_source);
904   vfm_source = NULL;
905
906   return casefile;
907 }
908 \f
909 static trns_proc_func case_limit_trns_proc;
910 static trns_free_func case_limit_trns_free;
911
912 /* Adds a transformation that limits the number of cases that may
913    pass through, if default_dict has a case limit. */
914 static void
915 add_case_limit_trns (void) 
916 {
917   size_t case_limit = dict_get_case_limit (default_dict);
918   if (case_limit != 0)
919     {
920       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
921       *cases_remaining = case_limit;
922       add_transformation (case_limit_trns_proc, case_limit_trns_free,
923                           cases_remaining);
924       dict_set_case_limit (default_dict, 0);
925     }
926 }
927
928 /* Limits the maximum number of cases processed to
929    *CASES_REMAINING. */
930 static int
931 case_limit_trns_proc (void *cases_remaining_,
932                       struct ccase *c UNUSED, int case_nr UNUSED) 
933 {
934   size_t *cases_remaining = cases_remaining_;
935   if (*cases_remaining > 0) 
936     {
937       *cases_remaining--;
938       return TRNS_CONTINUE;
939     }
940   else
941     return TRNS_DROP_CASE;
942 }
943
944 /* Frees the data associated with a case limit transformation. */
945 static bool
946 case_limit_trns_free (void *cases_remaining_) 
947 {
948   size_t *cases_remaining = cases_remaining_;
949   free (cases_remaining);
950   return true;
951 }
952 \f
953 static trns_proc_func filter_trns_proc;
954
955 /* Adds a temporary transformation to filter data according to
956    the variable specified on FILTER, if any. */
957 static void
958 add_filter_trns (void) 
959 {
960   struct variable *filter_var = dict_get_filter (default_dict);
961   if (filter_var != NULL) 
962     {
963       proc_start_temporary_transformations ();
964       add_transformation (filter_trns_proc, NULL, filter_var);
965     }
966 }
967
968 /* FILTER transformation. */
969 static int
970 filter_trns_proc (void *filter_var_,
971                   struct ccase *c UNUSED, int case_nr UNUSED) 
972   
973 {
974   struct variable *filter_var = filter_var_;
975   double f = case_num (c, filter_var->fv);
976   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
977           ? TRNS_CONTINUE : TRNS_DROP_CASE);
978 }
979 \f
980 static trns_proc_func process_if_trns_proc;
981 static trns_free_func process_if_trns_free;
982
983 /* Adds a temporary transformation to filter data according to
984    the expression specified on PROCESS IF, if any. */
985 static void
986 add_process_if_trns (void) 
987 {
988   if (process_if_expr != NULL) 
989     {
990       proc_start_temporary_transformations ();
991       add_transformation (process_if_trns_proc, process_if_trns_free,
992                           process_if_expr);
993       process_if_expr = NULL;
994     }
995 }
996
997 /* PROCESS IF transformation. */
998 static int
999 process_if_trns_proc (void *expression_,
1000                       struct ccase *c UNUSED, int case_nr UNUSED) 
1001   
1002 {
1003   struct expression *expression = expression_;
1004   return (expr_evaluate_num (expression, c, case_nr) == 1.0
1005           ? TRNS_CONTINUE : TRNS_DROP_CASE);
1006 }
1007
1008 /* Frees a PROCESS IF transformation. */
1009 static bool
1010 process_if_trns_free (void *expression_) 
1011 {
1012   struct expression *expression = expression_;
1013   expr_free (expression);
1014   return true;
1015 }