Continue reforming procedure execution. In this phase, remove PROCESS
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40
41 /*
42    Virtual File Manager (vfm):
43
44    vfm is used to process data files.  It uses the model that
45    data is read from one stream (the data source), processed,
46    then written to another (the data sink).  The data source is
47    then deleted and the data sink becomes the data source for the
48    next procedure. */
49
50 /* Procedure execution data. */
51 struct write_case_data
52   {
53     /* Function to call for each case. */
54     bool (*case_func) (const struct ccase *, void *);
55     void *aux;
56
57     struct ccase trns_case;     /* Case used for transformations. */
58     struct ccase sink_case;     /* Case written to sink, if
59                                    compaction is necessary. */
60     size_t cases_written;       /* Cases output so far. */
61   };
62
63 /* Cases are read from vfm_source,
64    pass through permanent_trns_chain (which transforms them into
65    the format described by permanent_dict),
66    are written to vfm_sink,
67    pass through temporary_trns_chain (which transforms them into
68    the format described by default_dict),
69    and are finally passed to the procedure. */
70 static struct case_source *vfm_source;
71 static struct trns_chain *permanent_trns_chain;
72 static struct dictionary *permanent_dict;
73 static struct case_sink *vfm_sink;
74 static struct trns_chain *temporary_trns_chain;
75 struct dictionary *default_dict;
76
77 /* The transformation chain that the next transformation will be
78    added to. */
79 static struct trns_chain *cur_trns_chain;
80
81 /* The compactor used to compact a case, if necessary;
82    otherwise a null pointer. */
83 static struct dict_compactor *compactor;
84
85 /* Time at which vfm was last invoked. */
86 static time_t last_vfm_invocation;
87
88 /* Lag queue. */
89 int n_lag;                      /* Number of cases to lag. */
90 static int lag_count;           /* Number of cases in lag_queue so far. */
91 static int lag_head;            /* Index where next case will be added. */
92 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
93
94 static void add_case_limit_trns (void);
95 static void add_filter_trns (void);
96
97 static bool internal_procedure (bool (*case_func) (const struct ccase *,
98                                                    void *),
99                                 bool (*end_func) (void *),
100                                 void *aux);
101 static void update_last_vfm_invocation (void);
102 static void create_trns_case (struct ccase *, struct dictionary *);
103 static void open_active_file (void);
104 static bool write_case (struct write_case_data *wc_data);
105 static void lag_case (const struct ccase *c);
106 static void clear_case (struct ccase *c);
107 static bool close_active_file (void);
108 \f
109 /* Public functions. */
110
111 /* Returns the last time the data was read. */
112 time_t
113 time_of_last_procedure (void) 
114 {
115   if (last_vfm_invocation == 0)
116     update_last_vfm_invocation ();
117   return last_vfm_invocation;
118 }
119 \f
120 /* Regular procedure. */
121
122 /* Reads the data from the input program and writes it to a new
123    active file.  For each case we read from the input program, we
124    do the following:
125
126    1. Execute permanent transformations.  If these drop the case,
127       start the next case from step 1.
128
129    2. Write case to replacement active file.
130    
131    3. Execute temporary transformations.  If these drop the case,
132       start the next case from step 1.
133       
134    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
135
136    Returns true if successful, false if an I/O error occurred. */
137 bool
138 procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
139 {
140   return internal_procedure (proc_func, NULL, aux);
141 }
142 \f
143 /* Multipass procedure. */
144
145 struct multipass_aux_data 
146   {
147     struct casefile *casefile;
148     
149     bool (*proc_func) (const struct casefile *, void *aux);
150     void *aux;
151   };
152
153 /* Case processing function for multipass_procedure(). */
154 static bool
155 multipass_case_func (const struct ccase *c, void *aux_data_) 
156 {
157   struct multipass_aux_data *aux_data = aux_data_;
158   return casefile_append (aux_data->casefile, c);
159 }
160
161 /* End-of-file function for multipass_procedure(). */
162 static bool
163 multipass_end_func (void *aux_data_) 
164 {
165   struct multipass_aux_data *aux_data = aux_data_;
166   return (aux_data->proc_func == NULL
167           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
168 }
169
170 /* Procedure that allows multiple passes over the input data.
171    The entire active file is passed to PROC_FUNC, with the given
172    AUX as auxiliary data, as a unit. */
173 bool
174 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
175                      void *aux) 
176 {
177   struct multipass_aux_data aux_data;
178   bool ok;
179
180   aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
181   aux_data.proc_func = proc_func;
182   aux_data.aux = aux;
183
184   ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
185   ok = !casefile_error (aux_data.casefile) && ok;
186
187   casefile_destroy (aux_data.casefile);
188
189   return ok;
190 }
191 \f
192 /* Procedure implementation. */
193
194 /* Executes a procedure.
195    Passes each case to CASE_FUNC.
196    Calls END_FUNC after the last case.
197    Returns true if successful, false if an I/O error occurred (or
198    if CASE_FUNC or END_FUNC ever returned false). */
199 static bool
200 internal_procedure (bool (*case_func) (const struct ccase *, void *),
201                     bool (*end_func) (void *),
202                     void *aux) 
203 {
204   struct write_case_data wc_data;
205   bool ok = true;
206
207   assert (vfm_source != NULL);
208
209   update_last_vfm_invocation ();
210
211   /* Optimize the trivial case where we're not going to do
212      anything with the data, by not reading the data at all. */
213   if (case_func == NULL && end_func == NULL
214       && case_source_is_class (vfm_source, &storage_source_class)
215       && vfm_sink == NULL
216       && (temporary_trns_chain == NULL
217           || trns_chain_is_empty (temporary_trns_chain))
218       && trns_chain_is_empty (permanent_trns_chain))
219     {
220       n_lag = 0;
221       dict_set_case_limit (default_dict, 0);
222       dict_clear_vectors (default_dict);
223       return true;
224     }
225   
226   open_active_file ();
227   
228   wc_data.case_func = case_func;
229   wc_data.aux = aux;
230   create_trns_case (&wc_data.trns_case, default_dict);
231   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
232   wc_data.cases_written = 0;
233
234   ok = vfm_source->class->read (vfm_source,
235                                 &wc_data.trns_case,
236                                 write_case, &wc_data) && ok;
237   if (end_func != NULL)
238     ok = end_func (aux) && ok;
239
240   case_destroy (&wc_data.sink_case);
241   case_destroy (&wc_data.trns_case);
242
243   ok = close_active_file () && ok;
244
245   return ok;
246 }
247
248 /* Updates last_vfm_invocation. */
249 static void
250 update_last_vfm_invocation (void) 
251 {
252   last_vfm_invocation = time (NULL);
253 }
254
255 /* Creates and returns a case, initializing it from the vectors
256    that say which `value's need to be initialized just once, and
257    which ones need to be re-initialized before every case. */
258 static void
259 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
260 {
261   size_t var_cnt = dict_get_var_cnt (dict);
262   size_t i;
263
264   case_create (trns_case, dict_get_next_value_idx (dict));
265   for (i = 0; i < var_cnt; i++) 
266     {
267       struct variable *v = dict_get_var (dict, i);
268       union value *value = case_data_rw (trns_case, v->fv);
269
270       if (v->type == NUMERIC)
271         value->f = v->leave ? 0.0 : SYSMIS;
272       else
273         memset (value->s, ' ', v->width);
274     }
275 }
276
277 /* Makes all preparations for reading from the data source and writing
278    to the data sink. */
279 static void
280 open_active_file (void)
281 {
282   add_case_limit_trns ();
283   add_filter_trns ();
284
285   /* Finalize transformations. */
286   trns_chain_finalize (cur_trns_chain);
287
288   /* Make permanent_dict refer to the dictionary right before
289      data reaches the sink. */
290   if (permanent_dict == NULL)
291     permanent_dict = default_dict;
292
293   /* Figure out compaction. */
294   compactor = (dict_needs_compaction (permanent_dict)
295                ? dict_make_compactor (permanent_dict)
296                : NULL);
297
298   /* Prepare sink. */
299   if (vfm_sink == NULL)
300     vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
301   if (vfm_sink->class->open != NULL)
302     vfm_sink->class->open (vfm_sink);
303
304   /* Allocate memory for lag queue. */
305   if (n_lag > 0)
306     {
307       int i;
308   
309       lag_count = 0;
310       lag_head = 0;
311       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
312       for (i = 0; i < n_lag; i++)
313         case_nullify (&lag_queue[i]);
314     }
315 }
316
317 /* Transforms trns_case and writes it to the replacement active
318    file if advisable.  Returns true if more cases can be
319    accepted, false otherwise.  Do not call this function again
320    after it has returned false once.  */
321 static bool
322 write_case (struct write_case_data *wc_data)
323 {
324   enum trns_result retval;
325   size_t case_nr;
326   
327   /* Execute permanent transformations.  */
328   case_nr = wc_data->cases_written + 1;
329   retval = trns_chain_execute (permanent_trns_chain,
330                                &wc_data->trns_case, &case_nr);
331   if (retval != TRNS_CONTINUE)
332     goto done;
333
334   /* Write case to LAG queue. */
335   if (n_lag)
336     lag_case (&wc_data->trns_case);
337
338   /* Write case to replacement active file. */
339   wc_data->cases_written++;
340   if (vfm_sink->class->write != NULL) 
341     {
342       if (compactor != NULL) 
343         {
344           dict_compactor_compact (compactor, &wc_data->sink_case,
345                                   &wc_data->trns_case);
346           vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
347         }
348       else
349         vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
350     }
351   
352   /* Execute temporary transformations. */
353   if (temporary_trns_chain != NULL) 
354     {
355       retval = trns_chain_execute (temporary_trns_chain,
356                                    &wc_data->trns_case,
357                                    &wc_data->cases_written);
358       if (retval != TRNS_CONTINUE)
359         goto done;
360     }
361
362   /* Pass case to procedure. */
363   if (wc_data->case_func != NULL)
364     if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
365       retval = TRNS_ERROR;
366
367  done:
368   clear_case (&wc_data->trns_case);
369   return retval != TRNS_ERROR;
370 }
371
372 /* Add C to the lag queue. */
373 static void
374 lag_case (const struct ccase *c)
375 {
376   if (lag_count < n_lag)
377     lag_count++;
378   case_destroy (&lag_queue[lag_head]);
379   case_clone (&lag_queue[lag_head], c);
380   if (++lag_head >= n_lag)
381     lag_head = 0;
382 }
383
384 /* Clears the variables in C that need to be cleared between
385    processing cases.  */
386 static void
387 clear_case (struct ccase *c)
388 {
389   size_t var_cnt = dict_get_var_cnt (default_dict);
390   size_t i;
391   
392   for (i = 0; i < var_cnt; i++) 
393     {
394       struct variable *v = dict_get_var (default_dict, i);
395       if (!v->leave) 
396         {
397           if (v->type == NUMERIC)
398             case_data_rw (c, v->fv)->f = SYSMIS;
399           else
400             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
401         } 
402     }
403 }
404
405 /* Closes the active file. */
406 static bool
407 close_active_file (void)
408 {
409   /* Free memory for lag queue, and turn off lagging. */
410   if (n_lag > 0)
411     {
412       int i;
413       
414       for (i = 0; i < n_lag; i++)
415         case_destroy (&lag_queue[i]);
416       free (lag_queue);
417       n_lag = 0;
418     }
419   
420   /* Dictionary from before TEMPORARY becomes permanent. */
421   proc_cancel_temporary_transformations ();
422
423   /* Finish compaction. */
424   if (compactor != NULL) 
425     {
426       dict_compactor_destroy (compactor);
427       dict_compact_values (default_dict);
428       compactor = NULL;
429     }
430     
431   /* Free data source. */
432   free_case_source (vfm_source);
433   vfm_source = NULL;
434
435   /* Old data sink becomes new data source. */
436   if (vfm_sink->class->make_source != NULL)
437     vfm_source = vfm_sink->class->make_source (vfm_sink);
438   free_case_sink (vfm_sink);
439   vfm_sink = NULL;
440
441   dict_clear_vectors (default_dict);
442   permanent_dict = NULL;
443   return proc_cancel_all_transformations ();
444 }
445 \f
446 /* Returns a pointer to the lagged case from N_BEFORE cases before the
447    current one, or NULL if there haven't been that many cases yet. */
448 struct ccase *
449 lagged_case (int n_before)
450 {
451   assert (n_before >= 1 );
452   assert (n_before <= n_lag);
453
454   if (n_before <= lag_count)
455     {
456       int index = lag_head - n_before;
457       if (index < 0)
458         index += n_lag;
459       return &lag_queue[index];
460     }
461   else
462     return NULL;
463 }
464 \f
465 /* Procedure that separates the data into SPLIT FILE groups. */
466
467 /* Represents auxiliary data for handling SPLIT FILE. */
468 struct split_aux_data 
469   {
470     size_t case_count;          /* Number of cases so far. */
471     struct ccase prev_case;     /* Data in previous case. */
472
473     /* Callback functions. */
474     void (*begin_func) (const struct ccase *, void *);
475     bool (*proc_func) (const struct ccase *, void *);
476     void (*end_func) (void *);
477     void *func_aux;
478   };
479
480 static int equal_splits (const struct ccase *, const struct ccase *);
481 static bool split_procedure_case_func (const struct ccase *c, void *);
482 static bool split_procedure_end_func (void *);
483
484 /* Like procedure(), but it automatically breaks the case stream
485    into SPLIT FILE break groups.  Before each group of cases with
486    identical SPLIT FILE variable values, BEGIN_FUNC is called
487    with the first case in the group.
488    Then PROC_FUNC is called for each case in the group (including
489    the first).
490    END_FUNC is called when the group is finished.  FUNC_AUX is
491    passed to each of the functions as auxiliary data.
492
493    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
494    and END_FUNC will be called at all. 
495
496    If SPLIT FILE is not in effect, then there is one break group
497    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
498    will be called once.
499    
500    Returns true if successful, false if an I/O error occurred. */
501 bool
502 procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
503                        bool (*proc_func) (const struct ccase *, void *aux),
504                        void (*end_func) (void *aux),
505                        void *func_aux) 
506 {
507   struct split_aux_data split_aux;
508   bool ok;
509
510   split_aux.case_count = 0;
511   case_nullify (&split_aux.prev_case);
512   split_aux.begin_func = begin_func;
513   split_aux.proc_func = proc_func;
514   split_aux.end_func = end_func;
515   split_aux.func_aux = func_aux;
516
517   ok = internal_procedure (split_procedure_case_func,
518                            split_procedure_end_func, &split_aux);
519
520   case_destroy (&split_aux.prev_case);
521
522   return ok;
523 }
524
525 /* Case callback used by procedure_with_splits(). */
526 static bool
527 split_procedure_case_func (const struct ccase *c, void *split_aux_) 
528 {
529   struct split_aux_data *split_aux = split_aux_;
530
531   /* Start a new series if needed. */
532   if (split_aux->case_count == 0
533       || !equal_splits (c, &split_aux->prev_case))
534     {
535       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
536         split_aux->end_func (split_aux->func_aux);
537
538       case_destroy (&split_aux->prev_case);
539       case_clone (&split_aux->prev_case, c);
540
541       if (split_aux->begin_func != NULL)
542         split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
543     }
544
545   split_aux->case_count++;
546   return (split_aux->proc_func == NULL
547           || split_aux->proc_func (c, split_aux->func_aux));
548 }
549
550 /* End-of-file callback used by procedure_with_splits(). */
551 static bool
552 split_procedure_end_func (void *split_aux_) 
553 {
554   struct split_aux_data *split_aux = split_aux_;
555
556   if (split_aux->case_count > 0 && split_aux->end_func != NULL)
557     split_aux->end_func (split_aux->func_aux);
558   return true;
559 }
560
561 /* Compares the SPLIT FILE variables in cases A and B and returns
562    nonzero only if they differ. */
563 static int
564 equal_splits (const struct ccase *a, const struct ccase *b) 
565 {
566   return case_compare (a, b,
567                        dict_get_split_vars (default_dict),
568                        dict_get_split_cnt (default_dict)) == 0;
569 }
570 \f
571 /* Multipass procedure that separates the data into SPLIT FILE
572    groups. */
573
574 /* Represents auxiliary data for handling SPLIT FILE in a
575    multipass procedure. */
576 struct multipass_split_aux_data 
577   {
578     struct ccase prev_case;     /* Data in previous case. */
579     struct casefile *casefile;  /* Accumulates data for a split. */
580
581     /* Function to call with the accumulated data. */
582     bool (*split_func) (const struct ccase *first, const struct casefile *,
583                         void *);
584     void *func_aux;                            /* Auxiliary data. */ 
585   };
586
587 static bool multipass_split_case_func (const struct ccase *c, void *aux_);
588 static bool multipass_split_end_func (void *aux_);
589 static bool multipass_split_output (struct multipass_split_aux_data *);
590
591 /* Returns true if successful, false if an I/O error occurred. */
592 bool
593 multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
594                                                      const struct casefile *,
595                                                      void *aux),
596                                  void *func_aux)
597 {
598   struct multipass_split_aux_data aux;
599   bool ok;
600
601   case_nullify (&aux.prev_case);
602   aux.casefile = NULL;
603   aux.split_func = split_func;
604   aux.func_aux = func_aux;
605
606   ok = internal_procedure (multipass_split_case_func,
607                            multipass_split_end_func, &aux);
608   case_destroy (&aux.prev_case);
609
610   return ok;
611 }
612
613 /* Case callback used by multipass_procedure_with_splits(). */
614 static bool
615 multipass_split_case_func (const struct ccase *c, void *aux_)
616 {
617   struct multipass_split_aux_data *aux = aux_;
618   bool ok = true;
619
620   /* Start a new series if needed. */
621   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
622     {
623       /* Record split values. */
624       case_destroy (&aux->prev_case);
625       case_clone (&aux->prev_case, c);
626
627       /* Pass any cases to split_func. */
628       if (aux->casefile != NULL)
629         ok = multipass_split_output (aux);
630
631       /* Start a new casefile. */
632       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
633     }
634
635   return casefile_append (aux->casefile, c) && ok;
636 }
637
638 /* End-of-file callback used by multipass_procedure_with_splits(). */
639 static bool
640 multipass_split_end_func (void *aux_)
641 {
642   struct multipass_split_aux_data *aux = aux_;
643   return (aux->casefile == NULL || multipass_split_output (aux));
644 }
645
646 static bool
647 multipass_split_output (struct multipass_split_aux_data *aux)
648 {
649   bool ok;
650   
651   assert (aux->casefile != NULL);
652   ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
653   casefile_destroy (aux->casefile);
654   aux->casefile = NULL;
655
656   return ok;
657 }
658 \f
659 /* Discards all the current state in preparation for a data-input
660    command like DATA LIST or GET. */
661 void
662 discard_variables (void)
663 {
664   dict_clear (default_dict);
665   fh_set_default_handle (NULL);
666
667   n_lag = 0;
668   
669   free_case_source (vfm_source);
670   vfm_source = NULL;
671
672   proc_cancel_all_transformations ();
673 }
674 \f
675 /* Returns the current set of permanent transformations,
676    and clears the permanent transformations.
677    For use by INPUT PROGRAM. */
678 struct trns_chain *
679 proc_capture_transformations (void) 
680 {
681   struct trns_chain *chain;
682   
683   assert (temporary_trns_chain == NULL);
684   chain = permanent_trns_chain;
685   cur_trns_chain = permanent_trns_chain = trns_chain_create ();
686   return chain;
687 }
688
689 /* Adds a transformation that processes a case with PROC and
690    frees itself with FREE to the current set of transformations.
691    The functions are passed AUX as auxiliary data. */
692 void
693 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
694 {
695   trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
696 }
697
698 /* Adds a transformation that processes a case with PROC and
699    frees itself with FREE to the current set of transformations.
700    When parsing of the block of transformations is complete,
701    FINALIZE will be called.
702    The functions are passed AUX as auxiliary data. */
703 void
704 add_transformation_with_finalizer (trns_finalize_func *finalize,
705                                    trns_proc_func *proc,
706                                    trns_free_func *free, void *aux)
707 {
708   trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
709 }
710
711 /* Returns the index of the next transformation.
712    This value can be returned by a transformation procedure
713    function to indicate a "jump" to that transformation. */
714 size_t
715 next_transformation (void) 
716 {
717   return trns_chain_next (cur_trns_chain);
718 }
719
720 /* Returns true if the next call to add_transformation() will add
721    a temporary transformation, false if it will add a permanent
722    transformation. */
723 bool
724 proc_in_temporary_transformations (void) 
725 {
726   return temporary_trns_chain != NULL;
727 }
728
729 /* Marks the start of temporary transformations.
730    Further calls to add_transformation() will add temporary
731    transformations. */
732 void
733 proc_start_temporary_transformations (void) 
734 {
735   if (!proc_in_temporary_transformations ())
736     {
737       add_case_limit_trns ();
738
739       permanent_dict = dict_clone (default_dict);
740       trns_chain_finalize (permanent_trns_chain);
741       temporary_trns_chain = cur_trns_chain = trns_chain_create ();
742     }
743 }
744
745 /* Converts all the temporary transformations, if any, to
746    permanent transformations.  Further transformations will be
747    permanent.
748    Returns true if anything changed, false otherwise. */
749 bool
750 proc_make_temporary_transformations_permanent (void) 
751 {
752   if (proc_in_temporary_transformations ()) 
753     {
754       trns_chain_finalize (temporary_trns_chain);
755       trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
756       temporary_trns_chain = NULL;
757
758       dict_destroy (permanent_dict);
759       permanent_dict = NULL;
760
761       return true;
762     }
763   else
764     return false;
765 }
766
767 /* Cancels all temporary transformations, if any.  Further
768    transformations will be permanent.
769    Returns true if anything changed, false otherwise. */
770 bool
771 proc_cancel_temporary_transformations (void) 
772 {
773   if (proc_in_temporary_transformations ()) 
774     {
775       dict_destroy (default_dict);
776       default_dict = permanent_dict;
777       permanent_dict = NULL;
778
779       trns_chain_destroy (temporary_trns_chain);
780       temporary_trns_chain = NULL;
781
782       return true;
783     }
784   else
785     return false;
786 }
787
788 /* Cancels all transformations, if any.
789    Returns true if successful, false on I/O error. */
790 bool
791 proc_cancel_all_transformations (void)
792 {
793   bool ok;
794   ok = trns_chain_destroy (permanent_trns_chain);
795   ok = trns_chain_destroy (temporary_trns_chain) && ok;
796   permanent_trns_chain = cur_trns_chain = trns_chain_create ();
797   temporary_trns_chain = NULL;
798   return ok;
799 }
800 \f
801 /* Initializes procedure handling. */
802 void
803 proc_init (void) 
804 {
805   default_dict = dict_create ();
806   proc_cancel_all_transformations ();
807 }
808
809 /* Finishes up procedure handling. */
810 void
811 proc_done (void)
812 {
813   discard_variables ();
814 }
815
816 /* Sets SINK as the destination for procedure output from the
817    next procedure. */
818 void
819 proc_set_sink (struct case_sink *sink) 
820 {
821   assert (vfm_sink == NULL);
822   vfm_sink = sink;
823 }
824
825 /* Sets SOURCE as the source for procedure input for the next
826    procedure. */
827 void
828 proc_set_source (struct case_source *source) 
829 {
830   assert (vfm_source == NULL);
831   vfm_source = source;
832 }
833
834 /* Returns true if a source for the next procedure has been
835    configured, false otherwise. */
836 bool
837 proc_has_source (void) 
838 {
839   return vfm_source != NULL;
840 }
841
842 /* Returns the output from the previous procedure.
843    For use only immediately after executing a procedure.
844    The returned casefile is owned by the caller; it will not be
845    automatically used for the next procedure's input. */
846 struct casefile *
847 proc_capture_output (void) 
848 {
849   struct casefile *casefile;
850
851   /* Try to make sure that this function is called immediately
852      after procedure() or a similar function. */
853   assert (vfm_source != NULL);
854   assert (case_source_is_class (vfm_source, &storage_source_class));
855   assert (trns_chain_is_empty (permanent_trns_chain));
856   assert (!proc_in_temporary_transformations ());
857
858   casefile = storage_source_decapsulate (vfm_source);
859   vfm_source = NULL;
860
861   return casefile;
862 }
863 \f
864 static trns_proc_func case_limit_trns_proc;
865 static trns_free_func case_limit_trns_free;
866
867 /* Adds a transformation that limits the number of cases that may
868    pass through, if default_dict has a case limit. */
869 static void
870 add_case_limit_trns (void) 
871 {
872   size_t case_limit = dict_get_case_limit (default_dict);
873   if (case_limit != 0)
874     {
875       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
876       *cases_remaining = case_limit;
877       add_transformation (case_limit_trns_proc, case_limit_trns_free,
878                           cases_remaining);
879       dict_set_case_limit (default_dict, 0);
880     }
881 }
882
883 /* Limits the maximum number of cases processed to
884    *CASES_REMAINING. */
885 static int
886 case_limit_trns_proc (void *cases_remaining_,
887                       struct ccase *c UNUSED, int case_nr UNUSED) 
888 {
889   size_t *cases_remaining = cases_remaining_;
890   if (*cases_remaining > 0) 
891     {
892       *cases_remaining--;
893       return TRNS_CONTINUE;
894     }
895   else
896     return TRNS_DROP_CASE;
897 }
898
899 /* Frees the data associated with a case limit transformation. */
900 static bool
901 case_limit_trns_free (void *cases_remaining_) 
902 {
903   size_t *cases_remaining = cases_remaining_;
904   free (cases_remaining);
905   return true;
906 }
907 \f
908 static trns_proc_func filter_trns_proc;
909
910 /* Adds a temporary transformation to filter data according to
911    the variable specified on FILTER, if any. */
912 static void
913 add_filter_trns (void) 
914 {
915   struct variable *filter_var = dict_get_filter (default_dict);
916   if (filter_var != NULL) 
917     {
918       proc_start_temporary_transformations ();
919       add_transformation (filter_trns_proc, NULL, filter_var);
920     }
921 }
922
923 /* FILTER transformation. */
924 static int
925 filter_trns_proc (void *filter_var_,
926                   struct ccase *c UNUSED, int case_nr UNUSED) 
927   
928 {
929   struct variable *filter_var = filter_var_;
930   double f = case_num (c, filter_var->fv);
931   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
932           ? TRNS_CONTINUE : TRNS_DROP_CASE);
933 }
934