dde7fcee7541633eda6a2f7e051b5ba2c5179a4f
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <language/expressions/public.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/misc.h>
40 #include <libpspp/str.h>
41
42 /*
43    Virtual File Manager (vfm):
44
45    vfm is used to process data files.  It uses the model that
46    data is read from one stream (the data source), processed,
47    then written to another (the data sink).  The data source is
48    then deleted and the data sink becomes the data source for the
49    next procedure. */
50
51 /* Procedure execution data. */
52 struct write_case_data
53   {
54     /* Function to call for each case. */
55     bool (*case_func) (const struct ccase *, void *);
56     void *aux;
57
58     struct ccase trns_case;     /* Case used for transformations. */
59     struct ccase sink_case;     /* Case written to sink, if
60                                    compaction is necessary. */
61     size_t cases_written;       /* Cases output so far. */
62   };
63
64 /* Cases are read from vfm_source,
65    pass through permanent_trns_chain (which transforms them into
66    the format described by permanent_dict),
67    are written to vfm_sink,
68    pass through temporary_trns_chain (which transforms them into
69    the format described by default_dict),
70    and are finally passed to the procedure. */
71 static struct case_source *vfm_source;
72 static struct trns_chain *permanent_trns_chain;
73 static struct dictionary *permanent_dict;
74 static struct case_sink *vfm_sink;
75 static struct trns_chain *temporary_trns_chain;
76 struct dictionary *default_dict;
77
78 /* The transformation chain that the next transformation will be
79    added to. */
80 static struct trns_chain *cur_trns_chain;
81
82 /* The compactor used to compact a case, if necessary;
83    otherwise a null pointer. */
84 static struct dict_compactor *compactor;
85
86 /* Time at which vfm was last invoked. */
87 static time_t last_vfm_invocation;
88
89 /* Lag queue. */
90 int n_lag;                      /* Number of cases to lag. */
91 static int lag_count;           /* Number of cases in lag_queue so far. */
92 static int lag_head;            /* Index where next case will be added. */
93 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
94
95 static void add_case_limit_trns (void);
96 static void add_filter_trns (void);
97 static void add_process_if_trns (void);
98
99 static bool internal_procedure (bool (*case_func) (const struct ccase *,
100                                                    void *),
101                                 bool (*end_func) (void *),
102                                 void *aux);
103 static void update_last_vfm_invocation (void);
104 static void create_trns_case (struct ccase *, struct dictionary *);
105 static void open_active_file (void);
106 static bool write_case (struct write_case_data *wc_data);
107 static void lag_case (const struct ccase *c);
108 static void clear_case (struct ccase *c);
109 static bool close_active_file (void);
110 \f
111 /* Public functions. */
112
113 /* Returns the last time the data was read. */
114 time_t
115 time_of_last_procedure (void) 
116 {
117   if (last_vfm_invocation == 0)
118     update_last_vfm_invocation ();
119   return last_vfm_invocation;
120 }
121 \f
122 /* Regular procedure. */
123
124 /* Reads the data from the input program and writes it to a new
125    active file.  For each case we read from the input program, we
126    do the following:
127
128    1. Execute permanent transformations.  If these drop the case,
129       start the next case from step 1.
130
131    2. Write case to replacement active file.
132    
133    3. Execute temporary transformations.  If these drop the case,
134       start the next case from step 1.
135       
136    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
137
138    Returns true if successful, false if an I/O error occurred. */
139 bool
140 procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
141 {
142   return internal_procedure (proc_func, NULL, aux);
143 }
144 \f
145 /* Multipass procedure. */
146
147 struct multipass_aux_data 
148   {
149     struct casefile *casefile;
150     
151     bool (*proc_func) (const struct casefile *, void *aux);
152     void *aux;
153   };
154
155 /* Case processing function for multipass_procedure(). */
156 static bool
157 multipass_case_func (const struct ccase *c, void *aux_data_) 
158 {
159   struct multipass_aux_data *aux_data = aux_data_;
160   return casefile_append (aux_data->casefile, c);
161 }
162
163 /* End-of-file function for multipass_procedure(). */
164 static bool
165 multipass_end_func (void *aux_data_) 
166 {
167   struct multipass_aux_data *aux_data = aux_data_;
168   return (aux_data->proc_func == NULL
169           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
170 }
171
172 /* Procedure that allows multiple passes over the input data.
173    The entire active file is passed to PROC_FUNC, with the given
174    AUX as auxiliary data, as a unit. */
175 bool
176 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
177                      void *aux) 
178 {
179   struct multipass_aux_data aux_data;
180   bool ok;
181
182   aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
183   aux_data.proc_func = proc_func;
184   aux_data.aux = aux;
185
186   ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
187   ok = !casefile_error (aux_data.casefile) && ok;
188
189   casefile_destroy (aux_data.casefile);
190
191   return ok;
192 }
193 \f
194 /* Procedure implementation. */
195
196 /* Executes a procedure.
197    Passes each case to CASE_FUNC.
198    Calls END_FUNC after the last case.
199    Returns true if successful, false if an I/O error occurred (or
200    if CASE_FUNC or END_FUNC ever returned false). */
201 static bool
202 internal_procedure (bool (*case_func) (const struct ccase *, void *),
203                     bool (*end_func) (void *),
204                     void *aux) 
205 {
206   struct write_case_data wc_data;
207   bool ok = true;
208
209   assert (vfm_source != NULL);
210
211   update_last_vfm_invocation ();
212
213   /* Optimize the trivial case where we're not going to do
214      anything with the data, by not reading the data at all. */
215   if (case_func == NULL && end_func == NULL
216       && case_source_is_class (vfm_source, &storage_source_class)
217       && vfm_sink == NULL
218       && (temporary_trns_chain == NULL
219           || trns_chain_is_empty (temporary_trns_chain))
220       && trns_chain_is_empty (permanent_trns_chain))
221     {
222       n_lag = 0;
223       expr_free (process_if_expr);
224       process_if_expr = NULL;
225       dict_set_case_limit (default_dict, 0);
226       dict_clear_vectors (default_dict);
227       return true;
228     }
229   
230   open_active_file ();
231   
232   wc_data.case_func = case_func;
233   wc_data.aux = aux;
234   create_trns_case (&wc_data.trns_case, default_dict);
235   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
236   wc_data.cases_written = 0;
237
238   ok = vfm_source->class->read (vfm_source,
239                                 &wc_data.trns_case,
240                                 write_case, &wc_data) && ok;
241   if (end_func != NULL)
242     ok = end_func (aux) && ok;
243
244   case_destroy (&wc_data.sink_case);
245   case_destroy (&wc_data.trns_case);
246
247   ok = close_active_file () && ok;
248
249   return ok;
250 }
251
252 /* Updates last_vfm_invocation. */
253 static void
254 update_last_vfm_invocation (void) 
255 {
256   last_vfm_invocation = time (NULL);
257 }
258
259 /* Creates and returns a case, initializing it from the vectors
260    that say which `value's need to be initialized just once, and
261    which ones need to be re-initialized before every case. */
262 static void
263 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
264 {
265   size_t var_cnt = dict_get_var_cnt (dict);
266   size_t i;
267
268   case_create (trns_case, dict_get_next_value_idx (dict));
269   for (i = 0; i < var_cnt; i++) 
270     {
271       struct variable *v = dict_get_var (dict, i);
272       union value *value = case_data_rw (trns_case, v->fv);
273
274       if (v->type == NUMERIC)
275         value->f = v->leave ? 0.0 : SYSMIS;
276       else
277         memset (value->s, ' ', v->width);
278     }
279 }
280
281 /* Makes all preparations for reading from the data source and writing
282    to the data sink. */
283 static void
284 open_active_file (void)
285 {
286   add_case_limit_trns ();
287   add_filter_trns ();
288   add_process_if_trns ();
289
290   /* Finalize transformations. */
291   trns_chain_finalize (cur_trns_chain);
292
293   /* Make permanent_dict refer to the dictionary right before
294      data reaches the sink. */
295   if (permanent_dict == NULL)
296     permanent_dict = default_dict;
297
298   /* Figure out compaction. */
299   compactor = (dict_needs_compaction (permanent_dict)
300                ? dict_make_compactor (permanent_dict)
301                : NULL);
302
303   /* Prepare sink. */
304   if (vfm_sink == NULL)
305     vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
306   if (vfm_sink->class->open != NULL)
307     vfm_sink->class->open (vfm_sink);
308
309   /* Allocate memory for lag queue. */
310   if (n_lag > 0)
311     {
312       int i;
313   
314       lag_count = 0;
315       lag_head = 0;
316       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
317       for (i = 0; i < n_lag; i++)
318         case_nullify (&lag_queue[i]);
319     }
320 }
321
322 /* Transforms trns_case and writes it to the replacement active
323    file if advisable.  Returns true if more cases can be
324    accepted, false otherwise.  Do not call this function again
325    after it has returned false once.  */
326 static bool
327 write_case (struct write_case_data *wc_data)
328 {
329   enum trns_result retval;
330   size_t case_nr;
331   
332   /* Execute permanent transformations.  */
333   case_nr = wc_data->cases_written + 1;
334   retval = trns_chain_execute (permanent_trns_chain,
335                                &wc_data->trns_case, &case_nr);
336   if (retval != TRNS_CONTINUE)
337     goto done;
338
339   /* Write case to LAG queue. */
340   if (n_lag)
341     lag_case (&wc_data->trns_case);
342
343   /* Write case to replacement active file. */
344   wc_data->cases_written++;
345   if (vfm_sink->class->write != NULL) 
346     {
347       if (compactor != NULL) 
348         {
349           dict_compactor_compact (compactor, &wc_data->sink_case,
350                                   &wc_data->trns_case);
351           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
352         }
353       else
354         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
355     }
356   
357   /* Execute temporary transformations. */
358   if (temporary_trns_chain != NULL) 
359     {
360       retval = trns_chain_execute (temporary_trns_chain,
361                                    &wc_data->trns_case,
362                                    &wc_data->cases_written);
363       if (retval != TRNS_CONTINUE)
364         goto done;
365     }
366
367   /* Pass case to procedure. */
368   if (wc_data->case_func != NULL)
369     if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
370       retval = TRNS_ERROR;
371
372  done:
373   clear_case (&wc_data->trns_case);
374   return retval != TRNS_ERROR;
375 }
376
377 /* Add C to the lag queue. */
378 static void
379 lag_case (const struct ccase *c)
380 {
381   if (lag_count < n_lag)
382     lag_count++;
383   case_destroy (&lag_queue[lag_head]);
384   case_clone (&lag_queue[lag_head], c);
385   if (++lag_head >= n_lag)
386     lag_head = 0;
387 }
388
389 /* Clears the variables in C that need to be cleared between
390    processing cases.  */
391 static void
392 clear_case (struct ccase *c)
393 {
394   size_t var_cnt = dict_get_var_cnt (default_dict);
395   size_t i;
396   
397   for (i = 0; i < var_cnt; i++) 
398     {
399       struct variable *v = dict_get_var (default_dict, i);
400       if (!v->leave) 
401         {
402           if (v->type == NUMERIC)
403             case_data_rw (c, v->fv)->f = SYSMIS;
404           else
405             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
406         } 
407     }
408 }
409
410 /* Closes the active file. */
411 static bool
412 close_active_file (void)
413 {
414   /* Free memory for lag queue, and turn off lagging. */
415   if (n_lag > 0)
416     {
417       int i;
418       
419       for (i = 0; i < n_lag; i++)
420         case_destroy (&lag_queue[i]);
421       free (lag_queue);
422       n_lag = 0;
423     }
424   
425   /* Dictionary from before TEMPORARY becomes permanent. */
426   proc_cancel_temporary_transformations ();
427
428   /* Finish compaction. */
429   if (compactor != NULL) 
430     {
431       dict_compactor_destroy (compactor);
432       dict_compact_values (default_dict);
433       compactor = NULL;
434     }
435     
436   /* Free data source. */
437   free_case_source (vfm_source);
438   vfm_source = NULL;
439
440   /* Old data sink becomes new data source. */
441   if (vfm_sink->class->make_source != NULL)
442     vfm_source = vfm_sink->class->make_source (vfm_sink);
443   free_case_sink (vfm_sink);
444   vfm_sink = NULL;
445
446   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
447      and get rid of all the transformations. */
448   dict_clear_vectors (default_dict);
449   permanent_dict = NULL;
450   return proc_cancel_all_transformations ();
451 }
452 \f
453 /* Returns a pointer to the lagged case from N_BEFORE cases before the
454    current one, or NULL if there haven't been that many cases yet. */
455 struct ccase *
456 lagged_case (int n_before)
457 {
458   assert (n_before >= 1 );
459   assert (n_before <= n_lag);
460
461   if (n_before <= lag_count)
462     {
463       int index = lag_head - n_before;
464       if (index < 0)
465         index += n_lag;
466       return &lag_queue[index];
467     }
468   else
469     return NULL;
470 }
471 \f
472 /* Procedure that separates the data into SPLIT FILE groups. */
473
474 /* Represents auxiliary data for handling SPLIT FILE. */
475 struct split_aux_data 
476   {
477     size_t case_count;          /* Number of cases so far. */
478     struct ccase prev_case;     /* Data in previous case. */
479
480     /* Callback functions. */
481     void (*begin_func) (const struct ccase *, void *);
482     bool (*proc_func) (const struct ccase *, void *);
483     void (*end_func) (void *);
484     void *func_aux;
485   };
486
487 static int equal_splits (const struct ccase *, const struct ccase *);
488 static bool split_procedure_case_func (const struct ccase *c, void *);
489 static bool split_procedure_end_func (void *);
490
491 /* Like procedure(), but it automatically breaks the case stream
492    into SPLIT FILE break groups.  Before each group of cases with
493    identical SPLIT FILE variable values, BEGIN_FUNC is called
494    with the first case in the group.
495    Then PROC_FUNC is called for each case in the group (including
496    the first).
497    END_FUNC is called when the group is finished.  FUNC_AUX is
498    passed to each of the functions as auxiliary data.
499
500    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
501    and END_FUNC will be called at all. 
502
503    If SPLIT FILE is not in effect, then there is one break group
504    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
505    will be called once.
506    
507    Returns true if successful, false if an I/O error occurred. */
508 bool
509 procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
510                        bool (*proc_func) (const struct ccase *, void *aux),
511                        void (*end_func) (void *aux),
512                        void *func_aux) 
513 {
514   struct split_aux_data split_aux;
515   bool ok;
516
517   split_aux.case_count = 0;
518   case_nullify (&split_aux.prev_case);
519   split_aux.begin_func = begin_func;
520   split_aux.proc_func = proc_func;
521   split_aux.end_func = end_func;
522   split_aux.func_aux = func_aux;
523
524   ok = internal_procedure (split_procedure_case_func,
525                            split_procedure_end_func, &split_aux);
526
527   case_destroy (&split_aux.prev_case);
528
529   return ok;
530 }
531
532 /* Case callback used by procedure_with_splits(). */
533 static bool
534 split_procedure_case_func (const struct ccase *c, void *split_aux_) 
535 {
536   struct split_aux_data *split_aux = split_aux_;
537
538   /* Start a new series if needed. */
539   if (split_aux->case_count == 0
540       || !equal_splits (c, &split_aux->prev_case))
541     {
542       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
543         split_aux->end_func (split_aux->func_aux);
544
545       case_destroy (&split_aux->prev_case);
546       case_clone (&split_aux->prev_case, c);
547
548       if (split_aux->begin_func != NULL)
549         split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
550     }
551
552   split_aux->case_count++;
553   return (split_aux->proc_func == NULL
554           || split_aux->proc_func (c, split_aux->func_aux));
555 }
556
557 /* End-of-file callback used by procedure_with_splits(). */
558 static bool
559 split_procedure_end_func (void *split_aux_) 
560 {
561   struct split_aux_data *split_aux = split_aux_;
562
563   if (split_aux->case_count > 0 && split_aux->end_func != NULL)
564     split_aux->end_func (split_aux->func_aux);
565   return true;
566 }
567
568 /* Compares the SPLIT FILE variables in cases A and B and returns
569    nonzero only if they differ. */
570 static int
571 equal_splits (const struct ccase *a, const struct ccase *b) 
572 {
573   return case_compare (a, b,
574                        dict_get_split_vars (default_dict),
575                        dict_get_split_cnt (default_dict)) == 0;
576 }
577 \f
578 /* Multipass procedure that separates the data into SPLIT FILE
579    groups. */
580
581 /* Represents auxiliary data for handling SPLIT FILE in a
582    multipass procedure. */
583 struct multipass_split_aux_data 
584   {
585     struct ccase prev_case;     /* Data in previous case. */
586     struct casefile *casefile;  /* Accumulates data for a split. */
587
588     /* Function to call with the accumulated data. */
589     bool (*split_func) (const struct ccase *first, const struct casefile *,
590                         void *);
591     void *func_aux;                            /* Auxiliary data. */ 
592   };
593
594 static bool multipass_split_case_func (const struct ccase *c, void *aux_);
595 static bool multipass_split_end_func (void *aux_);
596 static bool multipass_split_output (struct multipass_split_aux_data *);
597
598 /* Returns true if successful, false if an I/O error occurred. */
599 bool
600 multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
601                                                      const struct casefile *,
602                                                      void *aux),
603                                  void *func_aux)
604 {
605   struct multipass_split_aux_data aux;
606   bool ok;
607
608   case_nullify (&aux.prev_case);
609   aux.casefile = NULL;
610   aux.split_func = split_func;
611   aux.func_aux = func_aux;
612
613   ok = internal_procedure (multipass_split_case_func,
614                            multipass_split_end_func, &aux);
615   case_destroy (&aux.prev_case);
616
617   return ok;
618 }
619
620 /* Case callback used by multipass_procedure_with_splits(). */
621 static bool
622 multipass_split_case_func (const struct ccase *c, void *aux_)
623 {
624   struct multipass_split_aux_data *aux = aux_;
625   bool ok = true;
626
627   /* Start a new series if needed. */
628   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
629     {
630       /* Record split values. */
631       case_destroy (&aux->prev_case);
632       case_clone (&aux->prev_case, c);
633
634       /* Pass any cases to split_func. */
635       if (aux->casefile != NULL)
636         ok = multipass_split_output (aux);
637
638       /* Start a new casefile. */
639       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
640     }
641
642   return casefile_append (aux->casefile, c) && ok;
643 }
644
645 /* End-of-file callback used by multipass_procedure_with_splits(). */
646 static bool
647 multipass_split_end_func (void *aux_)
648 {
649   struct multipass_split_aux_data *aux = aux_;
650   return (aux->casefile == NULL || multipass_split_output (aux));
651 }
652
653 static bool
654 multipass_split_output (struct multipass_split_aux_data *aux)
655 {
656   bool ok;
657   
658   assert (aux->casefile != NULL);
659   ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
660   casefile_destroy (aux->casefile);
661   aux->casefile = NULL;
662
663   return ok;
664 }
665 \f
666 /* Discards all the current state in preparation for a data-input
667    command like DATA LIST or GET. */
668 void
669 discard_variables (void)
670 {
671   dict_clear (default_dict);
672   fh_set_default_handle (NULL);
673
674   n_lag = 0;
675   
676   free_case_source (vfm_source);
677   vfm_source = NULL;
678
679   proc_cancel_all_transformations ();
680
681   expr_free (process_if_expr);
682   process_if_expr = NULL;
683
684   proc_cancel_temporary_transformations ();
685 }
686 \f
687 /* Returns the current set of permanent transformations,
688    and clears the permanent transformations.
689    For use by INPUT PROGRAM. */
690 struct trns_chain *
691 proc_capture_transformations (void) 
692 {
693   struct trns_chain *chain;
694   
695   assert (temporary_trns_chain == NULL);
696   chain = permanent_trns_chain;
697   cur_trns_chain = permanent_trns_chain = trns_chain_create ();
698   return chain;
699 }
700
701 /* Adds a transformation that processes a case with PROC and
702    frees itself with FREE to the current set of transformations.
703    The functions are passed AUX as auxiliary data. */
704 void
705 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
706 {
707   trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
708 }
709
710 /* Adds a transformation that processes a case with PROC and
711    frees itself with FREE to the current set of transformations.
712    When parsing of the block of transformations is complete,
713    FINALIZE will be called.
714    The functions are passed AUX as auxiliary data. */
715 void
716 add_transformation_with_finalizer (trns_finalize_func *finalize,
717                                    trns_proc_func *proc,
718                                    trns_free_func *free, void *aux)
719 {
720   trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
721 }
722
723 /* Returns the index of the next transformation.
724    This value can be returned by a transformation procedure
725    function to indicate a "jump" to that transformation. */
726 size_t
727 next_transformation (void) 
728 {
729   return trns_chain_next (cur_trns_chain);
730 }
731
732 /* Returns true if the next call to add_transformation() will add
733    a temporary transformation, false if it will add a permanent
734    transformation. */
735 bool
736 proc_in_temporary_transformations (void) 
737 {
738   return temporary_trns_chain != NULL;
739 }
740
741 /* Marks the start of temporary transformations.
742    Further calls to add_transformation() will add temporary
743    transformations. */
744 void
745 proc_start_temporary_transformations (void) 
746 {
747   if (!proc_in_temporary_transformations ())
748     {
749       add_case_limit_trns ();
750
751       permanent_dict = dict_clone (default_dict);
752       trns_chain_finalize (permanent_trns_chain);
753       temporary_trns_chain = cur_trns_chain = trns_chain_create ();
754     }
755 }
756
757 /* Converts all the temporary transformations, if any, to
758    permanent transformations.  Further transformations will be
759    permanent.
760    Returns true if anything changed, false otherwise. */
761 bool
762 proc_make_temporary_transformations_permanent (void) 
763 {
764   if (proc_in_temporary_transformations ()) 
765     {
766       trns_chain_finalize (temporary_trns_chain);
767       trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
768       temporary_trns_chain = NULL;
769
770       dict_destroy (permanent_dict);
771       permanent_dict = NULL;
772
773       return true;
774     }
775   else
776     return false;
777 }
778
779 /* Cancels all temporary transformations, if any.  Further
780    transformations will be permanent.
781    Returns true if anything changed, false otherwise. */
782 bool
783 proc_cancel_temporary_transformations (void) 
784 {
785   if (proc_in_temporary_transformations ()) 
786     {
787       dict_destroy (default_dict);
788       default_dict = permanent_dict;
789       permanent_dict = NULL;
790
791       trns_chain_destroy (temporary_trns_chain);
792       temporary_trns_chain = NULL;
793
794       return true;
795     }
796   else
797     return false;
798 }
799
800 /* Cancels all transformations, if any.
801    Returns true if successful, false on I/O error. */
802 bool
803 proc_cancel_all_transformations (void)
804 {
805   bool ok;
806   ok = trns_chain_destroy (permanent_trns_chain);
807   ok = trns_chain_destroy (temporary_trns_chain) && ok;
808   permanent_trns_chain = cur_trns_chain = trns_chain_create ();
809   temporary_trns_chain = NULL;
810   return ok;
811 }
812 \f
813 /* Initializes procedure handling. */
814 void
815 proc_init (void) 
816 {
817   default_dict = dict_create ();
818   proc_cancel_all_transformations ();
819 }
820
821 /* Finishes up procedure handling. */
822 void
823 proc_done (void)
824 {
825   discard_variables ();
826 }
827
828 /* Sets SINK as the destination for procedure output from the
829    next procedure. */
830 void
831 proc_set_sink (struct case_sink *sink) 
832 {
833   assert (vfm_sink == NULL);
834   vfm_sink = sink;
835 }
836
837 /* Sets SOURCE as the source for procedure input for the next
838    procedure. */
839 void
840 proc_set_source (struct case_source *source) 
841 {
842   assert (vfm_source == NULL);
843   vfm_source = source;
844 }
845
846 /* Returns true if a source for the next procedure has been
847    configured, false otherwise. */
848 bool
849 proc_has_source (void) 
850 {
851   return vfm_source != NULL;
852 }
853
854 /* Returns the output from the previous procedure.
855    For use only immediately after executing a procedure.
856    The returned casefile is owned by the caller; it will not be
857    automatically used for the next procedure's input. */
858 struct casefile *
859 proc_capture_output (void) 
860 {
861   struct casefile *casefile;
862
863   /* Try to make sure that this function is called immediately
864      after procedure() or a similar function. */
865   assert (vfm_source != NULL);
866   assert (case_source_is_class (vfm_source, &storage_source_class));
867   assert (trns_chain_is_empty (permanent_trns_chain));
868   assert (!proc_in_temporary_transformations ());
869
870   casefile = storage_source_decapsulate (vfm_source);
871   vfm_source = NULL;
872
873   return casefile;
874 }
875 \f
876 static trns_proc_func case_limit_trns_proc;
877 static trns_free_func case_limit_trns_free;
878
879 /* Adds a transformation that limits the number of cases that may
880    pass through, if default_dict has a case limit. */
881 static void
882 add_case_limit_trns (void) 
883 {
884   size_t case_limit = dict_get_case_limit (default_dict);
885   if (case_limit != 0)
886     {
887       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
888       *cases_remaining = case_limit;
889       add_transformation (case_limit_trns_proc, case_limit_trns_free,
890                           cases_remaining);
891       dict_set_case_limit (default_dict, 0);
892     }
893 }
894
895 /* Limits the maximum number of cases processed to
896    *CASES_REMAINING. */
897 static int
898 case_limit_trns_proc (void *cases_remaining_,
899                       struct ccase *c UNUSED, int case_nr UNUSED) 
900 {
901   size_t *cases_remaining = cases_remaining_;
902   if (*cases_remaining > 0) 
903     {
904       *cases_remaining--;
905       return TRNS_CONTINUE;
906     }
907   else
908     return TRNS_DROP_CASE;
909 }
910
911 /* Frees the data associated with a case limit transformation. */
912 static bool
913 case_limit_trns_free (void *cases_remaining_) 
914 {
915   size_t *cases_remaining = cases_remaining_;
916   free (cases_remaining);
917   return true;
918 }
919 \f
920 static trns_proc_func filter_trns_proc;
921
922 /* Adds a temporary transformation to filter data according to
923    the variable specified on FILTER, if any. */
924 static void
925 add_filter_trns (void) 
926 {
927   struct variable *filter_var = dict_get_filter (default_dict);
928   if (filter_var != NULL) 
929     {
930       proc_start_temporary_transformations ();
931       add_transformation (filter_trns_proc, NULL, filter_var);
932     }
933 }
934
935 /* FILTER transformation. */
936 static int
937 filter_trns_proc (void *filter_var_,
938                   struct ccase *c UNUSED, int case_nr UNUSED) 
939   
940 {
941   struct variable *filter_var = filter_var_;
942   double f = case_num (c, filter_var->fv);
943   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
944           ? TRNS_CONTINUE : TRNS_DROP_CASE);
945 }
946 \f
947 static trns_proc_func process_if_trns_proc;
948 static trns_free_func process_if_trns_free;
949
950 /* Adds a temporary transformation to filter data according to
951    the expression specified on PROCESS IF, if any. */
952 static void
953 add_process_if_trns (void) 
954 {
955   if (process_if_expr != NULL) 
956     {
957       proc_start_temporary_transformations ();
958       add_transformation (process_if_trns_proc, process_if_trns_free,
959                           process_if_expr);
960       process_if_expr = NULL;
961     }
962 }
963
964 /* PROCESS IF transformation. */
965 static int
966 process_if_trns_proc (void *expression_,
967                       struct ccase *c UNUSED, int case_nr UNUSED) 
968   
969 {
970   struct expression *expression = expression_;
971   return (expr_evaluate_num (expression, c, case_nr) == 1.0
972           ? TRNS_CONTINUE : TRNS_DROP_CASE);
973 }
974
975 /* Frees a PROCESS IF transformation. */
976 static bool
977 process_if_trns_free (void *expression_) 
978 {
979   struct expression *expression = expression_;
980   expr_free (expression);
981   return true;
982 }