4dcce32b0f8ec8ac467423d52c1d25ebf4cdb951
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40
41 /* Procedure execution data. */
42 struct write_case_data
43   {
44     /* Function to call for each case. */
45     bool (*case_func) (const struct ccase *, void *);
46     void *aux;
47
48     struct ccase trns_case;     /* Case used for transformations. */
49     struct ccase sink_case;     /* Case written to sink, if
50                                    compacting is necessary. */
51     size_t cases_written;       /* Cases output so far. */
52   };
53
54 /* Cases are read from proc_source,
55    pass through permanent_trns_chain (which transforms them into
56    the format described by permanent_dict),
57    are written to proc_sink,
58    pass through temporary_trns_chain (which transforms them into
59    the format described by default_dict),
60    and are finally passed to the procedure. */
61 static struct case_source *proc_source;
62 static struct trns_chain *permanent_trns_chain;
63 static struct dictionary *permanent_dict;
64 static struct case_sink *proc_sink;
65 static struct trns_chain *temporary_trns_chain;
66 struct dictionary *default_dict;
67
68 /* The transformation chain that the next transformation will be
69    added to. */
70 static struct trns_chain *cur_trns_chain;
71
72 /* The compactor used to compact a case, if necessary;
73    otherwise a null pointer. */
74 static struct dict_compactor *compactor;
75
76 /* Time at which proc was last invoked. */
77 static time_t last_proc_invocation;
78
79 /* Lag queue. */
80 int n_lag;                      /* Number of cases to lag. */
81 static int lag_count;           /* Number of cases in lag_queue so far. */
82 static int lag_head;            /* Index where next case will be added. */
83 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
84
85 static void add_case_limit_trns (void);
86 static void add_filter_trns (void);
87
88 static bool internal_procedure (bool (*case_func) (const struct ccase *,
89                                                    void *),
90                                 bool (*end_func) (void *),
91                                 void *aux);
92 static void update_last_proc_invocation (void);
93 static void create_trns_case (struct ccase *, struct dictionary *);
94 static void open_active_file (void);
95 static bool write_case (struct write_case_data *wc_data);
96 static void lag_case (const struct ccase *c);
97 static void clear_case (struct ccase *c);
98 static bool close_active_file (void);
99 \f
100 /* Public functions. */
101
102 /* Returns the last time the data was read. */
103 time_t
104 time_of_last_procedure (void) 
105 {
106   if (last_proc_invocation == 0)
107     update_last_proc_invocation ();
108   return last_proc_invocation;
109 }
110 \f
111 /* Regular procedure. */
112
113 /* Reads the data from the input program and writes it to a new
114    active file.  For each case we read from the input program, we
115    do the following:
116
117    1. Execute permanent transformations.  If these drop the case,
118       start the next case from step 1.
119
120    2. Write case to replacement active file.
121    
122    3. Execute temporary transformations.  If these drop the case,
123       start the next case from step 1.
124       
125    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
126
127    Returns true if successful, false if an I/O error occurred. */
128 bool
129 procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
130 {
131   return internal_procedure (proc_func, NULL, aux);
132 }
133 \f
134 /* Multipass procedure. */
135
136 struct multipass_aux_data 
137   {
138     struct casefile *casefile;
139     
140     bool (*proc_func) (const struct casefile *, void *aux);
141     void *aux;
142   };
143
144 /* Case processing function for multipass_procedure(). */
145 static bool
146 multipass_case_func (const struct ccase *c, void *aux_data_) 
147 {
148   struct multipass_aux_data *aux_data = aux_data_;
149   return casefile_append (aux_data->casefile, c);
150 }
151
152 /* End-of-file function for multipass_procedure(). */
153 static bool
154 multipass_end_func (void *aux_data_) 
155 {
156   struct multipass_aux_data *aux_data = aux_data_;
157   return (aux_data->proc_func == NULL
158           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
159 }
160
161 /* Procedure that allows multiple passes over the input data.
162    The entire active file is passed to PROC_FUNC, with the given
163    AUX as auxiliary data, as a unit. */
164 bool
165 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
166                      void *aux) 
167 {
168   struct multipass_aux_data aux_data;
169   bool ok;
170
171   aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
172   aux_data.proc_func = proc_func;
173   aux_data.aux = aux;
174
175   ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
176   ok = !casefile_error (aux_data.casefile) && ok;
177
178   casefile_destroy (aux_data.casefile);
179
180   return ok;
181 }
182 \f
183 /* Procedure implementation. */
184
185 /* Executes a procedure.
186    Passes each case to CASE_FUNC.
187    Calls END_FUNC after the last case.
188    Returns true if successful, false if an I/O error occurred (or
189    if CASE_FUNC or END_FUNC ever returned false). */
190 static bool
191 internal_procedure (bool (*case_func) (const struct ccase *, void *),
192                     bool (*end_func) (void *),
193                     void *aux) 
194 {
195   struct write_case_data wc_data;
196   bool ok = true;
197
198   assert (proc_source != NULL);
199
200   update_last_proc_invocation ();
201
202   /* Optimize the trivial case where we're not going to do
203      anything with the data, by not reading the data at all. */
204   if (case_func == NULL && end_func == NULL
205       && case_source_is_class (proc_source, &storage_source_class)
206       && proc_sink == NULL
207       && (temporary_trns_chain == NULL
208           || trns_chain_is_empty (temporary_trns_chain))
209       && trns_chain_is_empty (permanent_trns_chain))
210     {
211       n_lag = 0;
212       dict_set_case_limit (default_dict, 0);
213       dict_clear_vectors (default_dict);
214       return true;
215     }
216   
217   open_active_file ();
218   
219   wc_data.case_func = case_func;
220   wc_data.aux = aux;
221   create_trns_case (&wc_data.trns_case, default_dict);
222   case_create (&wc_data.sink_case,
223                dict_get_compacted_value_cnt (default_dict));
224   wc_data.cases_written = 0;
225
226   ok = proc_source->class->read (proc_source,
227                                  &wc_data.trns_case,
228                                  write_case, &wc_data) && ok;
229   if (end_func != NULL)
230     ok = end_func (aux) && ok;
231
232   case_destroy (&wc_data.sink_case);
233   case_destroy (&wc_data.trns_case);
234
235   ok = close_active_file () && ok;
236
237   return ok;
238 }
239
240 /* Updates last_proc_invocation. */
241 static void
242 update_last_proc_invocation (void) 
243 {
244   last_proc_invocation = time (NULL);
245 }
246
247 /* Creates and returns a case, initializing it from the vectors
248    that say which `value's need to be initialized just once, and
249    which ones need to be re-initialized before every case. */
250 static void
251 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
252 {
253   size_t var_cnt = dict_get_var_cnt (dict);
254   size_t i;
255
256   case_create (trns_case, dict_get_next_value_idx (dict));
257   for (i = 0; i < var_cnt; i++) 
258     {
259       struct variable *v = dict_get_var (dict, i);
260       union value *value = case_data_rw (trns_case, v->fv);
261
262       if (v->type == NUMERIC)
263         value->f = v->leave ? 0.0 : SYSMIS;
264       else
265         memset (value->s, ' ', v->width);
266     }
267 }
268
269 /* Makes all preparations for reading from the data source and writing
270    to the data sink. */
271 static void
272 open_active_file (void)
273 {
274   add_case_limit_trns ();
275   add_filter_trns ();
276
277   /* Finalize transformations. */
278   trns_chain_finalize (cur_trns_chain);
279
280   /* Make permanent_dict refer to the dictionary right before
281      data reaches the sink. */
282   if (permanent_dict == NULL)
283     permanent_dict = default_dict;
284
285   /* Figure out whether to compact. */
286   compactor = (dict_compacting_would_shrink (permanent_dict)
287                ? dict_make_compactor (permanent_dict)
288                : NULL);
289
290   /* Prepare sink. */
291   if (proc_sink == NULL)
292     proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
293   if (proc_sink->class->open != NULL)
294     proc_sink->class->open (proc_sink);
295
296   /* Allocate memory for lag queue. */
297   if (n_lag > 0)
298     {
299       int i;
300   
301       lag_count = 0;
302       lag_head = 0;
303       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
304       for (i = 0; i < n_lag; i++)
305         case_nullify (&lag_queue[i]);
306     }
307 }
308
309 /* Transforms trns_case and writes it to the replacement active
310    file if advisable.  Returns true if more cases can be
311    accepted, false otherwise.  Do not call this function again
312    after it has returned false once.  */
313 static bool
314 write_case (struct write_case_data *wc_data)
315 {
316   enum trns_result retval;
317   size_t case_nr;
318   
319   /* Execute permanent transformations.  */
320   case_nr = wc_data->cases_written + 1;
321   retval = trns_chain_execute (permanent_trns_chain,
322                                &wc_data->trns_case, &case_nr);
323   if (retval != TRNS_CONTINUE)
324     goto done;
325
326   /* Write case to LAG queue. */
327   if (n_lag)
328     lag_case (&wc_data->trns_case);
329
330   /* Write case to replacement active file. */
331   wc_data->cases_written++;
332   if (proc_sink->class->write != NULL) 
333     {
334       if (compactor != NULL) 
335         {
336           dict_compactor_compact (compactor, &wc_data->sink_case,
337                                   &wc_data->trns_case);
338           proc_sink->class->write (proc_sink, &wc_data->sink_case);
339         }
340       else
341         proc_sink->class->write (proc_sink, &wc_data->trns_case);
342     }
343   
344   /* Execute temporary transformations. */
345   if (temporary_trns_chain != NULL) 
346     {
347       retval = trns_chain_execute (temporary_trns_chain,
348                                    &wc_data->trns_case,
349                                    &wc_data->cases_written);
350       if (retval != TRNS_CONTINUE)
351         goto done;
352     }
353
354   /* Pass case to procedure. */
355   if (wc_data->case_func != NULL)
356     if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
357       retval = TRNS_ERROR;
358
359  done:
360   clear_case (&wc_data->trns_case);
361   return retval != TRNS_ERROR;
362 }
363
364 /* Add C to the lag queue. */
365 static void
366 lag_case (const struct ccase *c)
367 {
368   if (lag_count < n_lag)
369     lag_count++;
370   case_destroy (&lag_queue[lag_head]);
371   case_clone (&lag_queue[lag_head], c);
372   if (++lag_head >= n_lag)
373     lag_head = 0;
374 }
375
376 /* Clears the variables in C that need to be cleared between
377    processing cases.  */
378 static void
379 clear_case (struct ccase *c)
380 {
381   size_t var_cnt = dict_get_var_cnt (default_dict);
382   size_t i;
383   
384   for (i = 0; i < var_cnt; i++) 
385     {
386       struct variable *v = dict_get_var (default_dict, i);
387       if (!v->leave) 
388         {
389           if (v->type == NUMERIC)
390             case_data_rw (c, v->fv)->f = SYSMIS;
391           else
392             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
393         } 
394     }
395 }
396
397 /* Closes the active file. */
398 static bool
399 close_active_file (void)
400 {
401   /* Free memory for lag queue, and turn off lagging. */
402   if (n_lag > 0)
403     {
404       int i;
405       
406       for (i = 0; i < n_lag; i++)
407         case_destroy (&lag_queue[i]);
408       free (lag_queue);
409       n_lag = 0;
410     }
411   
412   /* Dictionary from before TEMPORARY becomes permanent. */
413   proc_cancel_temporary_transformations ();
414
415   /* Finish compacting. */
416   if (compactor != NULL) 
417     {
418       dict_compactor_destroy (compactor);
419       dict_compact_values (default_dict);
420       compactor = NULL;
421     }
422     
423   /* Free data source. */
424   free_case_source (proc_source);
425   proc_source = NULL;
426
427   /* Old data sink becomes new data source. */
428   if (proc_sink->class->make_source != NULL)
429     proc_source = proc_sink->class->make_source (proc_sink);
430   free_case_sink (proc_sink);
431   proc_sink = NULL;
432
433   dict_clear_vectors (default_dict);
434   permanent_dict = NULL;
435   return proc_cancel_all_transformations ();
436 }
437 \f
438 /* Returns a pointer to the lagged case from N_BEFORE cases before the
439    current one, or NULL if there haven't been that many cases yet. */
440 struct ccase *
441 lagged_case (int n_before)
442 {
443   assert (n_before >= 1 );
444   assert (n_before <= n_lag);
445
446   if (n_before <= lag_count)
447     {
448       int index = lag_head - n_before;
449       if (index < 0)
450         index += n_lag;
451       return &lag_queue[index];
452     }
453   else
454     return NULL;
455 }
456 \f
457 /* Procedure that separates the data into SPLIT FILE groups. */
458
459 /* Represents auxiliary data for handling SPLIT FILE. */
460 struct split_aux_data 
461   {
462     struct ccase prev_case;     /* Data in previous case. */
463
464     /* Callback functions. */
465     void (*begin_func) (const struct ccase *, void *);
466     bool (*proc_func) (const struct ccase *, void *);
467     void (*end_func) (void *);
468     void *func_aux;
469   };
470
471 static int equal_splits (const struct ccase *, const struct ccase *);
472 static bool split_procedure_case_func (const struct ccase *c, void *);
473 static bool split_procedure_end_func (void *);
474
475 /* Like procedure(), but it automatically breaks the case stream
476    into SPLIT FILE break groups.  Before each group of cases with
477    identical SPLIT FILE variable values, BEGIN_FUNC is called
478    with the first case in the group.
479    Then PROC_FUNC is called for each case in the group (including
480    the first).
481    END_FUNC is called when the group is finished.  FUNC_AUX is
482    passed to each of the functions as auxiliary data.
483
484    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
485    and END_FUNC will be called at all. 
486
487    If SPLIT FILE is not in effect, then there is one break group
488    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
489    will be called once.
490    
491    Returns true if successful, false if an I/O error occurred. */
492 bool
493 procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
494                        bool (*proc_func) (const struct ccase *, void *aux),
495                        void (*end_func) (void *aux),
496                        void *func_aux) 
497 {
498   struct split_aux_data split_aux;
499   bool ok;
500
501   case_nullify (&split_aux.prev_case);
502   split_aux.begin_func = begin_func;
503   split_aux.proc_func = proc_func;
504   split_aux.end_func = end_func;
505   split_aux.func_aux = func_aux;
506
507   ok = internal_procedure (split_procedure_case_func,
508                            split_procedure_end_func, &split_aux);
509
510   case_destroy (&split_aux.prev_case);
511
512   return ok;
513 }
514
515 /* Case callback used by procedure_with_splits(). */
516 static bool
517 split_procedure_case_func (const struct ccase *c, void *split_aux_) 
518 {
519   struct split_aux_data *split_aux = split_aux_;
520
521   /* Start a new series if needed. */
522   if (case_is_null (&split_aux->prev_case)
523       || !equal_splits (c, &split_aux->prev_case))
524     {
525       if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
526         split_aux->end_func (split_aux->func_aux);
527
528       case_destroy (&split_aux->prev_case);
529       case_clone (&split_aux->prev_case, c);
530
531       if (split_aux->begin_func != NULL)
532         split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
533     }
534
535   return (split_aux->proc_func == NULL
536           || split_aux->proc_func (c, split_aux->func_aux));
537 }
538
539 /* End-of-file callback used by procedure_with_splits(). */
540 static bool
541 split_procedure_end_func (void *split_aux_) 
542 {
543   struct split_aux_data *split_aux = split_aux_;
544
545   if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
546     split_aux->end_func (split_aux->func_aux);
547   return true;
548 }
549
550 /* Compares the SPLIT FILE variables in cases A and B and returns
551    nonzero only if they differ. */
552 static int
553 equal_splits (const struct ccase *a, const struct ccase *b) 
554 {
555   return case_compare (a, b,
556                        dict_get_split_vars (default_dict),
557                        dict_get_split_cnt (default_dict)) == 0;
558 }
559 \f
560 /* Multipass procedure that separates the data into SPLIT FILE
561    groups. */
562
563 /* Represents auxiliary data for handling SPLIT FILE in a
564    multipass procedure. */
565 struct multipass_split_aux_data 
566   {
567     struct ccase prev_case;     /* Data in previous case. */
568     struct casefile *casefile;  /* Accumulates data for a split. */
569
570     /* Function to call with the accumulated data. */
571     bool (*split_func) (const struct ccase *first, const struct casefile *,
572                         void *);
573     void *func_aux;                            /* Auxiliary data. */ 
574   };
575
576 static bool multipass_split_case_func (const struct ccase *c, void *aux_);
577 static bool multipass_split_end_func (void *aux_);
578 static bool multipass_split_output (struct multipass_split_aux_data *);
579
580 /* Returns true if successful, false if an I/O error occurred. */
581 bool
582 multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
583                                                      const struct casefile *,
584                                                      void *aux),
585                                  void *func_aux)
586 {
587   struct multipass_split_aux_data aux;
588   bool ok;
589
590   case_nullify (&aux.prev_case);
591   aux.casefile = NULL;
592   aux.split_func = split_func;
593   aux.func_aux = func_aux;
594
595   ok = internal_procedure (multipass_split_case_func,
596                            multipass_split_end_func, &aux);
597   case_destroy (&aux.prev_case);
598
599   return ok;
600 }
601
602 /* Case callback used by multipass_procedure_with_splits(). */
603 static bool
604 multipass_split_case_func (const struct ccase *c, void *aux_)
605 {
606   struct multipass_split_aux_data *aux = aux_;
607   bool ok = true;
608
609   /* Start a new series if needed. */
610   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
611     {
612       /* Record split values. */
613       case_destroy (&aux->prev_case);
614       case_clone (&aux->prev_case, c);
615
616       /* Pass any cases to split_func. */
617       if (aux->casefile != NULL)
618         ok = multipass_split_output (aux);
619
620       /* Start a new casefile. */
621       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
622     }
623
624   return casefile_append (aux->casefile, c) && ok;
625 }
626
627 /* End-of-file callback used by multipass_procedure_with_splits(). */
628 static bool
629 multipass_split_end_func (void *aux_)
630 {
631   struct multipass_split_aux_data *aux = aux_;
632   return (aux->casefile == NULL || multipass_split_output (aux));
633 }
634
635 static bool
636 multipass_split_output (struct multipass_split_aux_data *aux)
637 {
638   bool ok;
639   
640   assert (aux->casefile != NULL);
641   ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
642   casefile_destroy (aux->casefile);
643   aux->casefile = NULL;
644
645   return ok;
646 }
647 \f
648 /* Discards all the current state in preparation for a data-input
649    command like DATA LIST or GET. */
650 void
651 discard_variables (void)
652 {
653   dict_clear (default_dict);
654   fh_set_default_handle (NULL);
655
656   n_lag = 0;
657   
658   free_case_source (proc_source);
659   proc_source = NULL;
660
661   proc_cancel_all_transformations ();
662 }
663 \f
664 /* Returns the current set of permanent transformations,
665    and clears the permanent transformations.
666    For use by INPUT PROGRAM. */
667 struct trns_chain *
668 proc_capture_transformations (void) 
669 {
670   struct trns_chain *chain;
671   
672   assert (temporary_trns_chain == NULL);
673   chain = permanent_trns_chain;
674   cur_trns_chain = permanent_trns_chain = trns_chain_create ();
675   return chain;
676 }
677
678 /* Adds a transformation that processes a case with PROC and
679    frees itself with FREE to the current set of transformations.
680    The functions are passed AUX as auxiliary data. */
681 void
682 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
683 {
684   trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
685 }
686
687 /* Adds a transformation that processes a case with PROC and
688    frees itself with FREE to the current set of transformations.
689    When parsing of the block of transformations is complete,
690    FINALIZE will be called.
691    The functions are passed AUX as auxiliary data. */
692 void
693 add_transformation_with_finalizer (trns_finalize_func *finalize,
694                                    trns_proc_func *proc,
695                                    trns_free_func *free, void *aux)
696 {
697   trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
698 }
699
700 /* Returns the index of the next transformation.
701    This value can be returned by a transformation procedure
702    function to indicate a "jump" to that transformation. */
703 size_t
704 next_transformation (void) 
705 {
706   return trns_chain_next (cur_trns_chain);
707 }
708
709 /* Returns true if the next call to add_transformation() will add
710    a temporary transformation, false if it will add a permanent
711    transformation. */
712 bool
713 proc_in_temporary_transformations (void) 
714 {
715   return temporary_trns_chain != NULL;
716 }
717
718 /* Marks the start of temporary transformations.
719    Further calls to add_transformation() will add temporary
720    transformations. */
721 void
722 proc_start_temporary_transformations (void) 
723 {
724   if (!proc_in_temporary_transformations ())
725     {
726       add_case_limit_trns ();
727
728       permanent_dict = dict_clone (default_dict);
729       trns_chain_finalize (permanent_trns_chain);
730       temporary_trns_chain = cur_trns_chain = trns_chain_create ();
731     }
732 }
733
734 /* Converts all the temporary transformations, if any, to
735    permanent transformations.  Further transformations will be
736    permanent.
737    Returns true if anything changed, false otherwise. */
738 bool
739 proc_make_temporary_transformations_permanent (void) 
740 {
741   if (proc_in_temporary_transformations ()) 
742     {
743       trns_chain_finalize (temporary_trns_chain);
744       trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
745       temporary_trns_chain = NULL;
746
747       dict_destroy (permanent_dict);
748       permanent_dict = NULL;
749
750       return true;
751     }
752   else
753     return false;
754 }
755
756 /* Cancels all temporary transformations, if any.  Further
757    transformations will be permanent.
758    Returns true if anything changed, false otherwise. */
759 bool
760 proc_cancel_temporary_transformations (void) 
761 {
762   if (proc_in_temporary_transformations ()) 
763     {
764       dict_destroy (default_dict);
765       default_dict = permanent_dict;
766       permanent_dict = NULL;
767
768       trns_chain_destroy (temporary_trns_chain);
769       temporary_trns_chain = NULL;
770
771       return true;
772     }
773   else
774     return false;
775 }
776
777 /* Cancels all transformations, if any.
778    Returns true if successful, false on I/O error. */
779 bool
780 proc_cancel_all_transformations (void)
781 {
782   bool ok;
783   ok = trns_chain_destroy (permanent_trns_chain);
784   ok = trns_chain_destroy (temporary_trns_chain) && ok;
785   permanent_trns_chain = cur_trns_chain = trns_chain_create ();
786   temporary_trns_chain = NULL;
787   return ok;
788 }
789 \f
790 /* Initializes procedure handling. */
791 void
792 proc_init (void) 
793 {
794   default_dict = dict_create ();
795   proc_cancel_all_transformations ();
796 }
797
798 /* Finishes up procedure handling. */
799 void
800 proc_done (void)
801 {
802   discard_variables ();
803   dict_destroy (default_dict);
804 }
805
806 /* Sets SINK as the destination for procedure output from the
807    next procedure. */
808 void
809 proc_set_sink (struct case_sink *sink) 
810 {
811   assert (proc_sink == NULL);
812   proc_sink = sink;
813 }
814
815 /* Sets SOURCE as the source for procedure input for the next
816    procedure. */
817 void
818 proc_set_source (struct case_source *source) 
819 {
820   assert (proc_source == NULL);
821   proc_source = source;
822 }
823
824 /* Returns true if a source for the next procedure has been
825    configured, false otherwise. */
826 bool
827 proc_has_source (void) 
828 {
829   return proc_source != NULL;
830 }
831
832 /* Returns the output from the previous procedure.
833    For use only immediately after executing a procedure.
834    The returned casefile is owned by the caller; it will not be
835    automatically used for the next procedure's input. */
836 struct casefile *
837 proc_capture_output (void) 
838 {
839   struct casefile *casefile;
840
841   /* Try to make sure that this function is called immediately
842      after procedure() or a similar function. */
843   assert (proc_source != NULL);
844   assert (case_source_is_class (proc_source, &storage_source_class));
845   assert (trns_chain_is_empty (permanent_trns_chain));
846   assert (!proc_in_temporary_transformations ());
847
848   casefile = storage_source_decapsulate (proc_source);
849   proc_source = NULL;
850
851   return casefile;
852 }
853 \f
854 static trns_proc_func case_limit_trns_proc;
855 static trns_free_func case_limit_trns_free;
856
857 /* Adds a transformation that limits the number of cases that may
858    pass through, if default_dict has a case limit. */
859 static void
860 add_case_limit_trns (void) 
861 {
862   size_t case_limit = dict_get_case_limit (default_dict);
863   if (case_limit != 0)
864     {
865       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
866       *cases_remaining = case_limit;
867       add_transformation (case_limit_trns_proc, case_limit_trns_free,
868                           cases_remaining);
869       dict_set_case_limit (default_dict, 0);
870     }
871 }
872
873 /* Limits the maximum number of cases processed to
874    *CASES_REMAINING. */
875 static int
876 case_limit_trns_proc (void *cases_remaining_,
877                       struct ccase *c UNUSED, int case_nr UNUSED) 
878 {
879   size_t *cases_remaining = cases_remaining_;
880   if (*cases_remaining > 0) 
881     {
882       *cases_remaining--;
883       return TRNS_CONTINUE;
884     }
885   else
886     return TRNS_DROP_CASE;
887 }
888
889 /* Frees the data associated with a case limit transformation. */
890 static bool
891 case_limit_trns_free (void *cases_remaining_) 
892 {
893   size_t *cases_remaining = cases_remaining_;
894   free (cases_remaining);
895   return true;
896 }
897 \f
898 static trns_proc_func filter_trns_proc;
899
900 /* Adds a temporary transformation to filter data according to
901    the variable specified on FILTER, if any. */
902 static void
903 add_filter_trns (void) 
904 {
905   struct variable *filter_var = dict_get_filter (default_dict);
906   if (filter_var != NULL) 
907     {
908       proc_start_temporary_transformations ();
909       add_transformation (filter_trns_proc, NULL, filter_var);
910     }
911 }
912
913 /* FILTER transformation. */
914 static int
915 filter_trns_proc (void *filter_var_,
916                   struct ccase *c UNUSED, int case_nr UNUSED) 
917   
918 {
919   struct variable *filter_var = filter_var_;
920   double f = case_num (c, filter_var->fv);
921   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
922           ? TRNS_CONTINUE : TRNS_DROP_CASE);
923 }
924