a2ca8b23ee85f8b51285a6c8f61c422911f06638
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40
41 /* Procedure execution data. */
42 struct write_case_data
43   {
44     /* Function to call for each case. */
45     bool (*case_func) (const struct ccase *, void *);
46     void *aux;
47
48     struct ccase trns_case;     /* Case used for transformations. */
49     struct ccase sink_case;     /* Case written to sink, if
50                                    compacting is necessary. */
51     size_t cases_written;       /* Cases output so far. */
52   };
53
54 /* Cases are read from proc_source,
55    pass through permanent_trns_chain (which transforms them into
56    the format described by permanent_dict),
57    are written to proc_sink,
58    pass through temporary_trns_chain (which transforms them into
59    the format described by default_dict),
60    and are finally passed to the procedure. */
61 static struct case_source *proc_source;
62 static struct trns_chain *permanent_trns_chain;
63 static struct dictionary *permanent_dict;
64 static struct case_sink *proc_sink;
65 static struct trns_chain *temporary_trns_chain;
66 struct dictionary *default_dict;
67
68 /* The transformation chain that the next transformation will be
69    added to. */
70 static struct trns_chain *cur_trns_chain;
71
72 /* The compactor used to compact a case, if necessary;
73    otherwise a null pointer. */
74 static struct dict_compactor *compactor;
75
76 /* Time at which proc was last invoked. */
77 static time_t last_proc_invocation;
78
79 /* Lag queue. */
80 int n_lag;                      /* Number of cases to lag. */
81 static int lag_count;           /* Number of cases in lag_queue so far. */
82 static int lag_head;            /* Index where next case will be added. */
83 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
84
85 static void add_case_limit_trns (void);
86 static void add_filter_trns (void);
87
88 static bool internal_procedure (bool (*case_func) (const struct ccase *,
89                                                    void *),
90                                 bool (*end_func) (void *),
91                                 void *aux);
92 static void update_last_proc_invocation (void);
93 static void create_trns_case (struct ccase *, struct dictionary *);
94 static void open_active_file (void);
95 static bool write_case (struct write_case_data *wc_data);
96 static void lag_case (const struct ccase *c);
97 static void clear_case (struct ccase *c);
98 static bool close_active_file (void);
99 \f
100 /* Public functions. */
101
102 /* Returns the last time the data was read. */
103 time_t
104 time_of_last_procedure (void) 
105 {
106   if (last_proc_invocation == 0)
107     update_last_proc_invocation ();
108   return last_proc_invocation;
109 }
110 \f
111 /* Regular procedure. */
112
113 /* Reads the data from the input program and writes it to a new
114    active file.  For each case we read from the input program, we
115    do the following:
116
117    1. Execute permanent transformations.  If these drop the case,
118       start the next case from step 1.
119
120    2. Write case to replacement active file.
121    
122    3. Execute temporary transformations.  If these drop the case,
123       start the next case from step 1.
124       
125    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
126
127    Returns true if successful, false if an I/O error occurred. */
128 bool
129 procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
130 {
131   return internal_procedure (proc_func, NULL, aux);
132 }
133 \f
134 /* Multipass procedure. */
135
136 struct multipass_aux_data 
137   {
138     struct casefile *casefile;
139     
140     bool (*proc_func) (const struct casefile *, void *aux);
141     void *aux;
142   };
143
144 /* Case processing function for multipass_procedure(). */
145 static bool
146 multipass_case_func (const struct ccase *c, void *aux_data_) 
147 {
148   struct multipass_aux_data *aux_data = aux_data_;
149   return casefile_append (aux_data->casefile, c);
150 }
151
152 /* End-of-file function for multipass_procedure(). */
153 static bool
154 multipass_end_func (void *aux_data_) 
155 {
156   struct multipass_aux_data *aux_data = aux_data_;
157   return (aux_data->proc_func == NULL
158           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
159 }
160
161 /* Procedure that allows multiple passes over the input data.
162    The entire active file is passed to PROC_FUNC, with the given
163    AUX as auxiliary data, as a unit. */
164 bool
165 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
166                      void *aux) 
167 {
168   struct multipass_aux_data aux_data;
169   bool ok;
170
171   aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
172   aux_data.proc_func = proc_func;
173   aux_data.aux = aux;
174
175   ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
176   ok = !casefile_error (aux_data.casefile) && ok;
177
178   casefile_destroy (aux_data.casefile);
179
180   return ok;
181 }
182 \f
183 /* Procedure implementation. */
184
185 /* Executes a procedure.
186    Passes each case to CASE_FUNC.
187    Calls END_FUNC after the last case.
188    Returns true if successful, false if an I/O error occurred (or
189    if CASE_FUNC or END_FUNC ever returned false). */
190 static bool
191 internal_procedure (bool (*case_func) (const struct ccase *, void *),
192                     bool (*end_func) (void *),
193                     void *aux) 
194 {
195   struct write_case_data wc_data;
196   bool ok = true;
197
198   assert (proc_source != NULL);
199
200   update_last_proc_invocation ();
201
202   /* Optimize the trivial case where we're not going to do
203      anything with the data, by not reading the data at all. */
204   if (case_func == NULL && end_func == NULL
205       && case_source_is_class (proc_source, &storage_source_class)
206       && proc_sink == NULL
207       && (temporary_trns_chain == NULL
208           || trns_chain_is_empty (temporary_trns_chain))
209       && trns_chain_is_empty (permanent_trns_chain))
210     {
211       n_lag = 0;
212       dict_set_case_limit (default_dict, 0);
213       dict_clear_vectors (default_dict);
214       return true;
215     }
216   
217   open_active_file ();
218   
219   wc_data.case_func = case_func;
220   wc_data.aux = aux;
221   create_trns_case (&wc_data.trns_case, default_dict);
222   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
223   wc_data.cases_written = 0;
224
225   ok = proc_source->class->read (proc_source,
226                                  &wc_data.trns_case,
227                                  write_case, &wc_data) && ok;
228   if (end_func != NULL)
229     ok = end_func (aux) && ok;
230
231   case_destroy (&wc_data.sink_case);
232   case_destroy (&wc_data.trns_case);
233
234   ok = close_active_file () && ok;
235
236   return ok;
237 }
238
239 /* Updates last_proc_invocation. */
240 static void
241 update_last_proc_invocation (void) 
242 {
243   last_proc_invocation = time (NULL);
244 }
245
246 /* Creates and returns a case, initializing it from the vectors
247    that say which `value's need to be initialized just once, and
248    which ones need to be re-initialized before every case. */
249 static void
250 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
251 {
252   size_t var_cnt = dict_get_var_cnt (dict);
253   size_t i;
254
255   case_create (trns_case, dict_get_next_value_idx (dict));
256   for (i = 0; i < var_cnt; i++) 
257     {
258       struct variable *v = dict_get_var (dict, i);
259       union value *value = case_data_rw (trns_case, v->fv);
260
261       if (v->type == NUMERIC)
262         value->f = v->leave ? 0.0 : SYSMIS;
263       else
264         memset (value->s, ' ', v->width);
265     }
266 }
267
268 /* Makes all preparations for reading from the data source and writing
269    to the data sink. */
270 static void
271 open_active_file (void)
272 {
273   add_case_limit_trns ();
274   add_filter_trns ();
275
276   /* Finalize transformations. */
277   trns_chain_finalize (cur_trns_chain);
278
279   /* Make permanent_dict refer to the dictionary right before
280      data reaches the sink. */
281   if (permanent_dict == NULL)
282     permanent_dict = default_dict;
283
284   /* Figure out whether to compact. */
285   compactor = (dict_compacting_would_shrink (permanent_dict)
286                ? dict_make_compactor (permanent_dict)
287                : NULL);
288
289   /* Prepare sink. */
290   if (proc_sink == NULL)
291     proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
292   if (proc_sink->class->open != NULL)
293     proc_sink->class->open (proc_sink);
294
295   /* Allocate memory for lag queue. */
296   if (n_lag > 0)
297     {
298       int i;
299   
300       lag_count = 0;
301       lag_head = 0;
302       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
303       for (i = 0; i < n_lag; i++)
304         case_nullify (&lag_queue[i]);
305     }
306 }
307
308 /* Transforms trns_case and writes it to the replacement active
309    file if advisable.  Returns true if more cases can be
310    accepted, false otherwise.  Do not call this function again
311    after it has returned false once.  */
312 static bool
313 write_case (struct write_case_data *wc_data)
314 {
315   enum trns_result retval;
316   size_t case_nr;
317   
318   /* Execute permanent transformations.  */
319   case_nr = wc_data->cases_written + 1;
320   retval = trns_chain_execute (permanent_trns_chain,
321                                &wc_data->trns_case, &case_nr);
322   if (retval != TRNS_CONTINUE)
323     goto done;
324
325   /* Write case to LAG queue. */
326   if (n_lag)
327     lag_case (&wc_data->trns_case);
328
329   /* Write case to replacement active file. */
330   wc_data->cases_written++;
331   if (proc_sink->class->write != NULL) 
332     {
333       if (compactor != NULL) 
334         {
335           dict_compactor_compact (compactor, &wc_data->sink_case,
336                                   &wc_data->trns_case);
337           proc_sink->class->write (proc_sink, &wc_data->sink_case);
338         }
339       else
340         proc_sink->class->write (proc_sink, &wc_data->trns_case);
341     }
342   
343   /* Execute temporary transformations. */
344   if (temporary_trns_chain != NULL) 
345     {
346       retval = trns_chain_execute (temporary_trns_chain,
347                                    &wc_data->trns_case,
348                                    &wc_data->cases_written);
349       if (retval != TRNS_CONTINUE)
350         goto done;
351     }
352
353   /* Pass case to procedure. */
354   if (wc_data->case_func != NULL)
355     if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
356       retval = TRNS_ERROR;
357
358  done:
359   clear_case (&wc_data->trns_case);
360   return retval != TRNS_ERROR;
361 }
362
363 /* Add C to the lag queue. */
364 static void
365 lag_case (const struct ccase *c)
366 {
367   if (lag_count < n_lag)
368     lag_count++;
369   case_destroy (&lag_queue[lag_head]);
370   case_clone (&lag_queue[lag_head], c);
371   if (++lag_head >= n_lag)
372     lag_head = 0;
373 }
374
375 /* Clears the variables in C that need to be cleared between
376    processing cases.  */
377 static void
378 clear_case (struct ccase *c)
379 {
380   size_t var_cnt = dict_get_var_cnt (default_dict);
381   size_t i;
382   
383   for (i = 0; i < var_cnt; i++) 
384     {
385       struct variable *v = dict_get_var (default_dict, i);
386       if (!v->leave) 
387         {
388           if (v->type == NUMERIC)
389             case_data_rw (c, v->fv)->f = SYSMIS;
390           else
391             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
392         } 
393     }
394 }
395
396 /* Closes the active file. */
397 static bool
398 close_active_file (void)
399 {
400   /* Free memory for lag queue, and turn off lagging. */
401   if (n_lag > 0)
402     {
403       int i;
404       
405       for (i = 0; i < n_lag; i++)
406         case_destroy (&lag_queue[i]);
407       free (lag_queue);
408       n_lag = 0;
409     }
410   
411   /* Dictionary from before TEMPORARY becomes permanent. */
412   proc_cancel_temporary_transformations ();
413
414   /* Finish compacting. */
415   if (compactor != NULL) 
416     {
417       dict_compactor_destroy (compactor);
418       dict_compact_values (default_dict);
419       compactor = NULL;
420     }
421     
422   /* Free data source. */
423   free_case_source (proc_source);
424   proc_source = NULL;
425
426   /* Old data sink becomes new data source. */
427   if (proc_sink->class->make_source != NULL)
428     proc_source = proc_sink->class->make_source (proc_sink);
429   free_case_sink (proc_sink);
430   proc_sink = NULL;
431
432   dict_clear_vectors (default_dict);
433   permanent_dict = NULL;
434   return proc_cancel_all_transformations ();
435 }
436 \f
437 /* Returns a pointer to the lagged case from N_BEFORE cases before the
438    current one, or NULL if there haven't been that many cases yet. */
439 struct ccase *
440 lagged_case (int n_before)
441 {
442   assert (n_before >= 1 );
443   assert (n_before <= n_lag);
444
445   if (n_before <= lag_count)
446     {
447       int index = lag_head - n_before;
448       if (index < 0)
449         index += n_lag;
450       return &lag_queue[index];
451     }
452   else
453     return NULL;
454 }
455 \f
456 /* Procedure that separates the data into SPLIT FILE groups. */
457
458 /* Represents auxiliary data for handling SPLIT FILE. */
459 struct split_aux_data 
460   {
461     struct ccase prev_case;     /* Data in previous case. */
462
463     /* Callback functions. */
464     void (*begin_func) (const struct ccase *, void *);
465     bool (*proc_func) (const struct ccase *, void *);
466     void (*end_func) (void *);
467     void *func_aux;
468   };
469
470 static int equal_splits (const struct ccase *, const struct ccase *);
471 static bool split_procedure_case_func (const struct ccase *c, void *);
472 static bool split_procedure_end_func (void *);
473
474 /* Like procedure(), but it automatically breaks the case stream
475    into SPLIT FILE break groups.  Before each group of cases with
476    identical SPLIT FILE variable values, BEGIN_FUNC is called
477    with the first case in the group.
478    Then PROC_FUNC is called for each case in the group (including
479    the first).
480    END_FUNC is called when the group is finished.  FUNC_AUX is
481    passed to each of the functions as auxiliary data.
482
483    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
484    and END_FUNC will be called at all. 
485
486    If SPLIT FILE is not in effect, then there is one break group
487    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
488    will be called once.
489    
490    Returns true if successful, false if an I/O error occurred. */
491 bool
492 procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
493                        bool (*proc_func) (const struct ccase *, void *aux),
494                        void (*end_func) (void *aux),
495                        void *func_aux) 
496 {
497   struct split_aux_data split_aux;
498   bool ok;
499
500   case_nullify (&split_aux.prev_case);
501   split_aux.begin_func = begin_func;
502   split_aux.proc_func = proc_func;
503   split_aux.end_func = end_func;
504   split_aux.func_aux = func_aux;
505
506   ok = internal_procedure (split_procedure_case_func,
507                            split_procedure_end_func, &split_aux);
508
509   case_destroy (&split_aux.prev_case);
510
511   return ok;
512 }
513
514 /* Case callback used by procedure_with_splits(). */
515 static bool
516 split_procedure_case_func (const struct ccase *c, void *split_aux_) 
517 {
518   struct split_aux_data *split_aux = split_aux_;
519
520   /* Start a new series if needed. */
521   if (case_is_null (&split_aux->prev_case)
522       || !equal_splits (c, &split_aux->prev_case))
523     {
524       if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
525         split_aux->end_func (split_aux->func_aux);
526
527       case_destroy (&split_aux->prev_case);
528       case_clone (&split_aux->prev_case, c);
529
530       if (split_aux->begin_func != NULL)
531         split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
532     }
533
534   return (split_aux->proc_func == NULL
535           || split_aux->proc_func (c, split_aux->func_aux));
536 }
537
538 /* End-of-file callback used by procedure_with_splits(). */
539 static bool
540 split_procedure_end_func (void *split_aux_) 
541 {
542   struct split_aux_data *split_aux = split_aux_;
543
544   if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
545     split_aux->end_func (split_aux->func_aux);
546   return true;
547 }
548
549 /* Compares the SPLIT FILE variables in cases A and B and returns
550    nonzero only if they differ. */
551 static int
552 equal_splits (const struct ccase *a, const struct ccase *b) 
553 {
554   return case_compare (a, b,
555                        dict_get_split_vars (default_dict),
556                        dict_get_split_cnt (default_dict)) == 0;
557 }
558 \f
559 /* Multipass procedure that separates the data into SPLIT FILE
560    groups. */
561
562 /* Represents auxiliary data for handling SPLIT FILE in a
563    multipass procedure. */
564 struct multipass_split_aux_data 
565   {
566     struct ccase prev_case;     /* Data in previous case. */
567     struct casefile *casefile;  /* Accumulates data for a split. */
568
569     /* Function to call with the accumulated data. */
570     bool (*split_func) (const struct ccase *first, const struct casefile *,
571                         void *);
572     void *func_aux;                            /* Auxiliary data. */ 
573   };
574
575 static bool multipass_split_case_func (const struct ccase *c, void *aux_);
576 static bool multipass_split_end_func (void *aux_);
577 static bool multipass_split_output (struct multipass_split_aux_data *);
578
579 /* Returns true if successful, false if an I/O error occurred. */
580 bool
581 multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
582                                                      const struct casefile *,
583                                                      void *aux),
584                                  void *func_aux)
585 {
586   struct multipass_split_aux_data aux;
587   bool ok;
588
589   case_nullify (&aux.prev_case);
590   aux.casefile = NULL;
591   aux.split_func = split_func;
592   aux.func_aux = func_aux;
593
594   ok = internal_procedure (multipass_split_case_func,
595                            multipass_split_end_func, &aux);
596   case_destroy (&aux.prev_case);
597
598   return ok;
599 }
600
601 /* Case callback used by multipass_procedure_with_splits(). */
602 static bool
603 multipass_split_case_func (const struct ccase *c, void *aux_)
604 {
605   struct multipass_split_aux_data *aux = aux_;
606   bool ok = true;
607
608   /* Start a new series if needed. */
609   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
610     {
611       /* Record split values. */
612       case_destroy (&aux->prev_case);
613       case_clone (&aux->prev_case, c);
614
615       /* Pass any cases to split_func. */
616       if (aux->casefile != NULL)
617         ok = multipass_split_output (aux);
618
619       /* Start a new casefile. */
620       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
621     }
622
623   return casefile_append (aux->casefile, c) && ok;
624 }
625
626 /* End-of-file callback used by multipass_procedure_with_splits(). */
627 static bool
628 multipass_split_end_func (void *aux_)
629 {
630   struct multipass_split_aux_data *aux = aux_;
631   return (aux->casefile == NULL || multipass_split_output (aux));
632 }
633
634 static bool
635 multipass_split_output (struct multipass_split_aux_data *aux)
636 {
637   bool ok;
638   
639   assert (aux->casefile != NULL);
640   ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
641   casefile_destroy (aux->casefile);
642   aux->casefile = NULL;
643
644   return ok;
645 }
646 \f
647 /* Discards all the current state in preparation for a data-input
648    command like DATA LIST or GET. */
649 void
650 discard_variables (void)
651 {
652   dict_clear (default_dict);
653   fh_set_default_handle (NULL);
654
655   n_lag = 0;
656   
657   free_case_source (proc_source);
658   proc_source = NULL;
659
660   proc_cancel_all_transformations ();
661 }
662 \f
663 /* Returns the current set of permanent transformations,
664    and clears the permanent transformations.
665    For use by INPUT PROGRAM. */
666 struct trns_chain *
667 proc_capture_transformations (void) 
668 {
669   struct trns_chain *chain;
670   
671   assert (temporary_trns_chain == NULL);
672   chain = permanent_trns_chain;
673   cur_trns_chain = permanent_trns_chain = trns_chain_create ();
674   return chain;
675 }
676
677 /* Adds a transformation that processes a case with PROC and
678    frees itself with FREE to the current set of transformations.
679    The functions are passed AUX as auxiliary data. */
680 void
681 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
682 {
683   trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
684 }
685
686 /* Adds a transformation that processes a case with PROC and
687    frees itself with FREE to the current set of transformations.
688    When parsing of the block of transformations is complete,
689    FINALIZE will be called.
690    The functions are passed AUX as auxiliary data. */
691 void
692 add_transformation_with_finalizer (trns_finalize_func *finalize,
693                                    trns_proc_func *proc,
694                                    trns_free_func *free, void *aux)
695 {
696   trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
697 }
698
699 /* Returns the index of the next transformation.
700    This value can be returned by a transformation procedure
701    function to indicate a "jump" to that transformation. */
702 size_t
703 next_transformation (void) 
704 {
705   return trns_chain_next (cur_trns_chain);
706 }
707
708 /* Returns true if the next call to add_transformation() will add
709    a temporary transformation, false if it will add a permanent
710    transformation. */
711 bool
712 proc_in_temporary_transformations (void) 
713 {
714   return temporary_trns_chain != NULL;
715 }
716
717 /* Marks the start of temporary transformations.
718    Further calls to add_transformation() will add temporary
719    transformations. */
720 void
721 proc_start_temporary_transformations (void) 
722 {
723   if (!proc_in_temporary_transformations ())
724     {
725       add_case_limit_trns ();
726
727       permanent_dict = dict_clone (default_dict);
728       trns_chain_finalize (permanent_trns_chain);
729       temporary_trns_chain = cur_trns_chain = trns_chain_create ();
730     }
731 }
732
733 /* Converts all the temporary transformations, if any, to
734    permanent transformations.  Further transformations will be
735    permanent.
736    Returns true if anything changed, false otherwise. */
737 bool
738 proc_make_temporary_transformations_permanent (void) 
739 {
740   if (proc_in_temporary_transformations ()) 
741     {
742       trns_chain_finalize (temporary_trns_chain);
743       trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
744       temporary_trns_chain = NULL;
745
746       dict_destroy (permanent_dict);
747       permanent_dict = NULL;
748
749       return true;
750     }
751   else
752     return false;
753 }
754
755 /* Cancels all temporary transformations, if any.  Further
756    transformations will be permanent.
757    Returns true if anything changed, false otherwise. */
758 bool
759 proc_cancel_temporary_transformations (void) 
760 {
761   if (proc_in_temporary_transformations ()) 
762     {
763       dict_destroy (default_dict);
764       default_dict = permanent_dict;
765       permanent_dict = NULL;
766
767       trns_chain_destroy (temporary_trns_chain);
768       temporary_trns_chain = NULL;
769
770       return true;
771     }
772   else
773     return false;
774 }
775
776 /* Cancels all transformations, if any.
777    Returns true if successful, false on I/O error. */
778 bool
779 proc_cancel_all_transformations (void)
780 {
781   bool ok;
782   ok = trns_chain_destroy (permanent_trns_chain);
783   ok = trns_chain_destroy (temporary_trns_chain) && ok;
784   permanent_trns_chain = cur_trns_chain = trns_chain_create ();
785   temporary_trns_chain = NULL;
786   return ok;
787 }
788 \f
789 /* Initializes procedure handling. */
790 void
791 proc_init (void) 
792 {
793   default_dict = dict_create ();
794   proc_cancel_all_transformations ();
795 }
796
797 /* Finishes up procedure handling. */
798 void
799 proc_done (void)
800 {
801   discard_variables ();
802   dict_destroy (default_dict);
803 }
804
805 /* Sets SINK as the destination for procedure output from the
806    next procedure. */
807 void
808 proc_set_sink (struct case_sink *sink) 
809 {
810   assert (proc_sink == NULL);
811   proc_sink = sink;
812 }
813
814 /* Sets SOURCE as the source for procedure input for the next
815    procedure. */
816 void
817 proc_set_source (struct case_source *source) 
818 {
819   assert (proc_source == NULL);
820   proc_source = source;
821 }
822
823 /* Returns true if a source for the next procedure has been
824    configured, false otherwise. */
825 bool
826 proc_has_source (void) 
827 {
828   return proc_source != NULL;
829 }
830
831 /* Returns the output from the previous procedure.
832    For use only immediately after executing a procedure.
833    The returned casefile is owned by the caller; it will not be
834    automatically used for the next procedure's input. */
835 struct casefile *
836 proc_capture_output (void) 
837 {
838   struct casefile *casefile;
839
840   /* Try to make sure that this function is called immediately
841      after procedure() or a similar function. */
842   assert (proc_source != NULL);
843   assert (case_source_is_class (proc_source, &storage_source_class));
844   assert (trns_chain_is_empty (permanent_trns_chain));
845   assert (!proc_in_temporary_transformations ());
846
847   casefile = storage_source_decapsulate (proc_source);
848   proc_source = NULL;
849
850   return casefile;
851 }
852 \f
853 static trns_proc_func case_limit_trns_proc;
854 static trns_free_func case_limit_trns_free;
855
856 /* Adds a transformation that limits the number of cases that may
857    pass through, if default_dict has a case limit. */
858 static void
859 add_case_limit_trns (void) 
860 {
861   size_t case_limit = dict_get_case_limit (default_dict);
862   if (case_limit != 0)
863     {
864       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
865       *cases_remaining = case_limit;
866       add_transformation (case_limit_trns_proc, case_limit_trns_free,
867                           cases_remaining);
868       dict_set_case_limit (default_dict, 0);
869     }
870 }
871
872 /* Limits the maximum number of cases processed to
873    *CASES_REMAINING. */
874 static int
875 case_limit_trns_proc (void *cases_remaining_,
876                       struct ccase *c UNUSED, int case_nr UNUSED) 
877 {
878   size_t *cases_remaining = cases_remaining_;
879   if (*cases_remaining > 0) 
880     {
881       *cases_remaining--;
882       return TRNS_CONTINUE;
883     }
884   else
885     return TRNS_DROP_CASE;
886 }
887
888 /* Frees the data associated with a case limit transformation. */
889 static bool
890 case_limit_trns_free (void *cases_remaining_) 
891 {
892   size_t *cases_remaining = cases_remaining_;
893   free (cases_remaining);
894   return true;
895 }
896 \f
897 static trns_proc_func filter_trns_proc;
898
899 /* Adds a temporary transformation to filter data according to
900    the variable specified on FILTER, if any. */
901 static void
902 add_filter_trns (void) 
903 {
904   struct variable *filter_var = dict_get_filter (default_dict);
905   if (filter_var != NULL) 
906     {
907       proc_start_temporary_transformations ();
908       add_transformation (filter_trns_proc, NULL, filter_var);
909     }
910 }
911
912 /* FILTER transformation. */
913 static int
914 filter_trns_proc (void *filter_var_,
915                   struct ccase *c UNUSED, int case_nr UNUSED) 
916   
917 {
918   struct variable *filter_var = filter_var_;
919   double f = case_num (c, filter_var->fv);
920   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
921           ? TRNS_CONTINUE : TRNS_DROP_CASE);
922 }
923