Remove "Written by Ben Pfaff <blp@gnu.org>" lines everywhere.
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <data/fastfile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40
41 /* Procedure execution data. */
42 struct write_case_data
43   {
44     /* Function to call for each case. */
45     case_func *proc;
46     void *aux;
47
48     struct dataset *dataset;    /* The dataset concerned */
49     struct ccase trns_case;     /* Case used for transformations. */
50     struct ccase sink_case;     /* Case written to sink, if
51                                    compacting is necessary. */
52     size_t cases_written;       /* Cases output so far. */
53   };
54
55 struct dataset {
56   /* Cases are read from proc_source,
57      pass through permanent_trns_chain (which transforms them into
58      the format described by permanent_dict),
59      are written to proc_sink,
60      pass through temporary_trns_chain (which transforms them into
61      the format described by dict),
62      and are finally passed to the procedure. */
63   struct case_source *proc_source;
64   struct trns_chain *permanent_trns_chain;
65   struct dictionary *permanent_dict;
66   struct case_sink *proc_sink;
67   struct trns_chain *temporary_trns_chain;
68   struct dictionary *dict;
69
70   /* The transformation chain that the next transformation will be
71      added to. */
72   struct trns_chain *cur_trns_chain;
73
74   /* The compactor used to compact a case, if necessary;
75      otherwise a null pointer. */
76   struct dict_compactor *compactor;
77
78   /* Time at which proc was last invoked. */
79   time_t last_proc_invocation;
80
81   /* Lag queue. */
82   int n_lag;                    /* Number of cases to lag. */
83   int lag_count;                /* Number of cases in lag_queue so far. */
84   int lag_head;         /* Index where next case will be added. */
85   struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
86
87 }; /* struct dataset */
88
89
90 static void add_case_limit_trns (struct dataset *ds);
91 static void add_filter_trns (struct dataset *ds);
92
93 static bool internal_procedure (struct dataset *ds, case_func *,
94                                 end_func *,
95                                 void *aux);
96 static void update_last_proc_invocation (struct dataset *ds);
97 static void create_trns_case (struct ccase *, struct dictionary *);
98 static void open_active_file (struct dataset *ds);
99 static bool write_case (struct write_case_data *wc_data);
100 static void lag_case (struct dataset *ds, const struct ccase *c);
101 static void clear_case (const struct dataset *ds, struct ccase *c);
102 static bool close_active_file (struct dataset *ds);
103 \f
104 /* Public functions. */
105
106 /* Returns the last time the data was read. */
107 time_t
108 time_of_last_procedure (struct dataset *ds) 
109 {
110   if (ds->last_proc_invocation == 0)
111     update_last_proc_invocation (ds);
112   return ds->last_proc_invocation;
113 }
114 \f
115 /* Regular procedure. */
116
117
118
119 /* Reads the data from the input program and writes it to a new
120    active file.  For each case we read from the input program, we
121    do the following:
122
123    1. Execute permanent transformations.  If these drop the case,
124       start the next case from step 1.
125
126    2. Write case to replacement active file.
127    
128    3. Execute temporary transformations.  If these drop the case,
129       start the next case from step 1.
130       
131    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
132
133    Returns true if successful, false if an I/O error occurred. */
134 bool
135 procedure (struct dataset *ds, case_func *cf, void *aux)
136 {
137   return internal_procedure (ds, cf, NULL, aux);
138 }
139 \f
140 /* Multipass procedure. */
141
142 struct multipass_aux_data 
143   {
144     struct casefile *casefile;
145     
146     bool (*proc_func) (const struct casefile *, void *aux);
147     void *aux;
148   };
149
150 /* Case processing function for multipass_procedure(). */
151 static bool
152 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) 
153 {
154   struct multipass_aux_data *aux_data = aux_data_;
155   return casefile_append (aux_data->casefile, c);
156 }
157
158 /* End-of-file function for multipass_procedure(). */
159 static bool
160 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) 
161 {
162   struct multipass_aux_data *aux_data = aux_data_;
163   return (aux_data->proc_func == NULL
164           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
165 }
166
167 /* Procedure that allows multiple passes over the input data.
168    The entire active file is passed to PROC_FUNC, with the given
169    AUX as auxiliary data, as a unit. */
170 bool
171 multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux) 
172 {
173   struct multipass_aux_data aux_data;
174   bool ok;
175
176   aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
177   aux_data.proc_func = proc_func;
178   aux_data.aux = aux;
179
180   ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
181   ok = !casefile_error (aux_data.casefile) && ok;
182
183   casefile_destroy (aux_data.casefile);
184
185   return ok;
186 }
187 \f
188 /* Procedure implementation. */
189
190
191 /* Executes a procedure.
192    Passes each case to CASE_FUNC.
193    Calls END_FUNC after the last case.
194    Returns true if successful, false if an I/O error occurred (or
195    if CASE_FUNC or END_FUNC ever returned false). */
196 static bool
197 internal_procedure (struct dataset *ds, case_func *proc,
198                     end_func *end,
199                     void *aux) 
200 {
201   struct write_case_data wc_data;
202   bool ok = true;
203
204   assert (ds->proc_source != NULL);
205
206   update_last_proc_invocation (ds);
207
208   /* Optimize the trivial case where we're not going to do
209      anything with the data, by not reading the data at all. */
210   if (proc == NULL && end == NULL
211       && case_source_is_class (ds->proc_source, &storage_source_class)
212       && ds->proc_sink == NULL
213       && (ds->temporary_trns_chain == NULL
214           || trns_chain_is_empty (ds->temporary_trns_chain))
215       && trns_chain_is_empty (ds->permanent_trns_chain))
216     {
217       ds->n_lag = 0;
218       dict_set_case_limit (ds->dict, 0);
219       dict_clear_vectors (ds->dict);
220       return true;
221     }
222   
223   open_active_file (ds);
224   
225   wc_data.proc = proc;
226   wc_data.aux = aux;
227   wc_data.dataset = ds;
228   create_trns_case (&wc_data.trns_case, ds->dict);
229   case_create (&wc_data.sink_case,
230                dict_get_compacted_value_cnt (ds->dict));
231   wc_data.cases_written = 0;
232
233   ok = ds->proc_source->class->read (ds->proc_source,
234                                  &wc_data.trns_case,
235                                  write_case, &wc_data) && ok;
236   if (end != NULL)
237     ok = end (aux, ds) && ok;
238
239   case_destroy (&wc_data.sink_case);
240   case_destroy (&wc_data.trns_case);
241
242   ok = close_active_file (ds) && ok;
243
244   return ok;
245 }
246
247 /* Updates last_proc_invocation. */
248 static void
249 update_last_proc_invocation (struct dataset *ds) 
250 {
251   ds->last_proc_invocation = time (NULL);
252 }
253
254 /* Creates and returns a case, initializing it from the vectors
255    that say which `value's need to be initialized just once, and
256    which ones need to be re-initialized before every case. */
257 static void
258 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
259 {
260   size_t var_cnt = dict_get_var_cnt (dict);
261   size_t i;
262
263   case_create (trns_case, dict_get_next_value_idx (dict));
264   for (i = 0; i < var_cnt; i++) 
265     {
266       struct variable *v = dict_get_var (dict, i);
267       union value *value = case_data_rw (trns_case, v);
268
269       if (var_is_numeric (v))
270         value->f = var_get_leave (v) ? 0.0 : SYSMIS;
271       else
272         memset (value->s, ' ', var_get_width (v));
273     }
274 }
275
276 /* Makes all preparations for reading from the data source and writing
277    to the data sink. */
278 static void
279 open_active_file (struct dataset *ds)
280 {
281   add_case_limit_trns (ds);
282   add_filter_trns (ds);
283
284   /* Finalize transformations. */
285   trns_chain_finalize (ds->cur_trns_chain);
286
287   /* Make permanent_dict refer to the dictionary right before
288      data reaches the sink. */
289   if (ds->permanent_dict == NULL)
290     ds->permanent_dict = ds->dict;
291
292   /* Figure out whether to compact. */
293   ds->compactor = 
294     (dict_compacting_would_shrink (ds->permanent_dict)
295      ? dict_make_compactor (ds->permanent_dict)
296      : NULL);
297
298   /* Prepare sink. */
299   if (ds->proc_sink == NULL)
300     ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
301   if (ds->proc_sink->class->open != NULL)
302     ds->proc_sink->class->open (ds->proc_sink);
303
304   /* Allocate memory for lag queue. */
305   if (ds->n_lag > 0)
306     {
307       int i;
308   
309       ds->lag_count = 0;
310       ds->lag_head = 0;
311       ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
312       for (i = 0; i < ds->n_lag; i++)
313         case_nullify (&ds->lag_queue[i]);
314     }
315 }
316
317 /* Transforms trns_case and writes it to the replacement active
318    file if advisable.  Returns true if more cases can be
319    accepted, false otherwise.  Do not call this function again
320    after it has returned false once.  */
321 static bool
322 write_case (struct write_case_data *wc_data)
323 {
324   enum trns_result retval;
325   size_t case_nr;
326
327   struct dataset *ds = wc_data->dataset;
328   
329   /* Execute permanent transformations.  */
330   case_nr = wc_data->cases_written + 1;
331   retval = trns_chain_execute (ds->permanent_trns_chain,
332                                &wc_data->trns_case, &case_nr);
333   if (retval != TRNS_CONTINUE)
334     goto done;
335
336   /* Write case to LAG queue. */
337   if (ds->n_lag)
338     lag_case (ds, &wc_data->trns_case);
339
340   /* Write case to replacement active file. */
341   wc_data->cases_written++;
342   if (ds->proc_sink->class->write != NULL) 
343     {
344       if (ds->compactor != NULL) 
345         {
346           dict_compactor_compact (ds->compactor, &wc_data->sink_case,
347                                   &wc_data->trns_case);
348           ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
349         }
350       else
351         ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
352     }
353   
354   /* Execute temporary transformations. */
355   if (ds->temporary_trns_chain != NULL) 
356     {
357       retval = trns_chain_execute (ds->temporary_trns_chain,
358                                    &wc_data->trns_case,
359                                    &wc_data->cases_written);
360       if (retval != TRNS_CONTINUE)
361         goto done;
362     }
363
364   /* Pass case to procedure. */
365   if (wc_data->proc != NULL)
366     if (!wc_data->proc (&wc_data->trns_case, wc_data->aux, ds))
367       retval = TRNS_ERROR;
368
369  done:
370   clear_case (ds, &wc_data->trns_case);
371   return retval != TRNS_ERROR;
372 }
373
374 /* Add C to the lag queue. */
375 static void
376 lag_case (struct dataset *ds, const struct ccase *c)
377 {
378   if (ds->lag_count < ds->n_lag)
379     ds->lag_count++;
380   case_destroy (&ds->lag_queue[ds->lag_head]);
381   case_clone (&ds->lag_queue[ds->lag_head], c);
382   if (++ds->lag_head >= ds->n_lag)
383     ds->lag_head = 0;
384 }
385
386 /* Clears the variables in C that need to be cleared between
387    processing cases.  */
388 static void
389 clear_case (const struct dataset *ds, struct ccase *c)
390 {
391   size_t var_cnt = dict_get_var_cnt (ds->dict);
392   size_t i;
393   
394   for (i = 0; i < var_cnt; i++) 
395     {
396       struct variable *v = dict_get_var (ds->dict, i);
397       if (!var_get_leave (v)) 
398         {
399           if (var_is_numeric (v))
400             case_data_rw (c, v)->f = SYSMIS; 
401           else
402             memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
403         } 
404     }
405 }
406
407 /* Closes the active file. */
408 static bool
409 close_active_file (struct dataset *ds)
410 {
411   /* Free memory for lag queue, and turn off lagging. */
412   if (ds->n_lag > 0)
413     {
414       int i;
415       
416       for (i = 0; i < ds->n_lag; i++)
417         case_destroy (&ds->lag_queue[i]);
418       free (ds->lag_queue);
419       ds->n_lag = 0;
420     }
421   
422   /* Dictionary from before TEMPORARY becomes permanent. */
423   proc_cancel_temporary_transformations (ds);
424
425   /* Finish compacting. */
426   if (ds->compactor != NULL) 
427     {
428       dict_compactor_destroy (ds->compactor);
429       dict_compact_values (ds->dict);
430       ds->compactor = NULL;
431     }
432     
433   /* Free data source. */
434   free_case_source (ds->proc_source);
435   ds->proc_source = NULL;
436
437   /* Old data sink becomes new data source. */
438   if (ds->proc_sink->class->make_source != NULL)
439     ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
440   free_case_sink (ds->proc_sink);
441   ds->proc_sink = NULL;
442
443   dict_clear_vectors (ds->dict);
444   ds->permanent_dict = NULL;
445   return proc_cancel_all_transformations (ds);
446 }
447 \f
448 /* Returns a pointer to the lagged case from N_BEFORE cases before the
449    current one, or NULL if there haven't been that many cases yet. */
450 struct ccase *
451 lagged_case (const struct dataset *ds, int n_before)
452 {
453   assert (n_before >= 1 );
454   assert (n_before <= ds->n_lag);
455
456   if (n_before <= ds->lag_count)
457     {
458       int index = ds->lag_head - n_before;
459       if (index < 0)
460         index += ds->n_lag;
461       return &ds->lag_queue[index];
462     }
463   else
464     return NULL;
465 }
466 \f
467 /* Procedure that separates the data into SPLIT FILE groups. */
468
469 /* Represents auxiliary data for handling SPLIT FILE. */
470 struct split_aux_data 
471   {
472     struct dataset *dataset;    /* The dataset */
473     struct ccase prev_case;     /* Data in previous case. */
474
475     /* Callback functions. */
476     begin_func *begin; 
477     case_func *proc;
478     end_func *end;
479     void *func_aux;
480   };
481
482 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
483 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
484 static bool split_procedure_end_func (void *, const struct dataset *);
485
486 /* Like procedure(), but it automatically breaks the case stream
487    into SPLIT FILE break groups.  Before each group of cases with
488    identical SPLIT FILE variable values, BEGIN_FUNC is called
489    with the first case in the group.
490    Then PROC_FUNC is called for each case in the group (including
491    the first).
492    END_FUNC is called when the group is finished.  FUNC_AUX is
493    passed to each of the functions as auxiliary data.
494
495    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
496    and END_FUNC will be called at all. 
497
498    If SPLIT FILE is not in effect, then there is one break group
499    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
500    will be called once.
501    
502    Returns true if successful, false if an I/O error occurred. */
503 bool
504 procedure_with_splits (struct dataset *ds,
505                        begin_func begin, 
506                        case_func *proc,
507                        end_func *end,
508                        void *func_aux) 
509 {
510   struct split_aux_data split_aux;
511   bool ok;
512
513   case_nullify (&split_aux.prev_case);
514   split_aux.begin = begin;
515   split_aux.proc = proc;
516   split_aux.end = end;
517   split_aux.func_aux = func_aux;
518   split_aux.dataset = ds;
519
520   ok = internal_procedure (ds, split_procedure_case_func,
521                            split_procedure_end_func, &split_aux);
522
523   case_destroy (&split_aux.prev_case);
524
525   return ok;
526 }
527
528 /* Case callback used by procedure_with_splits(). */
529 static bool
530 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) 
531 {
532   struct split_aux_data *split_aux = split_aux_;
533
534   /* Start a new series if needed. */
535   if (case_is_null (&split_aux->prev_case)
536       || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
537     {
538       if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
539         split_aux->end (split_aux->func_aux, ds);
540
541       case_destroy (&split_aux->prev_case);
542       case_clone (&split_aux->prev_case, c);
543
544       if (split_aux->begin != NULL)
545         split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
546     }
547
548   return (split_aux->proc == NULL
549           || split_aux->proc (c, split_aux->func_aux, ds));
550 }
551
552 /* End-of-file callback used by procedure_with_splits(). */
553 static bool
554 split_procedure_end_func (void *split_aux_, const struct dataset *ds) 
555 {
556   struct split_aux_data *split_aux = split_aux_;
557
558   if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
559     split_aux->end (split_aux->func_aux, ds);
560   return true;
561 }
562
563 /* Compares the SPLIT FILE variables in cases A and B and returns
564    nonzero only if they differ. */
565 static int
566 equal_splits (const struct ccase *a, const struct ccase *b, 
567               const struct dataset *ds) 
568 {
569   return case_compare (a, b,
570                        dict_get_split_vars (ds->dict),
571                        dict_get_split_cnt (ds->dict)) == 0;
572 }
573 \f
574 /* Multipass procedure that separates the data into SPLIT FILE
575    groups. */
576
577 /* Represents auxiliary data for handling SPLIT FILE in a
578    multipass procedure. */
579 struct multipass_split_aux_data 
580   {
581     struct dataset *dataset;    /* The dataset of the split */
582     struct ccase prev_case;     /* Data in previous case. */
583     struct casefile *casefile;  /* Accumulates data for a split. */
584     split_func *split;          /* Function to call with the accumulated 
585                                    data. */
586     void *func_aux;             /* Auxiliary data. */ 
587   };
588
589 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
590 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
591 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
592
593 /* Returns true if successful, false if an I/O error occurred. */
594 bool
595 multipass_procedure_with_splits (struct dataset *ds, 
596                                  split_func  *split,
597                                  void *func_aux)
598 {
599   struct multipass_split_aux_data aux;
600   bool ok;
601
602   case_nullify (&aux.prev_case);
603   aux.casefile = NULL;
604   aux.split = split;
605   aux.func_aux = func_aux;
606   aux.dataset = ds;
607
608   ok = internal_procedure (ds, multipass_split_case_func,
609                            multipass_split_end_func, &aux);
610   case_destroy (&aux.prev_case);
611
612   return ok;
613 }
614
615 /* Case callback used by multipass_procedure_with_splits(). */
616 static bool
617 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
618 {
619   struct multipass_split_aux_data *aux = aux_;
620   bool ok = true;
621
622   /* Start a new series if needed. */
623   if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
624     {
625       /* Record split values. */
626       case_destroy (&aux->prev_case);
627       case_clone (&aux->prev_case, c);
628
629       /* Pass any cases to split_func. */
630       if (aux->casefile != NULL)
631         ok = multipass_split_output (aux, ds);
632
633       /* Start a new casefile. */
634       aux->casefile = 
635         fastfile_create (dict_get_next_value_idx (ds->dict));
636     }
637
638   return casefile_append (aux->casefile, c) && ok;
639 }
640
641 /* End-of-file callback used by multipass_procedure_with_splits(). */
642 static bool
643 multipass_split_end_func (void *aux_, const struct dataset *ds)
644 {
645   struct multipass_split_aux_data *aux = aux_;
646   return (aux->casefile == NULL || multipass_split_output (aux, ds));
647 }
648
649 static bool
650 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
651 {
652   bool ok;
653   
654   assert (aux->casefile != NULL);
655   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
656   casefile_destroy (aux->casefile);
657   aux->casefile = NULL;
658
659   return ok;
660 }
661 \f
662 /* Discards all the current state in preparation for a data-input
663    command like DATA LIST or GET. */
664 void
665 discard_variables (struct dataset *ds)
666 {
667   dict_clear (ds->dict);
668   fh_set_default_handle (NULL);
669
670   ds->n_lag = 0;
671   
672   free_case_source (ds->proc_source);
673   ds->proc_source = NULL;
674
675   proc_cancel_all_transformations (ds);
676 }
677 \f
678 /* Returns the current set of permanent transformations,
679    and clears the permanent transformations.
680    For use by INPUT PROGRAM. */
681 struct trns_chain *
682 proc_capture_transformations (struct dataset *ds) 
683 {
684   struct trns_chain *chain;
685   
686   assert (ds->temporary_trns_chain == NULL);
687   chain = ds->permanent_trns_chain;
688   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
689   return chain;
690 }
691
692 /* Adds a transformation that processes a case with PROC and
693    frees itself with FREE to the current set of transformations.
694    The functions are passed AUX as auxiliary data. */
695 void
696 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
697 {
698   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
699 }
700
701 /* Adds a transformation that processes a case with PROC and
702    frees itself with FREE to the current set of transformations.
703    When parsing of the block of transformations is complete,
704    FINALIZE will be called.
705    The functions are passed AUX as auxiliary data. */
706 void
707 add_transformation_with_finalizer (struct dataset *ds, 
708                                    trns_finalize_func *finalize,
709                                    trns_proc_func *proc,
710                                    trns_free_func *free, void *aux)
711 {
712   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
713 }
714
715 /* Returns the index of the next transformation.
716    This value can be returned by a transformation procedure
717    function to indicate a "jump" to that transformation. */
718 size_t
719 next_transformation (const struct dataset *ds) 
720 {
721   return trns_chain_next (ds->cur_trns_chain);
722 }
723
724 /* Returns true if the next call to add_transformation() will add
725    a temporary transformation, false if it will add a permanent
726    transformation. */
727 bool
728 proc_in_temporary_transformations (const struct dataset *ds) 
729 {
730   return ds->temporary_trns_chain != NULL;
731 }
732
733 /* Marks the start of temporary transformations.
734    Further calls to add_transformation() will add temporary
735    transformations. */
736 void
737 proc_start_temporary_transformations (struct dataset *ds) 
738 {
739   if (!proc_in_temporary_transformations (ds))
740     {
741       add_case_limit_trns (ds);
742
743       ds->permanent_dict = dict_clone (ds->dict);
744       trns_chain_finalize (ds->permanent_trns_chain);
745       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
746     }
747 }
748
749 /* Converts all the temporary transformations, if any, to
750    permanent transformations.  Further transformations will be
751    permanent.
752    Returns true if anything changed, false otherwise. */
753 bool
754 proc_make_temporary_transformations_permanent (struct dataset *ds) 
755 {
756   if (proc_in_temporary_transformations (ds)) 
757     {
758       trns_chain_finalize (ds->temporary_trns_chain);
759       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
760       ds->temporary_trns_chain = NULL;
761
762       dict_destroy (ds->permanent_dict);
763       ds->permanent_dict = NULL;
764
765       return true;
766     }
767   else
768     return false;
769 }
770
771 /* Cancels all temporary transformations, if any.  Further
772    transformations will be permanent.
773    Returns true if anything changed, false otherwise. */
774 bool
775 proc_cancel_temporary_transformations (struct dataset *ds) 
776 {
777   if (proc_in_temporary_transformations (ds)) 
778     {
779       dict_destroy (ds->dict);
780       ds->dict = ds->permanent_dict;
781       ds->permanent_dict = NULL;
782
783       trns_chain_destroy (ds->temporary_trns_chain);
784       ds->temporary_trns_chain = NULL;
785
786       return true;
787     }
788   else
789     return false;
790 }
791
792 /* Cancels all transformations, if any.
793    Returns true if successful, false on I/O error. */
794 bool
795 proc_cancel_all_transformations (struct dataset *ds)
796 {
797   bool ok;
798   ok = trns_chain_destroy (ds->permanent_trns_chain);
799   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
800   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
801   ds->temporary_trns_chain = NULL;
802   return ok;
803 }
804 \f
805 /* Initializes procedure handling. */
806 struct dataset *
807 create_dataset (void)
808 {
809   struct dataset *ds = xzalloc (sizeof(*ds));
810   ds->dict = dict_create ();
811   proc_cancel_all_transformations (ds);
812   return ds;
813 }
814
815 /* Finishes up procedure handling. */
816 void
817 destroy_dataset (struct dataset *ds)
818 {
819   discard_variables (ds);
820   dict_destroy (ds->dict);
821   trns_chain_destroy (ds->permanent_trns_chain);
822   free (ds);
823 }
824
825 /* Sets SINK as the destination for procedure output from the
826    next procedure. */
827 void
828 proc_set_sink (struct dataset *ds, struct case_sink *sink) 
829 {
830   assert (ds->proc_sink == NULL);
831   ds->proc_sink = sink;
832 }
833
834 /* Sets SOURCE as the source for procedure input for the next
835    procedure. */
836 void
837 proc_set_source (struct dataset *ds, struct case_source *source) 
838 {
839   assert (ds->proc_source == NULL);
840   ds->proc_source = source;
841 }
842
843 /* Returns true if a source for the next procedure has been
844    configured, false otherwise. */
845 bool
846 proc_has_source (const struct dataset *ds) 
847 {
848   return ds->proc_source != NULL;
849 }
850
851 /* Returns the output from the previous procedure.
852    For use only immediately after executing a procedure.
853    The returned casefile is owned by the caller; it will not be
854    automatically used for the next procedure's input. */
855 struct casefile *
856 proc_capture_output (struct dataset *ds) 
857 {
858   struct casefile *casefile;
859
860   /* Try to make sure that this function is called immediately
861      after procedure() or a similar function. */
862   assert (ds->proc_source != NULL);
863   assert (case_source_is_class (ds->proc_source, &storage_source_class));
864   assert (trns_chain_is_empty (ds->permanent_trns_chain));
865   assert (!proc_in_temporary_transformations (ds));
866
867   casefile = storage_source_decapsulate (ds->proc_source);
868   ds->proc_source = NULL;
869
870   return casefile;
871 }
872 \f
873 static trns_proc_func case_limit_trns_proc;
874 static trns_free_func case_limit_trns_free;
875
876 /* Adds a transformation that limits the number of cases that may
877    pass through, if DS->DICT has a case limit. */
878 static void
879 add_case_limit_trns (struct dataset *ds) 
880 {
881   size_t case_limit = dict_get_case_limit (ds->dict);
882   if (case_limit != 0)
883     {
884       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
885       *cases_remaining = case_limit;
886       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
887                           cases_remaining);
888       dict_set_case_limit (ds->dict, 0);
889     }
890 }
891
892 /* Limits the maximum number of cases processed to
893    *CASES_REMAINING. */
894 static int
895 case_limit_trns_proc (void *cases_remaining_,
896                       struct ccase *c UNUSED, casenumber case_nr UNUSED) 
897 {
898   size_t *cases_remaining = cases_remaining_;
899   if (*cases_remaining > 0) 
900     {
901       (*cases_remaining)--;
902       return TRNS_CONTINUE;
903     }
904   else
905     return TRNS_DROP_CASE;
906 }
907
908 /* Frees the data associated with a case limit transformation. */
909 static bool
910 case_limit_trns_free (void *cases_remaining_) 
911 {
912   size_t *cases_remaining = cases_remaining_;
913   free (cases_remaining);
914   return true;
915 }
916 \f
917 static trns_proc_func filter_trns_proc;
918
919 /* Adds a temporary transformation to filter data according to
920    the variable specified on FILTER, if any. */
921 static void
922 add_filter_trns (struct dataset *ds) 
923 {
924   struct variable *filter_var = dict_get_filter (ds->dict);
925   if (filter_var != NULL) 
926     {
927       proc_start_temporary_transformations (ds);
928       add_transformation (ds, filter_trns_proc, NULL, filter_var);
929     }
930 }
931
932 /* FILTER transformation. */
933 static int
934 filter_trns_proc (void *filter_var_,
935                   struct ccase *c UNUSED, casenumber case_nr UNUSED) 
936   
937 {
938   struct variable *filter_var = filter_var_;
939   double f = case_num (c, filter_var);
940   return (f != 0.0 && !var_is_num_missing (filter_var, f)
941           ? TRNS_CONTINUE : TRNS_DROP_CASE);
942 }
943
944
945 struct dictionary *
946 dataset_dict (const struct dataset *ds)
947 {
948   return ds->dict;
949 }
950
951
952 void 
953 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
954 {
955   ds->dict = dict;
956 }
957
958 int 
959 dataset_n_lag (const struct dataset *ds)
960 {
961   return ds->n_lag;
962 }
963
964 void 
965 dataset_set_n_lag (struct dataset *ds, int n_lag)
966 {
967   ds->n_lag = n_lag;
968 }
969
970