Added casereader_clone function.
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/fastfile.h>
32 #include <data/dictionary.h>
33 #include <data/file-handle-def.h>
34 #include <data/procedure.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/variable.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/misc.h>
40 #include <libpspp/str.h>
41
42 /* Procedure execution data. */
43 struct write_case_data
44   {
45     /* Function to call for each case. */
46     bool (*case_func) (const struct ccase *, void *);
47     void *aux;
48
49     struct ccase trns_case;     /* Case used for transformations. */
50     struct ccase sink_case;     /* Case written to sink, if
51                                    compacting is necessary. */
52     size_t cases_written;       /* Cases output so far. */
53   };
54
55 /* Cases are read from proc_source,
56    pass through permanent_trns_chain (which transforms them into
57    the format described by permanent_dict),
58    are written to proc_sink,
59    pass through temporary_trns_chain (which transforms them into
60    the format described by default_dict),
61    and are finally passed to the procedure. */
62 static struct case_source *proc_source;
63 static struct trns_chain *permanent_trns_chain;
64 static struct dictionary *permanent_dict;
65 static struct case_sink *proc_sink;
66 static struct trns_chain *temporary_trns_chain;
67 struct dictionary *default_dict;
68
69 /* The transformation chain that the next transformation will be
70    added to. */
71 static struct trns_chain *cur_trns_chain;
72
73 /* The compactor used to compact a case, if necessary;
74    otherwise a null pointer. */
75 static struct dict_compactor *compactor;
76
77 /* Time at which proc was last invoked. */
78 static time_t last_proc_invocation;
79
80 /* Lag queue. */
81 int n_lag;                      /* Number of cases to lag. */
82 static int lag_count;           /* Number of cases in lag_queue so far. */
83 static int lag_head;            /* Index where next case will be added. */
84 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
85
86 static void add_case_limit_trns (void);
87 static void add_filter_trns (void);
88
89 static bool internal_procedure (bool (*case_func) (const struct ccase *,
90                                                    void *),
91                                 bool (*end_func) (void *),
92                                 void *aux);
93 static void update_last_proc_invocation (void);
94 static void create_trns_case (struct ccase *, struct dictionary *);
95 static void open_active_file (void);
96 static bool write_case (struct write_case_data *wc_data);
97 static void lag_case (const struct ccase *c);
98 static void clear_case (struct ccase *c);
99 static bool close_active_file (void);
100 \f
101 /* Public functions. */
102
103 /* Returns the last time the data was read. */
104 time_t
105 time_of_last_procedure (void) 
106 {
107   if (last_proc_invocation == 0)
108     update_last_proc_invocation ();
109   return last_proc_invocation;
110 }
111 \f
112 /* Regular procedure. */
113
114 /* Reads the data from the input program and writes it to a new
115    active file.  For each case we read from the input program, we
116    do the following:
117
118    1. Execute permanent transformations.  If these drop the case,
119       start the next case from step 1.
120
121    2. Write case to replacement active file.
122    
123    3. Execute temporary transformations.  If these drop the case,
124       start the next case from step 1.
125       
126    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
127
128    Returns true if successful, false if an I/O error occurred. */
129 bool
130 procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
131 {
132   return internal_procedure (proc_func, NULL, aux);
133 }
134 \f
135 /* Multipass procedure. */
136
137 struct multipass_aux_data 
138   {
139     struct casefile *casefile;
140     
141     bool (*proc_func) (const struct casefile *, void *aux);
142     void *aux;
143   };
144
145 /* Case processing function for multipass_procedure(). */
146 static bool
147 multipass_case_func (const struct ccase *c, void *aux_data_) 
148 {
149   struct multipass_aux_data *aux_data = aux_data_;
150   return casefile_append (aux_data->casefile, c);
151 }
152
153 /* End-of-file function for multipass_procedure(). */
154 static bool
155 multipass_end_func (void *aux_data_) 
156 {
157   struct multipass_aux_data *aux_data = aux_data_;
158   return (aux_data->proc_func == NULL
159           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
160 }
161
162 /* Procedure that allows multiple passes over the input data.
163    The entire active file is passed to PROC_FUNC, with the given
164    AUX as auxiliary data, as a unit. */
165 bool
166 multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
167                      void *aux) 
168 {
169   struct multipass_aux_data aux_data;
170   bool ok;
171
172   aux_data.casefile = fastfile_create (dict_get_next_value_idx (default_dict));
173   aux_data.proc_func = proc_func;
174   aux_data.aux = aux;
175
176   ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
177   ok = !casefile_error (aux_data.casefile) && ok;
178
179   casefile_destroy (aux_data.casefile);
180
181   return ok;
182 }
183 \f
184 /* Procedure implementation. */
185
186 /* Executes a procedure.
187    Passes each case to CASE_FUNC.
188    Calls END_FUNC after the last case.
189    Returns true if successful, false if an I/O error occurred (or
190    if CASE_FUNC or END_FUNC ever returned false). */
191 static bool
192 internal_procedure (bool (*case_func) (const struct ccase *, void *),
193                     bool (*end_func) (void *),
194                     void *aux) 
195 {
196   struct write_case_data wc_data;
197   bool ok = true;
198
199   assert (proc_source != NULL);
200
201   update_last_proc_invocation ();
202
203   /* Optimize the trivial case where we're not going to do
204      anything with the data, by not reading the data at all. */
205   if (case_func == NULL && end_func == NULL
206       && case_source_is_class (proc_source, &storage_source_class)
207       && proc_sink == NULL
208       && (temporary_trns_chain == NULL
209           || trns_chain_is_empty (temporary_trns_chain))
210       && trns_chain_is_empty (permanent_trns_chain))
211     {
212       n_lag = 0;
213       dict_set_case_limit (default_dict, 0);
214       dict_clear_vectors (default_dict);
215       return true;
216     }
217   
218   open_active_file ();
219   
220   wc_data.case_func = case_func;
221   wc_data.aux = aux;
222   create_trns_case (&wc_data.trns_case, default_dict);
223   case_create (&wc_data.sink_case,
224                dict_get_compacted_value_cnt (default_dict));
225   wc_data.cases_written = 0;
226
227   ok = proc_source->class->read (proc_source,
228                                  &wc_data.trns_case,
229                                  write_case, &wc_data) && ok;
230   if (end_func != NULL)
231     ok = end_func (aux) && ok;
232
233   case_destroy (&wc_data.sink_case);
234   case_destroy (&wc_data.trns_case);
235
236   ok = close_active_file () && ok;
237
238   return ok;
239 }
240
241 /* Updates last_proc_invocation. */
242 static void
243 update_last_proc_invocation (void) 
244 {
245   last_proc_invocation = time (NULL);
246 }
247
248 /* Creates and returns a case, initializing it from the vectors
249    that say which `value's need to be initialized just once, and
250    which ones need to be re-initialized before every case. */
251 static void
252 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
253 {
254   size_t var_cnt = dict_get_var_cnt (dict);
255   size_t i;
256
257   case_create (trns_case, dict_get_next_value_idx (dict));
258   for (i = 0; i < var_cnt; i++) 
259     {
260       struct variable *v = dict_get_var (dict, i);
261       union value *value = case_data_rw (trns_case, v->fv);
262
263       if (v->type == NUMERIC)
264         value->f = v->leave ? 0.0 : SYSMIS;
265       else
266         memset (value->s, ' ', v->width);
267     }
268 }
269
270 /* Makes all preparations for reading from the data source and writing
271    to the data sink. */
272 static void
273 open_active_file (void)
274 {
275   add_case_limit_trns ();
276   add_filter_trns ();
277
278   /* Finalize transformations. */
279   trns_chain_finalize (cur_trns_chain);
280
281   /* Make permanent_dict refer to the dictionary right before
282      data reaches the sink. */
283   if (permanent_dict == NULL)
284     permanent_dict = default_dict;
285
286   /* Figure out whether to compact. */
287   compactor = (dict_compacting_would_shrink (permanent_dict)
288                ? dict_make_compactor (permanent_dict)
289                : NULL);
290
291   /* Prepare sink. */
292   if (proc_sink == NULL)
293     proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
294   if (proc_sink->class->open != NULL)
295     proc_sink->class->open (proc_sink);
296
297   /* Allocate memory for lag queue. */
298   if (n_lag > 0)
299     {
300       int i;
301   
302       lag_count = 0;
303       lag_head = 0;
304       lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
305       for (i = 0; i < n_lag; i++)
306         case_nullify (&lag_queue[i]);
307     }
308 }
309
310 /* Transforms trns_case and writes it to the replacement active
311    file if advisable.  Returns true if more cases can be
312    accepted, false otherwise.  Do not call this function again
313    after it has returned false once.  */
314 static bool
315 write_case (struct write_case_data *wc_data)
316 {
317   enum trns_result retval;
318   size_t case_nr;
319   
320   /* Execute permanent transformations.  */
321   case_nr = wc_data->cases_written + 1;
322   retval = trns_chain_execute (permanent_trns_chain,
323                                &wc_data->trns_case, &case_nr);
324   if (retval != TRNS_CONTINUE)
325     goto done;
326
327   /* Write case to LAG queue. */
328   if (n_lag)
329     lag_case (&wc_data->trns_case);
330
331   /* Write case to replacement active file. */
332   wc_data->cases_written++;
333   if (proc_sink->class->write != NULL) 
334     {
335       if (compactor != NULL) 
336         {
337           dict_compactor_compact (compactor, &wc_data->sink_case,
338                                   &wc_data->trns_case);
339           proc_sink->class->write (proc_sink, &wc_data->sink_case);
340         }
341       else
342         proc_sink->class->write (proc_sink, &wc_data->trns_case);
343     }
344   
345   /* Execute temporary transformations. */
346   if (temporary_trns_chain != NULL) 
347     {
348       retval = trns_chain_execute (temporary_trns_chain,
349                                    &wc_data->trns_case,
350                                    &wc_data->cases_written);
351       if (retval != TRNS_CONTINUE)
352         goto done;
353     }
354
355   /* Pass case to procedure. */
356   if (wc_data->case_func != NULL)
357     if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
358       retval = TRNS_ERROR;
359
360  done:
361   clear_case (&wc_data->trns_case);
362   return retval != TRNS_ERROR;
363 }
364
365 /* Add C to the lag queue. */
366 static void
367 lag_case (const struct ccase *c)
368 {
369   if (lag_count < n_lag)
370     lag_count++;
371   case_destroy (&lag_queue[lag_head]);
372   case_clone (&lag_queue[lag_head], c);
373   if (++lag_head >= n_lag)
374     lag_head = 0;
375 }
376
377 /* Clears the variables in C that need to be cleared between
378    processing cases.  */
379 static void
380 clear_case (struct ccase *c)
381 {
382   size_t var_cnt = dict_get_var_cnt (default_dict);
383   size_t i;
384   
385   for (i = 0; i < var_cnt; i++) 
386     {
387       struct variable *v = dict_get_var (default_dict, i);
388       if (!v->leave) 
389         {
390           if (v->type == NUMERIC)
391             case_data_rw (c, v->fv)->f = SYSMIS;
392           else
393             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
394         } 
395     }
396 }
397
398 /* Closes the active file. */
399 static bool
400 close_active_file (void)
401 {
402   /* Free memory for lag queue, and turn off lagging. */
403   if (n_lag > 0)
404     {
405       int i;
406       
407       for (i = 0; i < n_lag; i++)
408         case_destroy (&lag_queue[i]);
409       free (lag_queue);
410       n_lag = 0;
411     }
412   
413   /* Dictionary from before TEMPORARY becomes permanent. */
414   proc_cancel_temporary_transformations ();
415
416   /* Finish compacting. */
417   if (compactor != NULL) 
418     {
419       dict_compactor_destroy (compactor);
420       dict_compact_values (default_dict);
421       compactor = NULL;
422     }
423     
424   /* Free data source. */
425   free_case_source (proc_source);
426   proc_source = NULL;
427
428   /* Old data sink becomes new data source. */
429   if (proc_sink->class->make_source != NULL)
430     proc_source = proc_sink->class->make_source (proc_sink);
431   free_case_sink (proc_sink);
432   proc_sink = NULL;
433
434   dict_clear_vectors (default_dict);
435   permanent_dict = NULL;
436   return proc_cancel_all_transformations ();
437 }
438 \f
439 /* Returns a pointer to the lagged case from N_BEFORE cases before the
440    current one, or NULL if there haven't been that many cases yet. */
441 struct ccase *
442 lagged_case (int n_before)
443 {
444   assert (n_before >= 1 );
445   assert (n_before <= n_lag);
446
447   if (n_before <= lag_count)
448     {
449       int index = lag_head - n_before;
450       if (index < 0)
451         index += n_lag;
452       return &lag_queue[index];
453     }
454   else
455     return NULL;
456 }
457 \f
458 /* Procedure that separates the data into SPLIT FILE groups. */
459
460 /* Represents auxiliary data for handling SPLIT FILE. */
461 struct split_aux_data 
462   {
463     struct ccase prev_case;     /* Data in previous case. */
464
465     /* Callback functions. */
466     void (*begin_func) (const struct ccase *, void *);
467     bool (*proc_func) (const struct ccase *, void *);
468     void (*end_func) (void *);
469     void *func_aux;
470   };
471
472 static int equal_splits (const struct ccase *, const struct ccase *);
473 static bool split_procedure_case_func (const struct ccase *c, void *);
474 static bool split_procedure_end_func (void *);
475
476 /* Like procedure(), but it automatically breaks the case stream
477    into SPLIT FILE break groups.  Before each group of cases with
478    identical SPLIT FILE variable values, BEGIN_FUNC is called
479    with the first case in the group.
480    Then PROC_FUNC is called for each case in the group (including
481    the first).
482    END_FUNC is called when the group is finished.  FUNC_AUX is
483    passed to each of the functions as auxiliary data.
484
485    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
486    and END_FUNC will be called at all. 
487
488    If SPLIT FILE is not in effect, then there is one break group
489    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
490    will be called once.
491    
492    Returns true if successful, false if an I/O error occurred. */
493 bool
494 procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
495                        bool (*proc_func) (const struct ccase *, void *aux),
496                        void (*end_func) (void *aux),
497                        void *func_aux) 
498 {
499   struct split_aux_data split_aux;
500   bool ok;
501
502   case_nullify (&split_aux.prev_case);
503   split_aux.begin_func = begin_func;
504   split_aux.proc_func = proc_func;
505   split_aux.end_func = end_func;
506   split_aux.func_aux = func_aux;
507
508   ok = internal_procedure (split_procedure_case_func,
509                            split_procedure_end_func, &split_aux);
510
511   case_destroy (&split_aux.prev_case);
512
513   return ok;
514 }
515
516 /* Case callback used by procedure_with_splits(). */
517 static bool
518 split_procedure_case_func (const struct ccase *c, void *split_aux_) 
519 {
520   struct split_aux_data *split_aux = split_aux_;
521
522   /* Start a new series if needed. */
523   if (case_is_null (&split_aux->prev_case)
524       || !equal_splits (c, &split_aux->prev_case))
525     {
526       if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
527         split_aux->end_func (split_aux->func_aux);
528
529       case_destroy (&split_aux->prev_case);
530       case_clone (&split_aux->prev_case, c);
531
532       if (split_aux->begin_func != NULL)
533         split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
534     }
535
536   return (split_aux->proc_func == NULL
537           || split_aux->proc_func (c, split_aux->func_aux));
538 }
539
540 /* End-of-file callback used by procedure_with_splits(). */
541 static bool
542 split_procedure_end_func (void *split_aux_) 
543 {
544   struct split_aux_data *split_aux = split_aux_;
545
546   if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
547     split_aux->end_func (split_aux->func_aux);
548   return true;
549 }
550
551 /* Compares the SPLIT FILE variables in cases A and B and returns
552    nonzero only if they differ. */
553 static int
554 equal_splits (const struct ccase *a, const struct ccase *b) 
555 {
556   return case_compare (a, b,
557                        dict_get_split_vars (default_dict),
558                        dict_get_split_cnt (default_dict)) == 0;
559 }
560 \f
561 /* Multipass procedure that separates the data into SPLIT FILE
562    groups. */
563
564 /* Represents auxiliary data for handling SPLIT FILE in a
565    multipass procedure. */
566 struct multipass_split_aux_data 
567   {
568     struct ccase prev_case;     /* Data in previous case. */
569     struct casefile *casefile;  /* Accumulates data for a split. */
570
571     /* Function to call with the accumulated data. */
572     bool (*split_func) (const struct ccase *first, const struct casefile *,
573                         void *);
574     void *func_aux;                            /* Auxiliary data. */ 
575   };
576
577 static bool multipass_split_case_func (const struct ccase *c, void *aux_);
578 static bool multipass_split_end_func (void *aux_);
579 static bool multipass_split_output (struct multipass_split_aux_data *);
580
581 /* Returns true if successful, false if an I/O error occurred. */
582 bool
583 multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
584                                                      const struct casefile *,
585                                                      void *aux),
586                                  void *func_aux)
587 {
588   struct multipass_split_aux_data aux;
589   bool ok;
590
591   case_nullify (&aux.prev_case);
592   aux.casefile = NULL;
593   aux.split_func = split_func;
594   aux.func_aux = func_aux;
595
596   ok = internal_procedure (multipass_split_case_func,
597                            multipass_split_end_func, &aux);
598   case_destroy (&aux.prev_case);
599
600   return ok;
601 }
602
603 /* Case callback used by multipass_procedure_with_splits(). */
604 static bool
605 multipass_split_case_func (const struct ccase *c, void *aux_)
606 {
607   struct multipass_split_aux_data *aux = aux_;
608   bool ok = true;
609
610   /* Start a new series if needed. */
611   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
612     {
613       /* Record split values. */
614       case_destroy (&aux->prev_case);
615       case_clone (&aux->prev_case, c);
616
617       /* Pass any cases to split_func. */
618       if (aux->casefile != NULL)
619         ok = multipass_split_output (aux);
620
621       /* Start a new casefile. */
622       aux->casefile = fastfile_create (dict_get_next_value_idx (default_dict));
623     }
624
625   return casefile_append (aux->casefile, c) && ok;
626 }
627
628 /* End-of-file callback used by multipass_procedure_with_splits(). */
629 static bool
630 multipass_split_end_func (void *aux_)
631 {
632   struct multipass_split_aux_data *aux = aux_;
633   return (aux->casefile == NULL || multipass_split_output (aux));
634 }
635
636 static bool
637 multipass_split_output (struct multipass_split_aux_data *aux)
638 {
639   bool ok;
640   
641   assert (aux->casefile != NULL);
642   ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
643   casefile_destroy (aux->casefile);
644   aux->casefile = NULL;
645
646   return ok;
647 }
648 \f
649 /* Discards all the current state in preparation for a data-input
650    command like DATA LIST or GET. */
651 void
652 discard_variables (void)
653 {
654   dict_clear (default_dict);
655   fh_set_default_handle (NULL);
656
657   n_lag = 0;
658   
659   free_case_source (proc_source);
660   proc_source = NULL;
661
662   proc_cancel_all_transformations ();
663 }
664 \f
665 /* Returns the current set of permanent transformations,
666    and clears the permanent transformations.
667    For use by INPUT PROGRAM. */
668 struct trns_chain *
669 proc_capture_transformations (void) 
670 {
671   struct trns_chain *chain;
672   
673   assert (temporary_trns_chain == NULL);
674   chain = permanent_trns_chain;
675   cur_trns_chain = permanent_trns_chain = trns_chain_create ();
676   return chain;
677 }
678
679 /* Adds a transformation that processes a case with PROC and
680    frees itself with FREE to the current set of transformations.
681    The functions are passed AUX as auxiliary data. */
682 void
683 add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
684 {
685   trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
686 }
687
688 /* Adds a transformation that processes a case with PROC and
689    frees itself with FREE to the current set of transformations.
690    When parsing of the block of transformations is complete,
691    FINALIZE will be called.
692    The functions are passed AUX as auxiliary data. */
693 void
694 add_transformation_with_finalizer (trns_finalize_func *finalize,
695                                    trns_proc_func *proc,
696                                    trns_free_func *free, void *aux)
697 {
698   trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
699 }
700
701 /* Returns the index of the next transformation.
702    This value can be returned by a transformation procedure
703    function to indicate a "jump" to that transformation. */
704 size_t
705 next_transformation (void) 
706 {
707   return trns_chain_next (cur_trns_chain);
708 }
709
710 /* Returns true if the next call to add_transformation() will add
711    a temporary transformation, false if it will add a permanent
712    transformation. */
713 bool
714 proc_in_temporary_transformations (void) 
715 {
716   return temporary_trns_chain != NULL;
717 }
718
719 /* Marks the start of temporary transformations.
720    Further calls to add_transformation() will add temporary
721    transformations. */
722 void
723 proc_start_temporary_transformations (void) 
724 {
725   if (!proc_in_temporary_transformations ())
726     {
727       add_case_limit_trns ();
728
729       permanent_dict = dict_clone (default_dict);
730       trns_chain_finalize (permanent_trns_chain);
731       temporary_trns_chain = cur_trns_chain = trns_chain_create ();
732     }
733 }
734
735 /* Converts all the temporary transformations, if any, to
736    permanent transformations.  Further transformations will be
737    permanent.
738    Returns true if anything changed, false otherwise. */
739 bool
740 proc_make_temporary_transformations_permanent (void) 
741 {
742   if (proc_in_temporary_transformations ()) 
743     {
744       trns_chain_finalize (temporary_trns_chain);
745       trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
746       temporary_trns_chain = NULL;
747
748       dict_destroy (permanent_dict);
749       permanent_dict = NULL;
750
751       return true;
752     }
753   else
754     return false;
755 }
756
757 /* Cancels all temporary transformations, if any.  Further
758    transformations will be permanent.
759    Returns true if anything changed, false otherwise. */
760 bool
761 proc_cancel_temporary_transformations (void) 
762 {
763   if (proc_in_temporary_transformations ()) 
764     {
765       dict_destroy (default_dict);
766       default_dict = permanent_dict;
767       permanent_dict = NULL;
768
769       trns_chain_destroy (temporary_trns_chain);
770       temporary_trns_chain = NULL;
771
772       return true;
773     }
774   else
775     return false;
776 }
777
778 /* Cancels all transformations, if any.
779    Returns true if successful, false on I/O error. */
780 bool
781 proc_cancel_all_transformations (void)
782 {
783   bool ok;
784   ok = trns_chain_destroy (permanent_trns_chain);
785   ok = trns_chain_destroy (temporary_trns_chain) && ok;
786   permanent_trns_chain = cur_trns_chain = trns_chain_create ();
787   temporary_trns_chain = NULL;
788   return ok;
789 }
790 \f
791 /* Initializes procedure handling. */
792 void
793 proc_init (void) 
794 {
795   default_dict = dict_create ();
796   proc_cancel_all_transformations ();
797 }
798
799 /* Finishes up procedure handling. */
800 void
801 proc_done (void)
802 {
803   discard_variables ();
804   dict_destroy (default_dict);
805 }
806
807 /* Sets SINK as the destination for procedure output from the
808    next procedure. */
809 void
810 proc_set_sink (struct case_sink *sink) 
811 {
812   assert (proc_sink == NULL);
813   proc_sink = sink;
814 }
815
816 /* Sets SOURCE as the source for procedure input for the next
817    procedure. */
818 void
819 proc_set_source (struct case_source *source) 
820 {
821   assert (proc_source == NULL);
822   proc_source = source;
823 }
824
825 /* Returns true if a source for the next procedure has been
826    configured, false otherwise. */
827 bool
828 proc_has_source (void) 
829 {
830   return proc_source != NULL;
831 }
832
833 /* Returns the output from the previous procedure.
834    For use only immediately after executing a procedure.
835    The returned casefile is owned by the caller; it will not be
836    automatically used for the next procedure's input. */
837 struct casefile *
838 proc_capture_output (void) 
839 {
840   struct casefile *casefile;
841
842   /* Try to make sure that this function is called immediately
843      after procedure() or a similar function. */
844   assert (proc_source != NULL);
845   assert (case_source_is_class (proc_source, &storage_source_class));
846   assert (trns_chain_is_empty (permanent_trns_chain));
847   assert (!proc_in_temporary_transformations ());
848
849   casefile = storage_source_decapsulate (proc_source);
850   proc_source = NULL;
851
852   return casefile;
853 }
854 \f
855 static trns_proc_func case_limit_trns_proc;
856 static trns_free_func case_limit_trns_free;
857
858 /* Adds a transformation that limits the number of cases that may
859    pass through, if default_dict has a case limit. */
860 static void
861 add_case_limit_trns (void) 
862 {
863   size_t case_limit = dict_get_case_limit (default_dict);
864   if (case_limit != 0)
865     {
866       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
867       *cases_remaining = case_limit;
868       add_transformation (case_limit_trns_proc, case_limit_trns_free,
869                           cases_remaining);
870       dict_set_case_limit (default_dict, 0);
871     }
872 }
873
874 /* Limits the maximum number of cases processed to
875    *CASES_REMAINING. */
876 static int
877 case_limit_trns_proc (void *cases_remaining_,
878                       struct ccase *c UNUSED, casenum_t case_nr UNUSED) 
879 {
880   size_t *cases_remaining = cases_remaining_;
881   if (*cases_remaining > 0) 
882     {
883       *cases_remaining--;
884       return TRNS_CONTINUE;
885     }
886   else
887     return TRNS_DROP_CASE;
888 }
889
890 /* Frees the data associated with a case limit transformation. */
891 static bool
892 case_limit_trns_free (void *cases_remaining_) 
893 {
894   size_t *cases_remaining = cases_remaining_;
895   free (cases_remaining);
896   return true;
897 }
898 \f
899 static trns_proc_func filter_trns_proc;
900
901 /* Adds a temporary transformation to filter data according to
902    the variable specified on FILTER, if any. */
903 static void
904 add_filter_trns (void) 
905 {
906   struct variable *filter_var = dict_get_filter (default_dict);
907   if (filter_var != NULL) 
908     {
909       proc_start_temporary_transformations ();
910       add_transformation (filter_trns_proc, NULL, filter_var);
911     }
912 }
913
914 /* FILTER transformation. */
915 static int
916 filter_trns_proc (void *filter_var_,
917                   struct ccase *c UNUSED, casenum_t case_nr UNUSED) 
918   
919 {
920   struct variable *filter_var = filter_var_;
921   double f = case_num (c, filter_var->fv);
922   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
923           ? TRNS_CONTINUE : TRNS_DROP_CASE);
924 }
925