Make dictionary compacting functions a little more general.
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40
41 /* Procedure execution data. */
42 struct write_case_data
43   {
44     /* Function to call for each case. */
45     bool (*case_func) (const struct ccase *, void *);
46     void *aux;
47
48     struct ccase trns_case;     /* Case used for transformations. */
49     struct ccase sink_case;     /* Case written to sink, if
50                                    compacting is necessary. */
51     size_t cases_written;       /* Cases output so far. */
52   };
53
54 /* Cases are read from proc_source,
55    pass through permanent_trns_chain (which transforms them into
56    the format described by permanent_dict),
57    are written to proc_sink,
58    pass through temporary_trns_chain (which transforms them into
59    the format described by default_dict),
60    and are finally passed to the procedure. */
61 static struct case_source *proc_source;
62 static struct trns_chain *permanent_trns_chain;
63 static struct dictionary *permanent_dict;
64 static struct case_sink *proc_sink;
65 static struct trns_chain *temporary_trns_chain;
66 struct dictionary *default_dict;
67
68 /* The transformation chain that the next transformation will be
69    added to. */
70 static struct trns_chain *cur_trns_chain;
71
72 /* The compactor used to compact a case, if necessary;
73    otherwise a null pointer. */
74 static struct dict_compactor *compactor;
75
76 /* Time at which proc was last invoked. */
77 static time_t last_proc_invocation;
78
79 /* Lag queue. */
80 int n_lag;                      /* Number of cases to lag. */
81 static int lag_count;           /* Number of cases in lag_queue so far. */
82 static int lag_head;            /* Index where next case will be added. */
83 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
84
85 static void add_case_limit_trns (void);
86 static void add_filter_trns (void);
87
88 static bool internal_procedure (bool (*case_func) (const struct ccase *,
89                                                    void *),
90                                 bool (*end_func) (void *),
91                                 void *aux);
92 static void update_last_proc_invocation (void);
93 static void create_trns_case (struct ccase *, struct dictionary *);
94 static void open_active_file (void);
95 static bool write_case (struct write_case_data *wc_data);
96 static void lag_case (const struct ccase *c);
97 static void clear_case (struct ccase *c);
98 static bool close_active_file (void);
99 \f
100 /* Public functions. */
101
102 /* Returns the last time the data was read. */
103 time_t
104 time_of_last_procedure (void) 
105 {
106   if (last_proc_invocation == 0)
107     update_last_proc_invocation ();
108   return last_proc_invocation;
109 }
110 \f
111 /* Regular procedure. */
112
113 /* Reads the data from the input program and writes it to a new
114    active file.  For each case we read from the input program, we
115    do the following:
116
117    1. Execute permanent transformations.  If these drop the case,
118       start the next case from step 1.
119
120    2. Write case to replacement active file.
121    
122    3. Execute temporary transformations.  If these drop the case,
123       start the next case from step 1.
124       
125    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
126
127    Returns true if successful, false if an I/O error occurred. */
128 bool
129 procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
130 {
131   return internal_procedure (proc_func, NULL, aux);
132 }
133 \f
134 /* Multipass procedure. */
135
136 struct multipass_aux_data 
137   {
138     struct casefile *casefile;
139     
140     bool (*proc_func) (const struct casefile *, void *aux);
141     void *aux;
142   };
143
144 /* Case processing function for multipass_procedure(). */
145 static bool
146 multipass_case_func (const struct ccase *c, void *aux_data_) 
147 {
148   struct multipass_aux_data *aux_data = aux_data_;
149   return casefile_append (aux_data->casefile, c);
150 }
151
152 /* End-of-file function for multipass_procedure(). */
153 static bool
154 multipass_end_func (void *aux_data_) 
155 {
156   struct multipass_aux_data *aux_data = aux_data_;
157   return (aux_data->proc_func == NULL
158           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
159 }
160
161 /* Procedure that allows multiple passes over the input data.
162    The entire active file is passed to PROC_FUNC, with the given
163    AUX as auxiliary data, as a unit. */
164 bool
165 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
166                      void *aux) 
167 {
168   struct multipass_aux_data aux_data;
169   bool ok;
170
171   aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
172   aux_data.proc_func = proc_func;
173   aux_data.aux = aux;
174
175   ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
176   ok = !casefile_error (aux_data.casefile) && ok;
177
178   casefile_destroy (aux_data.casefile);
179
180   return ok;
181 }
182 \f
183 /* Procedure implementation. */
184
185 /* Executes a procedure.
186    Passes each case to CASE_FUNC.
187    Calls END_FUNC after the last case.
188    Returns true if successful, false if an I/O error occurred (or
189    if CASE_FUNC or END_FUNC ever returned false). */
190 static bool
191 internal_procedure (bool (*case_func) (const struct ccase *, void *),
192                     bool (*end_func) (void *),
193                     void *aux) 
194 {
195   struct write_case_data wc_data;
196   bool ok = true;
197
198   assert (proc_source != NULL);
199
200   update_last_proc_invocation ();
201
202   /* Optimize the trivial case where we're not going to do
203      anything with the data, by not reading the data at all. */
204   if (case_func == NULL && end_func == NULL
205       && case_source_is_class (proc_source, &storage_source_class)
206       && proc_sink == NULL
207       && (temporary_trns_chain == NULL
208           || trns_chain_is_empty (temporary_trns_chain))
209       && trns_chain_is_empty (permanent_trns_chain))
210     {
211       n_lag = 0;
212       dict_set_case_limit (default_dict, 0);
213       dict_clear_vectors (default_dict);
214       return true;
215     }
216   
217   open_active_file ();
218   
219   wc_data.case_func = case_func;
220   wc_data.aux = aux;
221   create_trns_case (&wc_data.trns_case, default_dict);
222   case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
223   wc_data.cases_written = 0;
224
225   ok = proc_source->class->read (proc_source,
226                                  &wc_data.trns_case,
227                                  write_case, &wc_data) && ok;
228   if (end_func != NULL)
229     ok = end_func (aux) && ok;
230
231   case_destroy (&wc_data.sink_case);
232   case_destroy (&wc_data.trns_case);
233
234   ok = close_active_file () && ok;
235
236   return ok;
237 }
238
239 /* Updates last_proc_invocation. */
240 static void
241 update_last_proc_invocation (void) 
242 {
243   last_proc_invocation = time (NULL);
244 }
245
246 /* Creates and returns a case, initializing it from the vectors
247    that say which `value's need to be initialized just once, and
248    which ones need to be re-initialized before every case. */
249 static void
250 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
251 {
252   size_t var_cnt = dict_get_var_cnt (dict);
253   size_t i;
254
255   case_create (trns_case, dict_get_next_value_idx (dict));
256   for (i = 0; i < var_cnt; i++) 
257     {
258       struct variable *v = dict_get_var (dict, i);
259       union value *value = case_data_rw (trns_case, v->fv);
260
261       if (v->type == NUMERIC)
262         value->f = v->leave ? 0.0 : SYSMIS;
263       else
264         memset (value->s, ' ', v->width);
265     }
266 }
267
268 /* Makes all preparations for reading from the data source and writing
269    to the data sink. */
270 static void
271 open_active_file (void)
272 {
273   add_case_limit_trns ();
274   add_filter_trns ();
275
276   /* Finalize transformations. */
277   trns_chain_finalize (cur_trns_chain);
278
279   /* Make permanent_dict refer to the dictionary right before
280      data reaches the sink. */
281   if (permanent_dict == NULL)
282     permanent_dict = default_dict;
283
284   /* Figure out whether to compact. */
285   compactor = (dict_compacting_would_shrink (permanent_dict)
286                ? dict_make_compactor (permanent_dict)
287                : NULL);
288
289   /* Prepare sink. */
290   if (proc_sink == NULL)
291     proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
292   if (proc_sink->class->open != NULL)
293     proc_sink->class->open (proc_sink);
294
295   /* Allocate memory for lag queue. */
296   if (n_lag > 0)
297     {
298       int i;
299   
300       lag_count = 0;
301       lag_head = 0;
302       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
303       for (i = 0; i < n_lag; i++)
304         case_nullify (&lag_queue[i]);
305     }
306 }
307
308 /* Transforms trns_case and writes it to the replacement active
309    file if advisable.  Returns true if more cases can be
310    accepted, false otherwise.  Do not call this function again
311    after it has returned false once.  */
312 static bool
313 write_case (struct write_case_data *wc_data)
314 {
315   enum trns_result retval;
316   size_t case_nr;
317   
318   /* Execute permanent transformations.  */
319   case_nr = wc_data->cases_written + 1;
320   retval = trns_chain_execute (permanent_trns_chain,
321                                &wc_data->trns_case, &case_nr);
322   if (retval != TRNS_CONTINUE)
323     goto done;
324
325   /* Write case to LAG queue. */
326   if (n_lag)
327     lag_case (&wc_data->trns_case);
328
329   /* Write case to replacement active file. */
330   wc_data->cases_written++;
331   if (proc_sink->class->write != NULL) 
332     {
333       if (compactor != NULL) 
334         {
335           dict_compactor_compact (compactor, &wc_data->sink_case,
336                                   &wc_data->trns_case);
337           proc_sink->class->write (proc_sink, &wc_data->sink_case);
338         }
339       else
340         proc_sink->class->write (proc_sink, &wc_data->trns_case);
341     }
342   
343   /* Execute temporary transformations. */
344   if (temporary_trns_chain != NULL) 
345     {
346       retval = trns_chain_execute (temporary_trns_chain,
347                                    &wc_data->trns_case,
348                                    &wc_data->cases_written);
349       if (retval != TRNS_CONTINUE)
350         goto done;
351     }
352
353   /* Pass case to procedure. */
354   if (wc_data->case_func != NULL)
355     if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
356       retval = TRNS_ERROR;
357
358  done:
359   clear_case (&wc_data->trns_case);
360   return retval != TRNS_ERROR;
361 }
362
363 /* Add C to the lag queue. */
364 static void
365 lag_case (const struct ccase *c)
366 {
367   if (lag_count < n_lag)
368     lag_count++;
369   case_destroy (&lag_queue[lag_head]);
370   case_clone (&lag_queue[lag_head], c);
371   if (++lag_head >= n_lag)
372     lag_head = 0;
373 }
374
375 /* Clears the variables in C that need to be cleared between
376    processing cases.  */
377 static void
378 clear_case (struct ccase *c)
379 {
380   size_t var_cnt = dict_get_var_cnt (default_dict);
381   size_t i;
382   
383   for (i = 0; i < var_cnt; i++) 
384     {
385       struct variable *v = dict_get_var (default_dict, i);
386       if (!v->leave) 
387         {
388           if (v->type == NUMERIC)
389             case_data_rw (c, v->fv)->f = SYSMIS;
390           else
391             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
392         } 
393     }
394 }
395
396 /* Closes the active file. */
397 static bool
398 close_active_file (void)
399 {
400   /* Free memory for lag queue, and turn off lagging. */
401   if (n_lag > 0)
402     {
403       int i;
404       
405       for (i = 0; i < n_lag; i++)
406         case_destroy (&lag_queue[i]);
407       free (lag_queue);
408       n_lag = 0;
409     }
410   
411   /* Dictionary from before TEMPORARY becomes permanent. */
412   proc_cancel_temporary_transformations ();
413
414   /* Finish compacting. */
415   if (compactor != NULL) 
416     {
417       dict_compactor_destroy (compactor);
418       dict_compact_values (default_dict);
419       compactor = NULL;
420     }
421     
422   /* Free data source. */
423   free_case_source (proc_source);
424   proc_source = NULL;
425
426   /* Old data sink becomes new data source. */
427   if (proc_sink->class->make_source != NULL)
428     proc_source = proc_sink->class->make_source (proc_sink);
429   free_case_sink (proc_sink);
430   proc_sink = NULL;
431
432   dict_clear_vectors (default_dict);
433   permanent_dict = NULL;
434   return proc_cancel_all_transformations ();
435 }
436 \f
437 /* Returns a pointer to the lagged case from N_BEFORE cases before the
438    current one, or NULL if there haven't been that many cases yet. */
439 struct ccase *
440 lagged_case (int n_before)
441 {
442   assert (n_before >= 1 );
443   assert (n_before <= n_lag);
444
445   if (n_before <= lag_count)
446     {
447       int index = lag_head - n_before;
448       if (index < 0)
449         index += n_lag;
450       return &lag_queue[index];
451     }
452   else
453     return NULL;
454 }
455 \f
456 /* Procedure that separates the data into SPLIT FILE groups. */
457
458 /* Represents auxiliary data for handling SPLIT FILE. */
459 struct split_aux_data 
460   {
461     size_t case_count;          /* Number of cases so far. */
462     struct ccase prev_case;     /* Data in previous case. */
463
464     /* Callback functions. */
465     void (*begin_func) (const struct ccase *, void *);
466     bool (*proc_func) (const struct ccase *, void *);
467     void (*end_func) (void *);
468     void *func_aux;
469   };
470
471 static int equal_splits (const struct ccase *, const struct ccase *);
472 static bool split_procedure_case_func (const struct ccase *c, void *);
473 static bool split_procedure_end_func (void *);
474
475 /* Like procedure(), but it automatically breaks the case stream
476    into SPLIT FILE break groups.  Before each group of cases with
477    identical SPLIT FILE variable values, BEGIN_FUNC is called
478    with the first case in the group.
479    Then PROC_FUNC is called for each case in the group (including
480    the first).
481    END_FUNC is called when the group is finished.  FUNC_AUX is
482    passed to each of the functions as auxiliary data.
483
484    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
485    and END_FUNC will be called at all. 
486
487    If SPLIT FILE is not in effect, then there is one break group
488    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
489    will be called once.
490    
491    Returns true if successful, false if an I/O error occurred. */
492 bool
493 procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
494                        bool (*proc_func) (const struct ccase *, void *aux),
495                        void (*end_func) (void *aux),
496                        void *func_aux) 
497 {
498   struct split_aux_data split_aux;
499   bool ok;
500
501   split_aux.case_count = 0;
502   case_nullify (&split_aux.prev_case);
503   split_aux.begin_func = begin_func;
504   split_aux.proc_func = proc_func;
505   split_aux.end_func = end_func;
506   split_aux.func_aux = func_aux;
507
508   ok = internal_procedure (split_procedure_case_func,
509                            split_procedure_end_func, &split_aux);
510
511   case_destroy (&split_aux.prev_case);
512
513   return ok;
514 }
515
516 /* Case callback used by procedure_with_splits(). */
517 static bool
518 split_procedure_case_func (const struct ccase *c, void *split_aux_) 
519 {
520   struct split_aux_data *split_aux = split_aux_;
521
522   /* Start a new series if needed. */
523   if (split_aux->case_count == 0
524       || !equal_splits (c, &split_aux->prev_case))
525     {
526       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
527         split_aux->end_func (split_aux->func_aux);
528
529       case_destroy (&split_aux->prev_case);
530       case_clone (&split_aux->prev_case, c);
531
532       if (split_aux->begin_func != NULL)
533         split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
534     }
535
536   split_aux->case_count++;
537   return (split_aux->proc_func == NULL
538           || split_aux->proc_func (c, split_aux->func_aux));
539 }
540
541 /* End-of-file callback used by procedure_with_splits(). */
542 static bool
543 split_procedure_end_func (void *split_aux_) 
544 {
545   struct split_aux_data *split_aux = split_aux_;
546
547   if (split_aux->case_count > 0 && split_aux->end_func != NULL)
548     split_aux->end_func (split_aux->func_aux);
549   return true;
550 }
551
552 /* Compares the SPLIT FILE variables in cases A and B and returns
553    nonzero only if they differ. */
554 static int
555 equal_splits (const struct ccase *a, const struct ccase *b) 
556 {
557   return case_compare (a, b,
558                        dict_get_split_vars (default_dict),
559                        dict_get_split_cnt (default_dict)) == 0;
560 }
561 \f
562 /* Multipass procedure that separates the data into SPLIT FILE
563    groups. */
564
565 /* Represents auxiliary data for handling SPLIT FILE in a
566    multipass procedure. */
567 struct multipass_split_aux_data 
568   {
569     struct ccase prev_case;     /* Data in previous case. */
570     struct casefile *casefile;  /* Accumulates data for a split. */
571
572     /* Function to call with the accumulated data. */
573     bool (*split_func) (const struct ccase *first, const struct casefile *,
574                         void *);
575     void *func_aux;                            /* Auxiliary data. */ 
576   };
577
578 static bool multipass_split_case_func (const struct ccase *c, void *aux_);
579 static bool multipass_split_end_func (void *aux_);
580 static bool multipass_split_output (struct multipass_split_aux_data *);
581
582 /* Returns true if successful, false if an I/O error occurred. */
583 bool
584 multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
585                                                      const struct casefile *,
586                                                      void *aux),
587                                  void *func_aux)
588 {
589   struct multipass_split_aux_data aux;
590   bool ok;
591
592   case_nullify (&aux.prev_case);
593   aux.casefile = NULL;
594   aux.split_func = split_func;
595   aux.func_aux = func_aux;
596
597   ok = internal_procedure (multipass_split_case_func,
598                            multipass_split_end_func, &aux);
599   case_destroy (&aux.prev_case);
600
601   return ok;
602 }
603
604 /* Case callback used by multipass_procedure_with_splits(). */
605 static bool
606 multipass_split_case_func (const struct ccase *c, void *aux_)
607 {
608   struct multipass_split_aux_data *aux = aux_;
609   bool ok = true;
610
611   /* Start a new series if needed. */
612   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
613     {
614       /* Record split values. */
615       case_destroy (&aux->prev_case);
616       case_clone (&aux->prev_case, c);
617
618       /* Pass any cases to split_func. */
619       if (aux->casefile != NULL)
620         ok = multipass_split_output (aux);
621
622       /* Start a new casefile. */
623       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
624     }
625
626   return casefile_append (aux->casefile, c) && ok;
627 }
628
629 /* End-of-file callback used by multipass_procedure_with_splits(). */
630 static bool
631 multipass_split_end_func (void *aux_)
632 {
633   struct multipass_split_aux_data *aux = aux_;
634   return (aux->casefile == NULL || multipass_split_output (aux));
635 }
636
637 static bool
638 multipass_split_output (struct multipass_split_aux_data *aux)
639 {
640   bool ok;
641   
642   assert (aux->casefile != NULL);
643   ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
644   casefile_destroy (aux->casefile);
645   aux->casefile = NULL;
646
647   return ok;
648 }
649 \f
650 /* Discards all the current state in preparation for a data-input
651    command like DATA LIST or GET. */
652 void
653 discard_variables (void)
654 {
655   dict_clear (default_dict);
656   fh_set_default_handle (NULL);
657
658   n_lag = 0;
659   
660   free_case_source (proc_source);
661   proc_source = NULL;
662
663   proc_cancel_all_transformations ();
664 }
665 \f
666 /* Returns the current set of permanent transformations,
667    and clears the permanent transformations.
668    For use by INPUT PROGRAM. */
669 struct trns_chain *
670 proc_capture_transformations (void) 
671 {
672   struct trns_chain *chain;
673   
674   assert (temporary_trns_chain == NULL);
675   chain = permanent_trns_chain;
676   cur_trns_chain = permanent_trns_chain = trns_chain_create ();
677   return chain;
678 }
679
680 /* Adds a transformation that processes a case with PROC and
681    frees itself with FREE to the current set of transformations.
682    The functions are passed AUX as auxiliary data. */
683 void
684 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
685 {
686   trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
687 }
688
689 /* Adds a transformation that processes a case with PROC and
690    frees itself with FREE to the current set of transformations.
691    When parsing of the block of transformations is complete,
692    FINALIZE will be called.
693    The functions are passed AUX as auxiliary data. */
694 void
695 add_transformation_with_finalizer (trns_finalize_func *finalize,
696                                    trns_proc_func *proc,
697                                    trns_free_func *free, void *aux)
698 {
699   trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
700 }
701
702 /* Returns the index of the next transformation.
703    This value can be returned by a transformation procedure
704    function to indicate a "jump" to that transformation. */
705 size_t
706 next_transformation (void) 
707 {
708   return trns_chain_next (cur_trns_chain);
709 }
710
711 /* Returns true if the next call to add_transformation() will add
712    a temporary transformation, false if it will add a permanent
713    transformation. */
714 bool
715 proc_in_temporary_transformations (void) 
716 {
717   return temporary_trns_chain != NULL;
718 }
719
720 /* Marks the start of temporary transformations.
721    Further calls to add_transformation() will add temporary
722    transformations. */
723 void
724 proc_start_temporary_transformations (void) 
725 {
726   if (!proc_in_temporary_transformations ())
727     {
728       add_case_limit_trns ();
729
730       permanent_dict = dict_clone (default_dict);
731       trns_chain_finalize (permanent_trns_chain);
732       temporary_trns_chain = cur_trns_chain = trns_chain_create ();
733     }
734 }
735
736 /* Converts all the temporary transformations, if any, to
737    permanent transformations.  Further transformations will be
738    permanent.
739    Returns true if anything changed, false otherwise. */
740 bool
741 proc_make_temporary_transformations_permanent (void) 
742 {
743   if (proc_in_temporary_transformations ()) 
744     {
745       trns_chain_finalize (temporary_trns_chain);
746       trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
747       temporary_trns_chain = NULL;
748
749       dict_destroy (permanent_dict);
750       permanent_dict = NULL;
751
752       return true;
753     }
754   else
755     return false;
756 }
757
758 /* Cancels all temporary transformations, if any.  Further
759    transformations will be permanent.
760    Returns true if anything changed, false otherwise. */
761 bool
762 proc_cancel_temporary_transformations (void) 
763 {
764   if (proc_in_temporary_transformations ()) 
765     {
766       dict_destroy (default_dict);
767       default_dict = permanent_dict;
768       permanent_dict = NULL;
769
770       trns_chain_destroy (temporary_trns_chain);
771       temporary_trns_chain = NULL;
772
773       return true;
774     }
775   else
776     return false;
777 }
778
779 /* Cancels all transformations, if any.
780    Returns true if successful, false on I/O error. */
781 bool
782 proc_cancel_all_transformations (void)
783 {
784   bool ok;
785   ok = trns_chain_destroy (permanent_trns_chain);
786   ok = trns_chain_destroy (temporary_trns_chain) && ok;
787   permanent_trns_chain = cur_trns_chain = trns_chain_create ();
788   temporary_trns_chain = NULL;
789   return ok;
790 }
791 \f
792 /* Initializes procedure handling. */
793 void
794 proc_init (void) 
795 {
796   default_dict = dict_create ();
797   proc_cancel_all_transformations ();
798 }
799
800 /* Finishes up procedure handling. */
801 void
802 proc_done (void)
803 {
804   discard_variables ();
805 }
806
807 /* Sets SINK as the destination for procedure output from the
808    next procedure. */
809 void
810 proc_set_sink (struct case_sink *sink) 
811 {
812   assert (proc_sink == NULL);
813   proc_sink = sink;
814 }
815
816 /* Sets SOURCE as the source for procedure input for the next
817    procedure. */
818 void
819 proc_set_source (struct case_source *source) 
820 {
821   assert (proc_source == NULL);
822   proc_source = source;
823 }
824
825 /* Returns true if a source for the next procedure has been
826    configured, false otherwise. */
827 bool
828 proc_has_source (void) 
829 {
830   return proc_source != NULL;
831 }
832
833 /* Returns the output from the previous procedure.
834    For use only immediately after executing a procedure.
835    The returned casefile is owned by the caller; it will not be
836    automatically used for the next procedure's input. */
837 struct casefile *
838 proc_capture_output (void) 
839 {
840   struct casefile *casefile;
841
842   /* Try to make sure that this function is called immediately
843      after procedure() or a similar function. */
844   assert (proc_source != NULL);
845   assert (case_source_is_class (proc_source, &storage_source_class));
846   assert (trns_chain_is_empty (permanent_trns_chain));
847   assert (!proc_in_temporary_transformations ());
848
849   casefile = storage_source_decapsulate (proc_source);
850   proc_source = NULL;
851
852   return casefile;
853 }
854 \f
855 static trns_proc_func case_limit_trns_proc;
856 static trns_free_func case_limit_trns_free;
857
858 /* Adds a transformation that limits the number of cases that may
859    pass through, if default_dict has a case limit. */
860 static void
861 add_case_limit_trns (void) 
862 {
863   size_t case_limit = dict_get_case_limit (default_dict);
864   if (case_limit != 0)
865     {
866       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
867       *cases_remaining = case_limit;
868       add_transformation (case_limit_trns_proc, case_limit_trns_free,
869                           cases_remaining);
870       dict_set_case_limit (default_dict, 0);
871     }
872 }
873
874 /* Limits the maximum number of cases processed to
875    *CASES_REMAINING. */
876 static int
877 case_limit_trns_proc (void *cases_remaining_,
878                       struct ccase *c UNUSED, int case_nr UNUSED) 
879 {
880   size_t *cases_remaining = cases_remaining_;
881   if (*cases_remaining > 0) 
882     {
883       *cases_remaining--;
884       return TRNS_CONTINUE;
885     }
886   else
887     return TRNS_DROP_CASE;
888 }
889
890 /* Frees the data associated with a case limit transformation. */
891 static bool
892 case_limit_trns_free (void *cases_remaining_) 
893 {
894   size_t *cases_remaining = cases_remaining_;
895   free (cases_remaining);
896   return true;
897 }
898 \f
899 static trns_proc_func filter_trns_proc;
900
901 /* Adds a temporary transformation to filter data according to
902    the variable specified on FILTER, if any. */
903 static void
904 add_filter_trns (void) 
905 {
906   struct variable *filter_var = dict_get_filter (default_dict);
907   if (filter_var != NULL) 
908     {
909       proc_start_temporary_transformations ();
910       add_transformation (filter_trns_proc, NULL, filter_var);
911     }
912 }
913
914 /* FILTER transformation. */
915 static int
916 filter_trns_proc (void *filter_var_,
917                   struct ccase *c UNUSED, int case_nr UNUSED) 
918   
919 {
920   struct variable *filter_var = filter_var_;
921   double f = case_num (c, filter_var->fv);
922   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
923           ? TRNS_CONTINUE : TRNS_DROP_CASE);
924 }
925