9b94da01910e4743c3cc58955d7024e5ef2637b9
[pspp-builds.git] / src / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <procedure.h>
23
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28
29 #include "expressions/public.h"
30 #include <data/case-source.h>
31 #include <data/case-sink.h>
32 #include <data/case.h>
33 #include <data/casefile.h>
34 #include <data/dictionary.h>
35 #include <data/file-handle-def.h>
36 #include <data/settings.h>
37 #include <data/storage-stream.h>
38 #include <data/transformations.h>
39 #include <data/value-labels.h>
40 #include <data/variable.h>
41 #include <libpspp/alloc.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/str.h>
45 #include <output/manager.h>
46 #include <output/table.h>
47
48 #include "gettext.h"
49 #define _(msgid) gettext (msgid)
50
51 /*
52    Virtual File Manager (vfm):
53
54    vfm is used to process data files.  It uses the model that
55    data is read from one stream (the data source), processed,
56    then written to another (the data sink).  The data source is
57    then deleted and the data sink becomes the data source for the
58    next procedure. */
59
60 /* Procedure execution data. */
61 struct write_case_data
62   {
63     /* Function to call for each case. */
64     bool (*proc_func) (struct ccase *, void *); /* Function. */
65     void *aux;                                 /* Auxiliary data. */ 
66
67     struct ccase trns_case;     /* Case used for transformations. */
68     struct ccase sink_case;     /* Case written to sink, if
69                                    compaction is necessary. */
70     size_t cases_written;       /* Cases output so far. */
71     size_t cases_analyzed;      /* Cases passed to procedure so far. */
72   };
73
74 /* Cases are read from vfm_source,
75    pass through permanent_trns_chain (which transforms them into
76    the format described by permanent_dict),
77    are written to vfm_sink,
78    pass through temporary_trns_chain (which transforms them into
79    the format described by default_dict),
80    and are finally passed to the procedure. */
81 static struct case_source *vfm_source;
82 static struct trns_chain *permanent_trns_chain;
83 static struct dictionary *permanent_dict;
84 static struct case_sink *vfm_sink;
85 static struct trns_chain *temporary_trns_chain;
86 struct dictionary *default_dict;
87
88 /* The transformation chain that the next transformation will be
89    added to. */
90 static struct trns_chain *cur_trns_chain;
91
92 /* The compactor used to compact a case, if necessary;
93    otherwise a null pointer. */
94 static struct dict_compactor *compactor;
95
96 /* Time at which vfm was last invoked. */
97 static time_t last_vfm_invocation;
98
99 /* Lag queue. */
100 int n_lag;                      /* Number of cases to lag. */
101 static int lag_count;           /* Number of cases in lag_queue so far. */
102 static int lag_head;            /* Index where next case will be added. */
103 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
104
105 static void add_case_limit_trns (void);
106 static void add_filter_trns (void);
107 static void add_process_if_trns (void);
108
109 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
110                                 void *aux);
111 static void update_last_vfm_invocation (void);
112 static void create_trns_case (struct ccase *, struct dictionary *);
113 static void open_active_file (void);
114 static bool write_case (struct write_case_data *wc_data);
115 static void lag_case (const struct ccase *c);
116 static void clear_case (struct ccase *c);
117 static bool close_active_file (void);
118 \f
119 /* Public functions. */
120
121 /* Returns the last time the data was read. */
122 time_t
123 time_of_last_procedure (void) 
124 {
125   if (last_vfm_invocation == 0)
126     update_last_vfm_invocation ();
127   return last_vfm_invocation;
128 }
129
130 /* Reads the data from the input program and writes it to a new
131    active file.  For each case we read from the input program, we
132    do the following:
133
134    1. Execute permanent transformations.  If these drop the case,
135       start the next case from step 1.
136
137    2. Write case to replacement active file.
138    
139    3. Execute temporary transformations.  If these drop the case,
140       start the next case from step 1.
141       
142    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
143
144    Returns true if successful, false if an I/O error occurred. */
145 bool
146 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
147 {
148   if (proc_func == NULL
149       && case_source_is_class (vfm_source, &storage_source_class)
150       && vfm_sink == NULL
151       && temporary_trns_chain == NULL
152       && trns_chain_is_empty (permanent_trns_chain))
153     {
154       expr_free (process_if_expr);
155       process_if_expr = NULL;
156       dict_set_case_limit (default_dict, 0);
157       dict_clear_vectors (default_dict);
158
159       update_last_vfm_invocation ();
160       return true;
161     }
162   else 
163     {
164       bool ok;
165       
166       open_active_file ();
167       ok = internal_procedure (proc_func, aux);
168       ok = close_active_file () && ok;
169
170       return ok;
171     }
172 }
173
174 /* Callback function for multipass_procedure(). */
175 static bool
176 multipass_callback (struct ccase *c, void *cf_) 
177 {
178   struct casefile *cf = cf_;
179   return casefile_append (cf, c);
180 }
181
182 /* Procedure that allows multiple passes over the input data.
183    The entire active file is passed to PROC_FUNC, with the given
184    AUX as auxiliary data, as a unit. */
185 bool
186 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
187                      void *aux) 
188 {
189   if (case_source_is_class (vfm_source, &storage_source_class)
190       && vfm_sink == NULL
191       && temporary_trns_chain == NULL
192       && trns_chain_is_empty (permanent_trns_chain))
193     {
194       proc_func (storage_source_get_casefile (vfm_source), aux);
195
196       expr_free (process_if_expr);
197       process_if_expr = NULL;
198       dict_set_case_limit (default_dict, 0);
199       dict_clear_vectors (default_dict);
200
201       update_last_vfm_invocation ();
202       return true;
203     }
204   else 
205     {
206       struct casefile *cf;
207       bool ok;
208
209       assert (proc_func != NULL);
210
211       cf = casefile_create (dict_get_next_value_idx (default_dict));
212
213       open_active_file ();
214       ok = internal_procedure (multipass_callback, cf);
215       ok = proc_func (cf, aux) && ok;
216       ok = close_active_file () && ok;
217
218       casefile_destroy (cf);
219
220       return ok;
221     }
222 }
223
224 /* Executes a procedure, as procedure(), except that the caller
225    is responsible for calling open_active_file() and
226    close_active_file().
227    Returns true if successful, false if an I/O error occurred. */
228 static bool
229 internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux) 
230 {
231   struct write_case_data wc_data;
232   bool ok;
233
234   wc_data.proc_func = proc_func;
235   wc_data.aux = aux;
236   create_trns_case (&wc_data.trns_case, default_dict);
237   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
238   wc_data.cases_written = 0;
239
240   update_last_vfm_invocation ();
241
242   ok = (vfm_source == NULL
243         || vfm_source->class->read (vfm_source,
244                                     &wc_data.trns_case,
245                                     write_case, &wc_data));
246
247   case_destroy (&wc_data.sink_case);
248   case_destroy (&wc_data.trns_case);
249
250   return ok;
251 }
252
253 /* Updates last_vfm_invocation. */
254 static void
255 update_last_vfm_invocation (void) 
256 {
257   last_vfm_invocation = time (NULL);
258 }
259
260 /* Creates and returns a case, initializing it from the vectors
261    that say which `value's need to be initialized just once, and
262    which ones need to be re-initialized before every case. */
263 static void
264 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
265 {
266   size_t var_cnt = dict_get_var_cnt (dict);
267   size_t i;
268
269   case_create (trns_case, dict_get_next_value_idx (dict));
270   for (i = 0; i < var_cnt; i++) 
271     {
272       struct variable *v = dict_get_var (dict, i);
273       union value *value = case_data_rw (trns_case, v->fv);
274
275       if (v->type == NUMERIC)
276         value->f = v->leave ? 0.0 : SYSMIS;
277       else
278         memset (value->s, ' ', v->width);
279     }
280 }
281
282 /* Makes all preparations for reading from the data source and writing
283    to the data sink. */
284 static void
285 open_active_file (void)
286 {
287   add_case_limit_trns ();
288   add_filter_trns ();
289   add_process_if_trns ();
290
291   /* Finalize transformations. */
292   trns_chain_finalize (cur_trns_chain);
293
294   /* Make permanent_dict refer to the dictionary right before
295      data reaches the sink. */
296   if (permanent_dict == NULL)
297     permanent_dict = default_dict;
298
299   /* Figure out compaction. */
300   compactor = (dict_needs_compaction (permanent_dict)
301                ? dict_make_compactor (permanent_dict)
302                : NULL);
303
304   /* Prepare sink. */
305   if (vfm_sink == NULL)
306     vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
307   if (vfm_sink->class->open != NULL)
308     vfm_sink->class->open (vfm_sink);
309
310   /* Allocate memory for lag queue. */
311   if (n_lag > 0)
312     {
313       int i;
314   
315       lag_count = 0;
316       lag_head = 0;
317       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
318       for (i = 0; i < n_lag; i++)
319         case_nullify (&lag_queue[i]);
320     }
321 }
322
323 /* Transforms trns_case and writes it to the replacement active
324    file if advisable.  Returns true if more cases can be
325    accepted, false otherwise.  Do not call this function again
326    after it has returned false once.  */
327 static bool
328 write_case (struct write_case_data *wc_data)
329 {
330   enum trns_result retval;
331   size_t case_nr;
332   
333   /* Execute permanent transformations.  */
334   case_nr = wc_data->cases_written + 1;
335   retval = trns_chain_execute (permanent_trns_chain,
336                                &wc_data->trns_case, &case_nr);
337   if (retval != TRNS_CONTINUE)
338     goto done;
339
340   /* Write case to LAG queue. */
341   if (n_lag)
342     lag_case (&wc_data->trns_case);
343
344   /* Write case to replacement active file. */
345   wc_data->cases_written++;
346   if (vfm_sink->class->write != NULL) 
347     {
348       if (compactor != NULL) 
349         {
350           dict_compactor_compact (compactor, &wc_data->sink_case,
351                                   &wc_data->trns_case);
352           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
353         }
354       else
355         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
356     }
357   
358   /* Execute temporary transformations. */
359   if (temporary_trns_chain != NULL) 
360     {
361       retval = trns_chain_execute (temporary_trns_chain,
362                                    &wc_data->trns_case,
363                                    &wc_data->cases_written);
364       if (retval != TRNS_CONTINUE)
365         goto done;
366     }
367
368   /* Pass case to procedure. */
369   wc_data->cases_analyzed++;
370   if (wc_data->proc_func != NULL)
371     if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
372       retval = TRNS_ERROR;
373
374  done:
375   clear_case (&wc_data->trns_case);
376   return retval != TRNS_ERROR;
377 }
378
379 /* Add C to the lag queue. */
380 static void
381 lag_case (const struct ccase *c)
382 {
383   if (lag_count < n_lag)
384     lag_count++;
385   case_destroy (&lag_queue[lag_head]);
386   case_clone (&lag_queue[lag_head], c);
387   if (++lag_head >= n_lag)
388     lag_head = 0;
389 }
390
391 /* Clears the variables in C that need to be cleared between
392    processing cases.  */
393 static void
394 clear_case (struct ccase *c)
395 {
396   size_t var_cnt = dict_get_var_cnt (default_dict);
397   size_t i;
398   
399   for (i = 0; i < var_cnt; i++) 
400     {
401       struct variable *v = dict_get_var (default_dict, i);
402       if (!v->leave) 
403         {
404           if (v->type == NUMERIC)
405             case_data_rw (c, v->fv)->f = SYSMIS;
406           else
407             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
408         } 
409     }
410 }
411
412 /* Closes the active file. */
413 static bool
414 close_active_file (void)
415 {
416   /* Free memory for lag queue, and turn off lagging. */
417   if (n_lag > 0)
418     {
419       int i;
420       
421       for (i = 0; i < n_lag; i++)
422         case_destroy (&lag_queue[i]);
423       free (lag_queue);
424       n_lag = 0;
425     }
426   
427   /* Dictionary from before TEMPORARY becomes permanent. */
428   proc_cancel_temporary_transformations ();
429
430   /* Finish compaction. */
431   if (compactor != NULL) 
432     {
433       dict_compactor_destroy (compactor);
434       dict_compact_values (default_dict); 
435     }
436     
437   /* Free data source. */
438   free_case_source (vfm_source);
439   vfm_source = NULL;
440
441   /* Old data sink becomes new data source. */
442   if (vfm_sink->class->make_source != NULL)
443     vfm_source = vfm_sink->class->make_source (vfm_sink);
444   free_case_sink (vfm_sink);
445   vfm_sink = NULL;
446
447   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
448      and get rid of all the transformations. */
449   dict_clear_vectors (default_dict);
450   permanent_dict = NULL;
451   return proc_cancel_all_transformations ();
452 }
453 \f
454 /* Returns a pointer to the lagged case from N_BEFORE cases before the
455    current one, or NULL if there haven't been that many cases yet. */
456 struct ccase *
457 lagged_case (int n_before)
458 {
459   assert (n_before >= 1 );
460   assert (n_before <= n_lag);
461
462   if (n_before <= lag_count)
463     {
464       int index = lag_head - n_before;
465       if (index < 0)
466         index += n_lag;
467       return &lag_queue[index];
468     }
469   else
470     return NULL;
471 }
472 \f
473 /* Represents auxiliary data for handling SPLIT FILE. */
474 struct split_aux_data 
475   {
476     size_t case_count;          /* Number of cases so far. */
477     struct ccase prev_case;     /* Data in previous case. */
478
479     /* Functions to call... */
480     void (*begin_func) (void *);               /* ...before data. */
481     bool (*proc_func) (struct ccase *, void *); /* ...with data. */
482     void (*end_func) (void *);                 /* ...after data. */
483     void *func_aux;                            /* Auxiliary data. */ 
484   };
485
486 static int equal_splits (const struct ccase *, const struct ccase *);
487 static bool procedure_with_splits_callback (struct ccase *, void *);
488 static void dump_splits (struct ccase *);
489
490 /* Like procedure(), but it automatically breaks the case stream
491    into SPLIT FILE break groups.  Before each group of cases with
492    identical SPLIT FILE variable values, BEGIN_FUNC is called.
493    Then PROC_FUNC is called with each case in the group.  
494    END_FUNC is called when the group is finished.  FUNC_AUX is
495    passed to each of the functions as auxiliary data.
496
497    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
498    and END_FUNC will be called at all. 
499
500    If SPLIT FILE is not in effect, then there is one break group
501    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
502    will be called once.
503    
504    Returns true if successful, false if an I/O error occurred. */
505 bool
506 procedure_with_splits (void (*begin_func) (void *aux),
507                        bool (*proc_func) (struct ccase *, void *aux),
508                        void (*end_func) (void *aux),
509                        void *func_aux) 
510 {
511   struct split_aux_data split_aux;
512   bool ok;
513
514   split_aux.case_count = 0;
515   case_nullify (&split_aux.prev_case);
516   split_aux.begin_func = begin_func;
517   split_aux.proc_func = proc_func;
518   split_aux.end_func = end_func;
519   split_aux.func_aux = func_aux;
520
521   open_active_file ();
522   ok = internal_procedure (procedure_with_splits_callback, &split_aux);
523   if (split_aux.case_count > 0 && end_func != NULL)
524     end_func (func_aux);
525   if (!close_active_file ())
526     ok = false;
527
528   case_destroy (&split_aux.prev_case);
529
530   return ok;
531 }
532
533 /* procedure() callback used by procedure_with_splits(). */
534 static bool
535 procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
536 {
537   struct split_aux_data *split_aux = split_aux_;
538
539   /* Start a new series if needed. */
540   if (split_aux->case_count == 0
541       || !equal_splits (c, &split_aux->prev_case))
542     {
543       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
544         split_aux->end_func (split_aux->func_aux);
545
546       dump_splits (c);
547       case_destroy (&split_aux->prev_case);
548       case_clone (&split_aux->prev_case, c);
549
550       if (split_aux->begin_func != NULL)
551         split_aux->begin_func (split_aux->func_aux);
552     }
553
554   split_aux->case_count++;
555   if (split_aux->proc_func != NULL)
556     return split_aux->proc_func (c, split_aux->func_aux);
557   else
558     return true;
559 }
560
561 /* Compares the SPLIT FILE variables in cases A and B and returns
562    nonzero only if they differ. */
563 static int
564 equal_splits (const struct ccase *a, const struct ccase *b) 
565 {
566   return case_compare (a, b,
567                        dict_get_split_vars (default_dict),
568                        dict_get_split_cnt (default_dict)) == 0;
569 }
570
571 /* Dumps out the values of all the split variables for the case C. */
572 static void
573 dump_splits (struct ccase *c)
574 {
575   struct variable *const *split;
576   struct tab_table *t;
577   size_t split_cnt;
578   int i;
579
580   split_cnt = dict_get_split_cnt (default_dict);
581   if (split_cnt == 0)
582     return;
583
584   t = tab_create (3, split_cnt + 1, 0);
585   tab_dim (t, tab_natural_dimensions);
586   tab_vline (t, TAL_GAP, 1, 0, split_cnt);
587   tab_vline (t, TAL_GAP, 2, 0, split_cnt);
588   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
589   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
590   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
591   split = dict_get_split_vars (default_dict);
592   for (i = 0; i < split_cnt; i++)
593     {
594       struct variable *v = split[i];
595       char temp_buf[80];
596       const char *val_lab;
597
598       assert (v->type == NUMERIC || v->type == ALPHA);
599       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
600       
601       data_out (temp_buf, &v->print, case_data (c, v->fv));
602       
603       temp_buf[v->print.w] = 0;
604       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
605
606       val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
607       if (val_lab)
608         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
609     }
610   tab_flags (t, SOMF_NO_TITLE);
611   tab_submit (t);
612 }
613 \f
614 /* Represents auxiliary data for handling SPLIT FILE in a
615    multipass procedure. */
616 struct multipass_split_aux_data 
617   {
618     struct ccase prev_case;     /* Data in previous case. */
619     struct casefile *casefile;  /* Accumulates data for a split. */
620
621     /* Function to call with the accumulated data. */
622     bool (*split_func) (const struct casefile *, void *);
623     void *func_aux;                            /* Auxiliary data. */ 
624   };
625
626 static bool multipass_split_callback (struct ccase *c, void *aux_);
627 static bool multipass_split_output (struct multipass_split_aux_data *);
628
629 /* Returns true if successful, false if an I/O error occurred. */
630 bool
631 multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
632                                                      void *),
633                                  void *func_aux) 
634 {
635   struct multipass_split_aux_data aux;
636   bool ok;
637
638   assert (split_func != NULL);
639
640   open_active_file ();
641
642   case_nullify (&aux.prev_case);
643   aux.casefile = NULL;
644   aux.split_func = split_func;
645   aux.func_aux = func_aux;
646
647   ok = internal_procedure (multipass_split_callback, &aux);
648   if (aux.casefile != NULL)
649     ok = multipass_split_output (&aux) && ok;
650   case_destroy (&aux.prev_case);
651
652   if (!close_active_file ())
653     ok = false;
654
655   return ok;
656 }
657
658 /* procedure() callback used by multipass_procedure_with_splits(). */
659 static bool
660 multipass_split_callback (struct ccase *c, void *aux_)
661 {
662   struct multipass_split_aux_data *aux = aux_;
663   bool ok = true;
664
665   /* Start a new series if needed. */
666   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
667     {
668       /* Pass any cases to split_func. */
669       if (aux->casefile != NULL)
670         ok = multipass_split_output (aux);
671
672       /* Start a new casefile. */
673       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
674
675       /* Record split values. */
676       dump_splits (c);
677       case_destroy (&aux->prev_case);
678       case_clone (&aux->prev_case, c);
679     }
680
681   return casefile_append (aux->casefile, c) && ok;
682 }
683
684 static bool
685 multipass_split_output (struct multipass_split_aux_data *aux)
686 {
687   bool ok;
688   
689   assert (aux->casefile != NULL);
690   ok = aux->split_func (aux->casefile, aux->func_aux);
691   casefile_destroy (aux->casefile);
692   aux->casefile = NULL;
693
694   return ok;
695 }
696
697 /* Discards all the current state in preparation for a data-input
698    command like DATA LIST or GET. */
699 void
700 discard_variables (void)
701 {
702   dict_clear (default_dict);
703   fh_set_default_handle (NULL);
704
705   n_lag = 0;
706   
707   if (vfm_source != NULL)
708     {
709       free_case_source (vfm_source);
710       vfm_source = NULL;
711     }
712
713   proc_cancel_all_transformations ();
714
715   expr_free (process_if_expr);
716   process_if_expr = NULL;
717
718   proc_cancel_temporary_transformations ();
719 }
720 \f
721 /* Returns the current set of permanent transformations,
722    and clears the permanent transformations.
723    For use by INPUT PROGRAM. */
724 struct trns_chain *
725 proc_capture_transformations (void) 
726 {
727   struct trns_chain *chain;
728   
729   assert (temporary_trns_chain == NULL);
730   chain = permanent_trns_chain;
731   cur_trns_chain = permanent_trns_chain = trns_chain_create ();
732   return chain;
733 }
734
735 /* Adds a transformation that processes a case with PROC and
736    frees itself with FREE to the current set of transformations.
737    The functions are passed AUX as auxiliary data. */
738 void
739 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
740 {
741   trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
742 }
743
744 /* Adds a transformation that processes a case with PROC and
745    frees itself with FREE to the current set of transformations.
746    When parsing of the block of transformations is complete,
747    FINALIZE will be called.
748    The functions are passed AUX as auxiliary data. */
749 void
750 add_transformation_with_finalizer (trns_finalize_func *finalize,
751                                    trns_proc_func *proc,
752                                    trns_free_func *free, void *aux)
753 {
754   trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
755 }
756
757 /* Returns the index of the next transformation.
758    This value can be returned by a transformation procedure
759    function to indicate a "jump" to that transformation. */
760 size_t
761 next_transformation (void) 
762 {
763   return trns_chain_next (cur_trns_chain);
764 }
765
766 /* Returns true if the next call to add_transformation() will add
767    a temporary transformation, false if it will add a permanent
768    transformation. */
769 bool
770 proc_in_temporary_transformations (void) 
771 {
772   return temporary_trns_chain != NULL;
773 }
774
775 /* Marks the start of temporary transformations.
776    Further calls to add_transformation() will add temporary
777    transformations. */
778 void
779 proc_start_temporary_transformations (void) 
780 {
781   if (!proc_in_temporary_transformations ())
782     {
783       add_case_limit_trns ();
784
785       permanent_dict = dict_clone (default_dict);
786       trns_chain_finalize (permanent_trns_chain);
787       temporary_trns_chain = cur_trns_chain = trns_chain_create ();
788     }
789 }
790
791 /* Converts all the temporary transformations, if any, to
792    permanent transformations.  Further transformations will be
793    permanent.
794    Returns true if anything changed, false otherwise. */
795 bool
796 proc_make_temporary_transformations_permanent (void) 
797 {
798   if (proc_in_temporary_transformations ()) 
799     {
800       trns_chain_finalize (temporary_trns_chain);
801       trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
802       temporary_trns_chain = NULL;
803
804       dict_destroy (permanent_dict);
805       permanent_dict = NULL;
806
807       return true;
808     }
809   else
810     return false;
811 }
812
813 /* Cancels all temporary transformations, if any.  Further
814    transformations will be permanent.
815    Returns true if anything changed, false otherwise. */
816 bool
817 proc_cancel_temporary_transformations (void) 
818 {
819   if (proc_in_temporary_transformations ()) 
820     {
821       dict_destroy (default_dict);
822       default_dict = permanent_dict;
823       permanent_dict = NULL;
824
825       trns_chain_destroy (temporary_trns_chain);
826       temporary_trns_chain = NULL;
827
828       return true;
829     }
830   else
831     return false;
832 }
833
834 /* Cancels all transformations, if any.
835    Returns true if successful, false on I/O error. */
836 bool
837 proc_cancel_all_transformations (void)
838 {
839   bool ok;
840   ok = trns_chain_destroy (permanent_trns_chain);
841   ok = trns_chain_destroy (temporary_trns_chain) && ok;
842   permanent_trns_chain = cur_trns_chain = trns_chain_create ();
843   temporary_trns_chain = NULL;
844   return ok;
845 }
846 \f
847 /* Initializes procedure handling. */
848 void
849 proc_init (void) 
850 {
851   default_dict = dict_create ();
852   proc_cancel_all_transformations ();
853 }
854
855 /* Finishes up procedure handling. */
856 void
857 proc_done (void)
858 {
859   discard_variables ();
860 }
861
862 /* Sets SINK as the destination for procedure output from the
863    next procedure. */
864 void
865 proc_set_sink (struct case_sink *sink) 
866 {
867   assert (vfm_sink == NULL);
868   vfm_sink = sink;
869 }
870
871 /* Sets SOURCE as the source for procedure input for the next
872    procedure. */
873 void
874 proc_set_source (struct case_source *source) 
875 {
876   assert (vfm_source == NULL);
877   vfm_source = source;
878 }
879
880 /* Returns true if a source for the next procedure has been
881    configured, false otherwise. */
882 bool
883 proc_has_source (void) 
884 {
885   return vfm_source != NULL;
886 }
887
888 /* Returns the output from the previous procedure.
889    For use only immediately after executing a procedure.
890    The returned casefile is owned by the caller; it will not be
891    automatically used for the next procedure's input. */
892 struct casefile *
893 proc_capture_output (void) 
894 {
895   struct casefile *casefile;
896
897   /* Try to make sure that this function is called immediately
898      after procedure() or a similar function. */
899   assert (vfm_source != NULL);
900   assert (case_source_is_class (vfm_source, &storage_source_class));
901   assert (trns_chain_is_empty (permanent_trns_chain));
902   assert (!proc_in_temporary_transformations ());
903
904   casefile = storage_source_decapsulate (vfm_source);
905   vfm_source = NULL;
906
907   return casefile;
908 }
909 \f
910 static trns_proc_func case_limit_trns_proc;
911 static trns_free_func case_limit_trns_free;
912
913 /* Adds a transformation that limits the number of cases that may
914    pass through, if default_dict has a case limit. */
915 static void
916 add_case_limit_trns (void) 
917 {
918   size_t case_limit = dict_get_case_limit (default_dict);
919   if (case_limit != 0)
920     {
921       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
922       *cases_remaining = case_limit;
923       add_transformation (case_limit_trns_proc, case_limit_trns_free,
924                           cases_remaining);
925       dict_set_case_limit (default_dict, 0);
926     }
927 }
928
929 /* Limits the maximum number of cases processed to
930    *CASES_REMAINING. */
931 static int
932 case_limit_trns_proc (void *cases_remaining_,
933                       struct ccase *c UNUSED, int case_nr UNUSED) 
934 {
935   size_t *cases_remaining = cases_remaining_;
936   if (*cases_remaining > 0) 
937     {
938       *cases_remaining--;
939       return TRNS_CONTINUE;
940     }
941   else
942     return TRNS_DROP_CASE;
943 }
944
945 /* Frees the data associated with a case limit transformation. */
946 static bool
947 case_limit_trns_free (void *cases_remaining_) 
948 {
949   size_t *cases_remaining = cases_remaining_;
950   free (cases_remaining);
951   return true;
952 }
953 \f
954 static trns_proc_func filter_trns_proc;
955
956 /* Adds a temporary transformation to filter data according to
957    the variable specified on FILTER, if any. */
958 static void
959 add_filter_trns (void) 
960 {
961   struct variable *filter_var = dict_get_filter (default_dict);
962   if (filter_var != NULL) 
963     {
964       proc_start_temporary_transformations ();
965       add_transformation (filter_trns_proc, NULL, filter_var);
966     }
967 }
968
969 /* FILTER transformation. */
970 static int
971 filter_trns_proc (void *filter_var_,
972                   struct ccase *c UNUSED, int case_nr UNUSED) 
973   
974 {
975   struct variable *filter_var = filter_var_;
976   double f = case_num (c, filter_var->fv);
977   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
978           ? TRNS_CONTINUE : TRNS_DROP_CASE);
979 }
980 \f
981 static trns_proc_func process_if_trns_proc;
982 static trns_free_func process_if_trns_free;
983
984 /* Adds a temporary transformation to filter data according to
985    the expression specified on PROCESS IF, if any. */
986 static void
987 add_process_if_trns (void) 
988 {
989   if (process_if_expr != NULL) 
990     {
991       proc_start_temporary_transformations ();
992       add_transformation (process_if_trns_proc, process_if_trns_free,
993                           process_if_expr);
994       process_if_expr = NULL;
995     }
996 }
997
998 /* PROCESS IF transformation. */
999 static int
1000 process_if_trns_proc (void *expression_,
1001                       struct ccase *c UNUSED, int case_nr UNUSED) 
1002   
1003 {
1004   struct expression *expression = expression_;
1005   return (expr_evaluate_num (expression, c, case_nr) == 1.0
1006           ? TRNS_CONTINUE : TRNS_DROP_CASE);
1007 }
1008
1009 /* Frees a PROCESS IF transformation. */
1010 static bool
1011 process_if_trns_free (void *expression_) 
1012 {
1013   struct expression *expression = expression_;
1014   expr_free (expression);
1015   return true;
1016 }