Continue reforming procedure execution. In this phase, get rid of the
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <language/expressions/public.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/misc.h>
40 #include <libpspp/str.h>
41
42 /*
43    Virtual File Manager (vfm):
44
45    vfm is used to process data files.  It uses the model that
46    data is read from one stream (the data source), processed,
47    then written to another (the data sink).  The data source is
48    then deleted and the data sink becomes the data source for the
49    next procedure. */
50
51 /* Procedure execution data. */
52 struct write_case_data
53   {
54     /* Function to call for each case. */
55     bool (*case_func) (struct ccase *, void *); /* Function. */
56     void *aux;                                 /* Auxiliary data. */ 
57
58     struct ccase trns_case;     /* Case used for transformations. */
59     struct ccase sink_case;     /* Case written to sink, if
60                                    compaction is necessary. */
61     size_t cases_written;       /* Cases output so far. */
62   };
63
64 /* Cases are read from vfm_source,
65    pass through permanent_trns_chain (which transforms them into
66    the format described by permanent_dict),
67    are written to vfm_sink,
68    pass through temporary_trns_chain (which transforms them into
69    the format described by default_dict),
70    and are finally passed to the procedure. */
71 static struct case_source *vfm_source;
72 static struct trns_chain *permanent_trns_chain;
73 static struct dictionary *permanent_dict;
74 static struct case_sink *vfm_sink;
75 static struct trns_chain *temporary_trns_chain;
76 struct dictionary *default_dict;
77
78 /* The transformation chain that the next transformation will be
79    added to. */
80 static struct trns_chain *cur_trns_chain;
81
82 /* The compactor used to compact a case, if necessary;
83    otherwise a null pointer. */
84 static struct dict_compactor *compactor;
85
86 /* Time at which vfm was last invoked. */
87 static time_t last_vfm_invocation;
88
89 /* Lag queue. */
90 int n_lag;                      /* Number of cases to lag. */
91 static int lag_count;           /* Number of cases in lag_queue so far. */
92 static int lag_head;            /* Index where next case will be added. */
93 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
94
95 static void add_case_limit_trns (void);
96 static void add_filter_trns (void);
97 static void add_process_if_trns (void);
98
99 static bool internal_procedure (bool (*case_func) (struct ccase *, void *),
100                                 bool (*end_func) (void *),
101                                 void *aux);
102 static void update_last_vfm_invocation (void);
103 static void create_trns_case (struct ccase *, struct dictionary *);
104 static void open_active_file (void);
105 static bool write_case (struct write_case_data *wc_data);
106 static void lag_case (const struct ccase *c);
107 static void clear_case (struct ccase *c);
108 static bool close_active_file (void);
109 \f
110 /* Public functions. */
111
112 /* Returns the last time the data was read. */
113 time_t
114 time_of_last_procedure (void) 
115 {
116   if (last_vfm_invocation == 0)
117     update_last_vfm_invocation ();
118   return last_vfm_invocation;
119 }
120 \f
121 /* Regular procedure. */
122
123 /* Reads the data from the input program and writes it to a new
124    active file.  For each case we read from the input program, we
125    do the following:
126
127    1. Execute permanent transformations.  If these drop the case,
128       start the next case from step 1.
129
130    2. Write case to replacement active file.
131    
132    3. Execute temporary transformations.  If these drop the case,
133       start the next case from step 1.
134       
135    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
136
137    Returns true if successful, false if an I/O error occurred. */
138 bool
139 procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
140 {
141   return internal_procedure (proc_func, NULL, aux);
142 }
143 \f
144 /* Multipass procedure. */
145
146 struct multipass_aux_data 
147   {
148     struct casefile *casefile;
149     
150     bool (*proc_func) (const struct casefile *, void *aux);
151     void *aux;
152   };
153
154 /* Case processing function for multipass_procedure(). */
155 static bool
156 multipass_case_func (struct ccase *c, void *aux_data_) 
157 {
158   struct multipass_aux_data *aux_data = aux_data_;
159   return casefile_append (aux_data->casefile, c);
160 }
161
162 /* End-of-file function for multipass_procedure(). */
163 static bool
164 multipass_end_func (void *aux_data_) 
165 {
166   struct multipass_aux_data *aux_data = aux_data_;
167   return (aux_data->proc_func == NULL
168           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
169 }
170
171 /* Procedure that allows multiple passes over the input data.
172    The entire active file is passed to PROC_FUNC, with the given
173    AUX as auxiliary data, as a unit. */
174 bool
175 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
176                      void *aux) 
177 {
178   struct multipass_aux_data aux_data;
179   bool ok;
180
181   aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
182   aux_data.proc_func = proc_func;
183   aux_data.aux = aux;
184
185   ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
186   ok = !casefile_error (aux_data.casefile) && ok;
187
188   casefile_destroy (aux_data.casefile);
189
190   return ok;
191 }
192 \f
193 /* Procedure implementation. */
194
195 /* Executes a procedure.
196    Passes each case to CASE_FUNC.
197    Calls END_FUNC after the last case.
198    Returns true if successful, false if an I/O error occurred (or
199    if CASE_FUNC or END_FUNC ever returned false). */
200 static bool
201 internal_procedure (bool (*case_func) (struct ccase *, void *),
202                     bool (*end_func) (void *),
203                     void *aux) 
204 {
205   struct write_case_data wc_data;
206   bool ok = true;
207
208   assert (vfm_source != NULL);
209
210   update_last_vfm_invocation ();
211
212   /* Optimize the trivial case where we're not going to do
213      anything with the data, by not reading the data at all. */
214   if (case_func == NULL && end_func == NULL
215       && case_source_is_class (vfm_source, &storage_source_class)
216       && vfm_sink == NULL
217       && (temporary_trns_chain == NULL
218           || trns_chain_is_empty (temporary_trns_chain))
219       && trns_chain_is_empty (permanent_trns_chain))
220     {
221       n_lag = 0;
222       expr_free (process_if_expr);
223       process_if_expr = NULL;
224       dict_set_case_limit (default_dict, 0);
225       dict_clear_vectors (default_dict);
226       return true;
227     }
228   
229   open_active_file ();
230   
231   wc_data.case_func = case_func;
232   wc_data.aux = aux;
233   create_trns_case (&wc_data.trns_case, default_dict);
234   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
235   wc_data.cases_written = 0;
236
237   ok = vfm_source->class->read (vfm_source,
238                                 &wc_data.trns_case,
239                                 write_case, &wc_data) && ok;
240   if (end_func != NULL)
241     ok = end_func (aux) && ok;
242
243   case_destroy (&wc_data.sink_case);
244   case_destroy (&wc_data.trns_case);
245
246   ok = close_active_file () && ok;
247
248   return ok;
249 }
250
251 /* Updates last_vfm_invocation. */
252 static void
253 update_last_vfm_invocation (void) 
254 {
255   last_vfm_invocation = time (NULL);
256 }
257
258 /* Creates and returns a case, initializing it from the vectors
259    that say which `value's need to be initialized just once, and
260    which ones need to be re-initialized before every case. */
261 static void
262 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
263 {
264   size_t var_cnt = dict_get_var_cnt (dict);
265   size_t i;
266
267   case_create (trns_case, dict_get_next_value_idx (dict));
268   for (i = 0; i < var_cnt; i++) 
269     {
270       struct variable *v = dict_get_var (dict, i);
271       union value *value = case_data_rw (trns_case, v->fv);
272
273       if (v->type == NUMERIC)
274         value->f = v->leave ? 0.0 : SYSMIS;
275       else
276         memset (value->s, ' ', v->width);
277     }
278 }
279
280 /* Makes all preparations for reading from the data source and writing
281    to the data sink. */
282 static void
283 open_active_file (void)
284 {
285   add_case_limit_trns ();
286   add_filter_trns ();
287   add_process_if_trns ();
288
289   /* Finalize transformations. */
290   trns_chain_finalize (cur_trns_chain);
291
292   /* Make permanent_dict refer to the dictionary right before
293      data reaches the sink. */
294   if (permanent_dict == NULL)
295     permanent_dict = default_dict;
296
297   /* Figure out compaction. */
298   compactor = (dict_needs_compaction (permanent_dict)
299                ? dict_make_compactor (permanent_dict)
300                : NULL);
301
302   /* Prepare sink. */
303   if (vfm_sink == NULL)
304     vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
305   if (vfm_sink->class->open != NULL)
306     vfm_sink->class->open (vfm_sink);
307
308   /* Allocate memory for lag queue. */
309   if (n_lag > 0)
310     {
311       int i;
312   
313       lag_count = 0;
314       lag_head = 0;
315       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
316       for (i = 0; i < n_lag; i++)
317         case_nullify (&lag_queue[i]);
318     }
319 }
320
321 /* Transforms trns_case and writes it to the replacement active
322    file if advisable.  Returns true if more cases can be
323    accepted, false otherwise.  Do not call this function again
324    after it has returned false once.  */
325 static bool
326 write_case (struct write_case_data *wc_data)
327 {
328   enum trns_result retval;
329   size_t case_nr;
330   
331   /* Execute permanent transformations.  */
332   case_nr = wc_data->cases_written + 1;
333   retval = trns_chain_execute (permanent_trns_chain,
334                                &wc_data->trns_case, &case_nr);
335   if (retval != TRNS_CONTINUE)
336     goto done;
337
338   /* Write case to LAG queue. */
339   if (n_lag)
340     lag_case (&wc_data->trns_case);
341
342   /* Write case to replacement active file. */
343   wc_data->cases_written++;
344   if (vfm_sink->class->write != NULL) 
345     {
346       if (compactor != NULL) 
347         {
348           dict_compactor_compact (compactor, &wc_data->sink_case,
349                                   &wc_data->trns_case);
350           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
351         }
352       else
353         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
354     }
355   
356   /* Execute temporary transformations. */
357   if (temporary_trns_chain != NULL) 
358     {
359       retval = trns_chain_execute (temporary_trns_chain,
360                                    &wc_data->trns_case,
361                                    &wc_data->cases_written);
362       if (retval != TRNS_CONTINUE)
363         goto done;
364     }
365
366   /* Pass case to procedure. */
367   if (wc_data->case_func != NULL)
368     if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
369       retval = TRNS_ERROR;
370
371  done:
372   clear_case (&wc_data->trns_case);
373   return retval != TRNS_ERROR;
374 }
375
376 /* Add C to the lag queue. */
377 static void
378 lag_case (const struct ccase *c)
379 {
380   if (lag_count < n_lag)
381     lag_count++;
382   case_destroy (&lag_queue[lag_head]);
383   case_clone (&lag_queue[lag_head], c);
384   if (++lag_head >= n_lag)
385     lag_head = 0;
386 }
387
388 /* Clears the variables in C that need to be cleared between
389    processing cases.  */
390 static void
391 clear_case (struct ccase *c)
392 {
393   size_t var_cnt = dict_get_var_cnt (default_dict);
394   size_t i;
395   
396   for (i = 0; i < var_cnt; i++) 
397     {
398       struct variable *v = dict_get_var (default_dict, i);
399       if (!v->leave) 
400         {
401           if (v->type == NUMERIC)
402             case_data_rw (c, v->fv)->f = SYSMIS;
403           else
404             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
405         } 
406     }
407 }
408
409 /* Closes the active file. */
410 static bool
411 close_active_file (void)
412 {
413   /* Free memory for lag queue, and turn off lagging. */
414   if (n_lag > 0)
415     {
416       int i;
417       
418       for (i = 0; i < n_lag; i++)
419         case_destroy (&lag_queue[i]);
420       free (lag_queue);
421       n_lag = 0;
422     }
423   
424   /* Dictionary from before TEMPORARY becomes permanent. */
425   proc_cancel_temporary_transformations ();
426
427   /* Finish compaction. */
428   if (compactor != NULL) 
429     {
430       dict_compactor_destroy (compactor);
431       dict_compact_values (default_dict);
432       compactor = NULL;
433     }
434     
435   /* Free data source. */
436   free_case_source (vfm_source);
437   vfm_source = NULL;
438
439   /* Old data sink becomes new data source. */
440   if (vfm_sink->class->make_source != NULL)
441     vfm_source = vfm_sink->class->make_source (vfm_sink);
442   free_case_sink (vfm_sink);
443   vfm_sink = NULL;
444
445   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
446      and get rid of all the transformations. */
447   dict_clear_vectors (default_dict);
448   permanent_dict = NULL;
449   return proc_cancel_all_transformations ();
450 }
451 \f
452 /* Returns a pointer to the lagged case from N_BEFORE cases before the
453    current one, or NULL if there haven't been that many cases yet. */
454 struct ccase *
455 lagged_case (int n_before)
456 {
457   assert (n_before >= 1 );
458   assert (n_before <= n_lag);
459
460   if (n_before <= lag_count)
461     {
462       int index = lag_head - n_before;
463       if (index < 0)
464         index += n_lag;
465       return &lag_queue[index];
466     }
467   else
468     return NULL;
469 }
470 \f
471 /* Procedure that separates the data into SPLIT FILE groups. */
472
473 /* Represents auxiliary data for handling SPLIT FILE. */
474 struct split_aux_data 
475   {
476     size_t case_count;          /* Number of cases so far. */
477     struct ccase prev_case;     /* Data in previous case. */
478
479     /* Callback functions. */
480     void (*begin_func) (const struct ccase *, void *);
481     bool (*proc_func) (const struct ccase *, void *);
482     void (*end_func) (void *);
483     void *func_aux;
484   };
485
486 static int equal_splits (const struct ccase *, const struct ccase *);
487 static bool split_procedure_case_func (struct ccase *c, void *split_aux_);
488 static bool split_procedure_end_func (void *split_aux_);
489
490 /* Like procedure(), but it automatically breaks the case stream
491    into SPLIT FILE break groups.  Before each group of cases with
492    identical SPLIT FILE variable values, BEGIN_FUNC is called
493    with the first case in the group.
494    Then PROC_FUNC is called for each case in the group (including
495    the first).
496    END_FUNC is called when the group is finished.  FUNC_AUX is
497    passed to each of the functions as auxiliary data.
498
499    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
500    and END_FUNC will be called at all. 
501
502    If SPLIT FILE is not in effect, then there is one break group
503    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
504    will be called once.
505    
506    Returns true if successful, false if an I/O error occurred. */
507 bool
508 procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
509                        bool (*proc_func) (const struct ccase *, void *aux),
510                        void (*end_func) (void *aux),
511                        void *func_aux) 
512 {
513   struct split_aux_data split_aux;
514   bool ok;
515
516   split_aux.case_count = 0;
517   case_nullify (&split_aux.prev_case);
518   split_aux.begin_func = begin_func;
519   split_aux.proc_func = proc_func;
520   split_aux.end_func = end_func;
521   split_aux.func_aux = func_aux;
522
523   ok = internal_procedure (split_procedure_case_func,
524                            split_procedure_end_func, &split_aux);
525
526   case_destroy (&split_aux.prev_case);
527
528   return ok;
529 }
530
531 /* Case callback used by procedure_with_splits(). */
532 static bool
533 split_procedure_case_func (struct ccase *c, void *split_aux_) 
534 {
535   struct split_aux_data *split_aux = split_aux_;
536
537   /* Start a new series if needed. */
538   if (split_aux->case_count == 0
539       || !equal_splits (c, &split_aux->prev_case))
540     {
541       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
542         split_aux->end_func (split_aux->func_aux);
543
544       case_destroy (&split_aux->prev_case);
545       case_clone (&split_aux->prev_case, c);
546
547       if (split_aux->begin_func != NULL)
548         split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
549     }
550
551   split_aux->case_count++;
552   return (split_aux->proc_func == NULL
553           || split_aux->proc_func (c, split_aux->func_aux));
554 }
555
556 /* End-of-file callback used by procedure_with_splits(). */
557 static bool
558 split_procedure_end_func (void *split_aux_) 
559 {
560   struct split_aux_data *split_aux = split_aux_;
561
562   if (split_aux->case_count > 0 && split_aux->end_func != NULL)
563     split_aux->end_func (split_aux->func_aux);
564   return true;
565 }
566
567 /* Compares the SPLIT FILE variables in cases A and B and returns
568    nonzero only if they differ. */
569 static int
570 equal_splits (const struct ccase *a, const struct ccase *b) 
571 {
572   return case_compare (a, b,
573                        dict_get_split_vars (default_dict),
574                        dict_get_split_cnt (default_dict)) == 0;
575 }
576 \f
577 /* Multipass procedure that separates the data into SPLIT FILE
578    groups. */
579
580 /* Represents auxiliary data for handling SPLIT FILE in a
581    multipass procedure. */
582 struct multipass_split_aux_data 
583   {
584     struct ccase prev_case;     /* Data in previous case. */
585     struct casefile *casefile;  /* Accumulates data for a split. */
586
587     /* Function to call with the accumulated data. */
588     bool (*split_func) (const struct ccase *first, const struct casefile *,
589                         void *);
590     void *func_aux;                            /* Auxiliary data. */ 
591   };
592
593 static bool multipass_split_case_func (struct ccase *c, void *aux_);
594 static bool multipass_split_end_func (void *aux_);
595 static bool multipass_split_output (struct multipass_split_aux_data *);
596
597 /* Returns true if successful, false if an I/O error occurred. */
598 bool
599 multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
600                                                      const struct casefile *,
601                                                      void *aux),
602                                  void *func_aux)
603 {
604   struct multipass_split_aux_data aux;
605   bool ok;
606
607   case_nullify (&aux.prev_case);
608   aux.casefile = NULL;
609   aux.split_func = split_func;
610   aux.func_aux = func_aux;
611
612   ok = internal_procedure (multipass_split_case_func,
613                            multipass_split_end_func, &aux);
614   case_destroy (&aux.prev_case);
615
616   return ok;
617 }
618
619 /* Case callback used by multipass_procedure_with_splits(). */
620 static bool
621 multipass_split_case_func (struct ccase *c, void *aux_)
622 {
623   struct multipass_split_aux_data *aux = aux_;
624   bool ok = true;
625
626   /* Start a new series if needed. */
627   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
628     {
629       /* Record split values. */
630       case_destroy (&aux->prev_case);
631       case_clone (&aux->prev_case, c);
632
633       /* Pass any cases to split_func. */
634       if (aux->casefile != NULL)
635         ok = multipass_split_output (aux);
636
637       /* Start a new casefile. */
638       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
639     }
640
641   return casefile_append (aux->casefile, c) && ok;
642 }
643
644 /* End-of-file callback used by multipass_procedure_with_splits(). */
645 static bool
646 multipass_split_end_func (void *aux_)
647 {
648   struct multipass_split_aux_data *aux = aux_;
649   return (aux->casefile == NULL || multipass_split_output (aux));
650 }
651
652 static bool
653 multipass_split_output (struct multipass_split_aux_data *aux)
654 {
655   bool ok;
656   
657   assert (aux->casefile != NULL);
658   ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
659   casefile_destroy (aux->casefile);
660   aux->casefile = NULL;
661
662   return ok;
663 }
664 \f
665 /* Discards all the current state in preparation for a data-input
666    command like DATA LIST or GET. */
667 void
668 discard_variables (void)
669 {
670   dict_clear (default_dict);
671   fh_set_default_handle (NULL);
672
673   n_lag = 0;
674   
675   free_case_source (vfm_source);
676   vfm_source = NULL;
677
678   proc_cancel_all_transformations ();
679
680   expr_free (process_if_expr);
681   process_if_expr = NULL;
682
683   proc_cancel_temporary_transformations ();
684 }
685 \f
686 /* Returns the current set of permanent transformations,
687    and clears the permanent transformations.
688    For use by INPUT PROGRAM. */
689 struct trns_chain *
690 proc_capture_transformations (void) 
691 {
692   struct trns_chain *chain;
693   
694   assert (temporary_trns_chain == NULL);
695   chain = permanent_trns_chain;
696   cur_trns_chain = permanent_trns_chain = trns_chain_create ();
697   return chain;
698 }
699
700 /* Adds a transformation that processes a case with PROC and
701    frees itself with FREE to the current set of transformations.
702    The functions are passed AUX as auxiliary data. */
703 void
704 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
705 {
706   trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
707 }
708
709 /* Adds a transformation that processes a case with PROC and
710    frees itself with FREE to the current set of transformations.
711    When parsing of the block of transformations is complete,
712    FINALIZE will be called.
713    The functions are passed AUX as auxiliary data. */
714 void
715 add_transformation_with_finalizer (trns_finalize_func *finalize,
716                                    trns_proc_func *proc,
717                                    trns_free_func *free, void *aux)
718 {
719   trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
720 }
721
722 /* Returns the index of the next transformation.
723    This value can be returned by a transformation procedure
724    function to indicate a "jump" to that transformation. */
725 size_t
726 next_transformation (void) 
727 {
728   return trns_chain_next (cur_trns_chain);
729 }
730
731 /* Returns true if the next call to add_transformation() will add
732    a temporary transformation, false if it will add a permanent
733    transformation. */
734 bool
735 proc_in_temporary_transformations (void) 
736 {
737   return temporary_trns_chain != NULL;
738 }
739
740 /* Marks the start of temporary transformations.
741    Further calls to add_transformation() will add temporary
742    transformations. */
743 void
744 proc_start_temporary_transformations (void) 
745 {
746   if (!proc_in_temporary_transformations ())
747     {
748       add_case_limit_trns ();
749
750       permanent_dict = dict_clone (default_dict);
751       trns_chain_finalize (permanent_trns_chain);
752       temporary_trns_chain = cur_trns_chain = trns_chain_create ();
753     }
754 }
755
756 /* Converts all the temporary transformations, if any, to
757    permanent transformations.  Further transformations will be
758    permanent.
759    Returns true if anything changed, false otherwise. */
760 bool
761 proc_make_temporary_transformations_permanent (void) 
762 {
763   if (proc_in_temporary_transformations ()) 
764     {
765       trns_chain_finalize (temporary_trns_chain);
766       trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
767       temporary_trns_chain = NULL;
768
769       dict_destroy (permanent_dict);
770       permanent_dict = NULL;
771
772       return true;
773     }
774   else
775     return false;
776 }
777
778 /* Cancels all temporary transformations, if any.  Further
779    transformations will be permanent.
780    Returns true if anything changed, false otherwise. */
781 bool
782 proc_cancel_temporary_transformations (void) 
783 {
784   if (proc_in_temporary_transformations ()) 
785     {
786       dict_destroy (default_dict);
787       default_dict = permanent_dict;
788       permanent_dict = NULL;
789
790       trns_chain_destroy (temporary_trns_chain);
791       temporary_trns_chain = NULL;
792
793       return true;
794     }
795   else
796     return false;
797 }
798
799 /* Cancels all transformations, if any.
800    Returns true if successful, false on I/O error. */
801 bool
802 proc_cancel_all_transformations (void)
803 {
804   bool ok;
805   ok = trns_chain_destroy (permanent_trns_chain);
806   ok = trns_chain_destroy (temporary_trns_chain) && ok;
807   permanent_trns_chain = cur_trns_chain = trns_chain_create ();
808   temporary_trns_chain = NULL;
809   return ok;
810 }
811 \f
812 /* Initializes procedure handling. */
813 void
814 proc_init (void) 
815 {
816   default_dict = dict_create ();
817   proc_cancel_all_transformations ();
818 }
819
820 /* Finishes up procedure handling. */
821 void
822 proc_done (void)
823 {
824   discard_variables ();
825 }
826
827 /* Sets SINK as the destination for procedure output from the
828    next procedure. */
829 void
830 proc_set_sink (struct case_sink *sink) 
831 {
832   assert (vfm_sink == NULL);
833   vfm_sink = sink;
834 }
835
836 /* Sets SOURCE as the source for procedure input for the next
837    procedure. */
838 void
839 proc_set_source (struct case_source *source) 
840 {
841   assert (vfm_source == NULL);
842   vfm_source = source;
843 }
844
845 /* Returns true if a source for the next procedure has been
846    configured, false otherwise. */
847 bool
848 proc_has_source (void) 
849 {
850   return vfm_source != NULL;
851 }
852
853 /* Returns the output from the previous procedure.
854    For use only immediately after executing a procedure.
855    The returned casefile is owned by the caller; it will not be
856    automatically used for the next procedure's input. */
857 struct casefile *
858 proc_capture_output (void) 
859 {
860   struct casefile *casefile;
861
862   /* Try to make sure that this function is called immediately
863      after procedure() or a similar function. */
864   assert (vfm_source != NULL);
865   assert (case_source_is_class (vfm_source, &storage_source_class));
866   assert (trns_chain_is_empty (permanent_trns_chain));
867   assert (!proc_in_temporary_transformations ());
868
869   casefile = storage_source_decapsulate (vfm_source);
870   vfm_source = NULL;
871
872   return casefile;
873 }
874 \f
875 static trns_proc_func case_limit_trns_proc;
876 static trns_free_func case_limit_trns_free;
877
878 /* Adds a transformation that limits the number of cases that may
879    pass through, if default_dict has a case limit. */
880 static void
881 add_case_limit_trns (void) 
882 {
883   size_t case_limit = dict_get_case_limit (default_dict);
884   if (case_limit != 0)
885     {
886       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
887       *cases_remaining = case_limit;
888       add_transformation (case_limit_trns_proc, case_limit_trns_free,
889                           cases_remaining);
890       dict_set_case_limit (default_dict, 0);
891     }
892 }
893
894 /* Limits the maximum number of cases processed to
895    *CASES_REMAINING. */
896 static int
897 case_limit_trns_proc (void *cases_remaining_,
898                       struct ccase *c UNUSED, int case_nr UNUSED) 
899 {
900   size_t *cases_remaining = cases_remaining_;
901   if (*cases_remaining > 0) 
902     {
903       *cases_remaining--;
904       return TRNS_CONTINUE;
905     }
906   else
907     return TRNS_DROP_CASE;
908 }
909
910 /* Frees the data associated with a case limit transformation. */
911 static bool
912 case_limit_trns_free (void *cases_remaining_) 
913 {
914   size_t *cases_remaining = cases_remaining_;
915   free (cases_remaining);
916   return true;
917 }
918 \f
919 static trns_proc_func filter_trns_proc;
920
921 /* Adds a temporary transformation to filter data according to
922    the variable specified on FILTER, if any. */
923 static void
924 add_filter_trns (void) 
925 {
926   struct variable *filter_var = dict_get_filter (default_dict);
927   if (filter_var != NULL) 
928     {
929       proc_start_temporary_transformations ();
930       add_transformation (filter_trns_proc, NULL, filter_var);
931     }
932 }
933
934 /* FILTER transformation. */
935 static int
936 filter_trns_proc (void *filter_var_,
937                   struct ccase *c UNUSED, int case_nr UNUSED) 
938   
939 {
940   struct variable *filter_var = filter_var_;
941   double f = case_num (c, filter_var->fv);
942   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
943           ? TRNS_CONTINUE : TRNS_DROP_CASE);
944 }
945 \f
946 static trns_proc_func process_if_trns_proc;
947 static trns_free_func process_if_trns_free;
948
949 /* Adds a temporary transformation to filter data according to
950    the expression specified on PROCESS IF, if any. */
951 static void
952 add_process_if_trns (void) 
953 {
954   if (process_if_expr != NULL) 
955     {
956       proc_start_temporary_transformations ();
957       add_transformation (process_if_trns_proc, process_if_trns_free,
958                           process_if_expr);
959       process_if_expr = NULL;
960     }
961 }
962
963 /* PROCESS IF transformation. */
964 static int
965 process_if_trns_proc (void *expression_,
966                       struct ccase *c UNUSED, int case_nr UNUSED) 
967   
968 {
969   struct expression *expression = expression_;
970   return (expr_evaluate_num (expression, c, case_nr) == 1.0
971           ? TRNS_CONTINUE : TRNS_DROP_CASE);
972 }
973
974 /* Frees a PROCESS IF transformation. */
975 static bool
976 process_if_trns_free (void *expression_) 
977 {
978   struct expression *expression = expression_;
979   expr_free (expression);
980   return true;
981 }