422d0ecdc9b8b3e48483bb4216825c5a88fe99b9
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/fastfile.h>
32 #include <data/dictionary.h>
33 #include <data/file-handle-def.h>
34 #include <data/procedure.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/variable.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/misc.h>
40 #include <libpspp/str.h>
41
42 /* Procedure execution data. */
43 struct write_case_data
44   {
45     /* Function to call for each case. */
46     case_func *proc;
47     void *aux;
48
49     struct dataset *dataset;    /* The dataset concerned */
50     struct ccase trns_case;     /* Case used for transformations. */
51     struct ccase sink_case;     /* Case written to sink, if
52                                    compacting is necessary. */
53     size_t cases_written;       /* Cases output so far. */
54   };
55
56 struct dataset {
57   /* Cases are read from proc_source,
58      pass through permanent_trns_chain (which transforms them into
59      the format described by permanent_dict),
60      are written to proc_sink,
61      pass through temporary_trns_chain (which transforms them into
62      the format described by dict),
63      and are finally passed to the procedure. */
64   struct case_source *proc_source;
65   struct trns_chain *permanent_trns_chain;
66   struct dictionary *permanent_dict;
67   struct case_sink *proc_sink;
68   struct trns_chain *temporary_trns_chain;
69   struct dictionary *dict;
70
71   /* The transformation chain that the next transformation will be
72      added to. */
73   struct trns_chain *cur_trns_chain;
74
75   /* The compactor used to compact a case, if necessary;
76      otherwise a null pointer. */
77   struct dict_compactor *compactor;
78
79   /* Time at which proc was last invoked. */
80   time_t last_proc_invocation;
81
82   /* Lag queue. */
83   int n_lag;                    /* Number of cases to lag. */
84   int lag_count;                /* Number of cases in lag_queue so far. */
85   int lag_head;         /* Index where next case will be added. */
86   struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
87
88 }; /* struct dataset */
89
90
91 static void add_case_limit_trns (struct dataset *ds);
92 static void add_filter_trns (struct dataset *ds);
93
94 static bool internal_procedure (struct dataset *ds, case_func *,
95                                 end_func *,
96                                 void *aux);
97 static void update_last_proc_invocation (struct dataset *ds);
98 static void create_trns_case (struct ccase *, struct dictionary *);
99 static void open_active_file (struct dataset *ds);
100 static bool write_case (struct write_case_data *wc_data);
101 static void lag_case (struct dataset *ds, const struct ccase *c);
102 static void clear_case (const struct dataset *ds, struct ccase *c);
103 static bool close_active_file (struct dataset *ds);
104 \f
105 /* Public functions. */
106
107 /* Returns the last time the data was read. */
108 time_t
109 time_of_last_procedure (struct dataset *ds) 
110 {
111   if (ds->last_proc_invocation == 0)
112     update_last_proc_invocation (ds);
113   return ds->last_proc_invocation;
114 }
115 \f
116 /* Regular procedure. */
117
118
119
120 /* Reads the data from the input program and writes it to a new
121    active file.  For each case we read from the input program, we
122    do the following:
123
124    1. Execute permanent transformations.  If these drop the case,
125       start the next case from step 1.
126
127    2. Write case to replacement active file.
128    
129    3. Execute temporary transformations.  If these drop the case,
130       start the next case from step 1.
131       
132    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
133
134    Returns true if successful, false if an I/O error occurred. */
135 bool
136 procedure (struct dataset *ds, case_func *cf, void *aux)
137 {
138   return internal_procedure (ds, cf, NULL, aux);
139 }
140 \f
141 /* Multipass procedure. */
142
143 struct multipass_aux_data 
144   {
145     struct casefile *casefile;
146     
147     bool (*proc_func) (const struct casefile *, void *aux);
148     void *aux;
149   };
150
151 /* Case processing function for multipass_procedure(). */
152 static bool
153 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) 
154 {
155   struct multipass_aux_data *aux_data = aux_data_;
156   return casefile_append (aux_data->casefile, c);
157 }
158
159 /* End-of-file function for multipass_procedure(). */
160 static bool
161 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) 
162 {
163   struct multipass_aux_data *aux_data = aux_data_;
164   return (aux_data->proc_func == NULL
165           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
166 }
167
168 /* Procedure that allows multiple passes over the input data.
169    The entire active file is passed to PROC_FUNC, with the given
170    AUX as auxiliary data, as a unit. */
171 bool
172 multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux) 
173 {
174   struct multipass_aux_data aux_data;
175   bool ok;
176
177   aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
178   aux_data.proc_func = proc_func;
179   aux_data.aux = aux;
180
181   ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
182   ok = !casefile_error (aux_data.casefile) && ok;
183
184   casefile_destroy (aux_data.casefile);
185
186   return ok;
187 }
188 \f
189 /* Procedure implementation. */
190
191
192 /* Executes a procedure.
193    Passes each case to CASE_FUNC.
194    Calls END_FUNC after the last case.
195    Returns true if successful, false if an I/O error occurred (or
196    if CASE_FUNC or END_FUNC ever returned false). */
197 static bool
198 internal_procedure (struct dataset *ds, case_func *proc,
199                     end_func *end,
200                     void *aux) 
201 {
202   struct write_case_data wc_data;
203   bool ok = true;
204
205   assert (ds->proc_source != NULL);
206
207   update_last_proc_invocation (ds);
208
209   /* Optimize the trivial case where we're not going to do
210      anything with the data, by not reading the data at all. */
211   if (proc == NULL && end == NULL
212       && case_source_is_class (ds->proc_source, &storage_source_class)
213       && ds->proc_sink == NULL
214       && (ds->temporary_trns_chain == NULL
215           || trns_chain_is_empty (ds->temporary_trns_chain))
216       && trns_chain_is_empty (ds->permanent_trns_chain))
217     {
218       ds->n_lag = 0;
219       dict_set_case_limit (ds->dict, 0);
220       dict_clear_vectors (ds->dict);
221       return true;
222     }
223   
224   open_active_file (ds);
225   
226   wc_data.proc = proc;
227   wc_data.aux = aux;
228   wc_data.dataset = ds;
229   create_trns_case (&wc_data.trns_case, ds->dict);
230   case_create (&wc_data.sink_case,
231                dict_get_compacted_value_cnt (ds->dict));
232   wc_data.cases_written = 0;
233
234   ok = ds->proc_source->class->read (ds->proc_source,
235                                  &wc_data.trns_case,
236                                  write_case, &wc_data) && ok;
237   if (end != NULL)
238     ok = end (aux, ds) && ok;
239
240   case_destroy (&wc_data.sink_case);
241   case_destroy (&wc_data.trns_case);
242
243   ok = close_active_file (ds) && ok;
244
245   return ok;
246 }
247
248 /* Updates last_proc_invocation. */
249 static void
250 update_last_proc_invocation (struct dataset *ds) 
251 {
252   ds->last_proc_invocation = time (NULL);
253 }
254
255 /* Creates and returns a case, initializing it from the vectors
256    that say which `value's need to be initialized just once, and
257    which ones need to be re-initialized before every case. */
258 static void
259 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
260 {
261   size_t var_cnt = dict_get_var_cnt (dict);
262   size_t i;
263
264   case_create (trns_case, dict_get_next_value_idx (dict));
265   for (i = 0; i < var_cnt; i++) 
266     {
267       struct variable *v = dict_get_var (dict, i);
268       union value *value = case_data_rw (trns_case, v);
269
270       if (var_is_numeric (v))
271         value->f = var_get_leave (v) ? 0.0 : SYSMIS;
272       else
273         memset (value->s, ' ', var_get_width (v));
274     }
275 }
276
277 /* Makes all preparations for reading from the data source and writing
278    to the data sink. */
279 static void
280 open_active_file (struct dataset *ds)
281 {
282   add_case_limit_trns (ds);
283   add_filter_trns (ds);
284
285   /* Finalize transformations. */
286   trns_chain_finalize (ds->cur_trns_chain);
287
288   /* Make permanent_dict refer to the dictionary right before
289      data reaches the sink. */
290   if (ds->permanent_dict == NULL)
291     ds->permanent_dict = ds->dict;
292
293   /* Figure out whether to compact. */
294   ds->compactor = 
295     (dict_compacting_would_shrink (ds->permanent_dict)
296      ? dict_make_compactor (ds->permanent_dict)
297      : NULL);
298
299   /* Prepare sink. */
300   if (ds->proc_sink == NULL)
301     ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
302   if (ds->proc_sink->class->open != NULL)
303     ds->proc_sink->class->open (ds->proc_sink);
304
305   /* Allocate memory for lag queue. */
306   if (ds->n_lag > 0)
307     {
308       int i;
309   
310       ds->lag_count = 0;
311       ds->lag_head = 0;
312       ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
313       for (i = 0; i < ds->n_lag; i++)
314         case_nullify (&ds->lag_queue[i]);
315     }
316 }
317
318 /* Transforms trns_case and writes it to the replacement active
319    file if advisable.  Returns true if more cases can be
320    accepted, false otherwise.  Do not call this function again
321    after it has returned false once.  */
322 static bool
323 write_case (struct write_case_data *wc_data)
324 {
325   enum trns_result retval;
326   size_t case_nr;
327
328   struct dataset *ds = wc_data->dataset;
329   
330   /* Execute permanent transformations.  */
331   case_nr = wc_data->cases_written + 1;
332   retval = trns_chain_execute (ds->permanent_trns_chain,
333                                &wc_data->trns_case, &case_nr);
334   if (retval != TRNS_CONTINUE)
335     goto done;
336
337   /* Write case to LAG queue. */
338   if (ds->n_lag)
339     lag_case (ds, &wc_data->trns_case);
340
341   /* Write case to replacement active file. */
342   wc_data->cases_written++;
343   if (ds->proc_sink->class->write != NULL) 
344     {
345       if (ds->compactor != NULL) 
346         {
347           dict_compactor_compact (ds->compactor, &wc_data->sink_case,
348                                   &wc_data->trns_case);
349           ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
350         }
351       else
352         ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
353     }
354   
355   /* Execute temporary transformations. */
356   if (ds->temporary_trns_chain != NULL) 
357     {
358       retval = trns_chain_execute (ds->temporary_trns_chain,
359                                    &wc_data->trns_case,
360                                    &wc_data->cases_written);
361       if (retval != TRNS_CONTINUE)
362         goto done;
363     }
364
365   /* Pass case to procedure. */
366   if (wc_data->proc != NULL)
367     if (!wc_data->proc (&wc_data->trns_case, wc_data->aux, ds))
368       retval = TRNS_ERROR;
369
370  done:
371   clear_case (ds, &wc_data->trns_case);
372   return retval != TRNS_ERROR;
373 }
374
375 /* Add C to the lag queue. */
376 static void
377 lag_case (struct dataset *ds, const struct ccase *c)
378 {
379   if (ds->lag_count < ds->n_lag)
380     ds->lag_count++;
381   case_destroy (&ds->lag_queue[ds->lag_head]);
382   case_clone (&ds->lag_queue[ds->lag_head], c);
383   if (++ds->lag_head >= ds->n_lag)
384     ds->lag_head = 0;
385 }
386
387 /* Clears the variables in C that need to be cleared between
388    processing cases.  */
389 static void
390 clear_case (const struct dataset *ds, struct ccase *c)
391 {
392   size_t var_cnt = dict_get_var_cnt (ds->dict);
393   size_t i;
394   
395   for (i = 0; i < var_cnt; i++) 
396     {
397       struct variable *v = dict_get_var (ds->dict, i);
398       if (!var_get_leave (v)) 
399         {
400           if (var_is_numeric (v))
401             case_data_rw (c, v)->f = SYSMIS; 
402           else
403             memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
404         } 
405     }
406 }
407
408 /* Closes the active file. */
409 static bool
410 close_active_file (struct dataset *ds)
411 {
412   /* Free memory for lag queue, and turn off lagging. */
413   if (ds->n_lag > 0)
414     {
415       int i;
416       
417       for (i = 0; i < ds->n_lag; i++)
418         case_destroy (&ds->lag_queue[i]);
419       free (ds->lag_queue);
420       ds->n_lag = 0;
421     }
422   
423   /* Dictionary from before TEMPORARY becomes permanent. */
424   proc_cancel_temporary_transformations (ds);
425
426   /* Finish compacting. */
427   if (ds->compactor != NULL) 
428     {
429       dict_compactor_destroy (ds->compactor);
430       dict_compact_values (ds->dict);
431       ds->compactor = NULL;
432     }
433     
434   /* Free data source. */
435   free_case_source (ds->proc_source);
436   ds->proc_source = NULL;
437
438   /* Old data sink becomes new data source. */
439   if (ds->proc_sink->class->make_source != NULL)
440     ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
441   free_case_sink (ds->proc_sink);
442   ds->proc_sink = NULL;
443
444   dict_clear_vectors (ds->dict);
445   ds->permanent_dict = NULL;
446   return proc_cancel_all_transformations (ds);
447 }
448 \f
449 /* Returns a pointer to the lagged case from N_BEFORE cases before the
450    current one, or NULL if there haven't been that many cases yet. */
451 struct ccase *
452 lagged_case (const struct dataset *ds, int n_before)
453 {
454   assert (n_before >= 1 );
455   assert (n_before <= ds->n_lag);
456
457   if (n_before <= ds->lag_count)
458     {
459       int index = ds->lag_head - n_before;
460       if (index < 0)
461         index += ds->n_lag;
462       return &ds->lag_queue[index];
463     }
464   else
465     return NULL;
466 }
467 \f
468 /* Procedure that separates the data into SPLIT FILE groups. */
469
470 /* Represents auxiliary data for handling SPLIT FILE. */
471 struct split_aux_data 
472   {
473     struct dataset *dataset;    /* The dataset */
474     struct ccase prev_case;     /* Data in previous case. */
475
476     /* Callback functions. */
477     begin_func *begin; 
478     case_func *proc;
479     end_func *end;
480     void *func_aux;
481   };
482
483 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
484 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
485 static bool split_procedure_end_func (void *, const struct dataset *);
486
487 /* Like procedure(), but it automatically breaks the case stream
488    into SPLIT FILE break groups.  Before each group of cases with
489    identical SPLIT FILE variable values, BEGIN_FUNC is called
490    with the first case in the group.
491    Then PROC_FUNC is called for each case in the group (including
492    the first).
493    END_FUNC is called when the group is finished.  FUNC_AUX is
494    passed to each of the functions as auxiliary data.
495
496    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
497    and END_FUNC will be called at all. 
498
499    If SPLIT FILE is not in effect, then there is one break group
500    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
501    will be called once.
502    
503    Returns true if successful, false if an I/O error occurred. */
504 bool
505 procedure_with_splits (struct dataset *ds,
506                        begin_func begin, 
507                        case_func *proc,
508                        end_func *end,
509                        void *func_aux) 
510 {
511   struct split_aux_data split_aux;
512   bool ok;
513
514   case_nullify (&split_aux.prev_case);
515   split_aux.begin = begin;
516   split_aux.proc = proc;
517   split_aux.end = end;
518   split_aux.func_aux = func_aux;
519   split_aux.dataset = ds;
520
521   ok = internal_procedure (ds, split_procedure_case_func,
522                            split_procedure_end_func, &split_aux);
523
524   case_destroy (&split_aux.prev_case);
525
526   return ok;
527 }
528
529 /* Case callback used by procedure_with_splits(). */
530 static bool
531 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) 
532 {
533   struct split_aux_data *split_aux = split_aux_;
534
535   /* Start a new series if needed. */
536   if (case_is_null (&split_aux->prev_case)
537       || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
538     {
539       if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
540         split_aux->end (split_aux->func_aux, ds);
541
542       case_destroy (&split_aux->prev_case);
543       case_clone (&split_aux->prev_case, c);
544
545       if (split_aux->begin != NULL)
546         split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
547     }
548
549   return (split_aux->proc == NULL
550           || split_aux->proc (c, split_aux->func_aux, ds));
551 }
552
553 /* End-of-file callback used by procedure_with_splits(). */
554 static bool
555 split_procedure_end_func (void *split_aux_, const struct dataset *ds) 
556 {
557   struct split_aux_data *split_aux = split_aux_;
558
559   if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
560     split_aux->end (split_aux->func_aux, ds);
561   return true;
562 }
563
564 /* Compares the SPLIT FILE variables in cases A and B and returns
565    nonzero only if they differ. */
566 static int
567 equal_splits (const struct ccase *a, const struct ccase *b, 
568               const struct dataset *ds) 
569 {
570   return case_compare (a, b,
571                        dict_get_split_vars (ds->dict),
572                        dict_get_split_cnt (ds->dict)) == 0;
573 }
574 \f
575 /* Multipass procedure that separates the data into SPLIT FILE
576    groups. */
577
578 /* Represents auxiliary data for handling SPLIT FILE in a
579    multipass procedure. */
580 struct multipass_split_aux_data 
581   {
582     struct dataset *dataset;    /* The dataset of the split */
583     struct ccase prev_case;     /* Data in previous case. */
584     struct casefile *casefile;  /* Accumulates data for a split. */
585     split_func *split;          /* Function to call with the accumulated 
586                                    data. */
587     void *func_aux;             /* Auxiliary data. */ 
588   };
589
590 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
591 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
592 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
593
594 /* Returns true if successful, false if an I/O error occurred. */
595 bool
596 multipass_procedure_with_splits (struct dataset *ds, 
597                                  split_func  *split,
598                                  void *func_aux)
599 {
600   struct multipass_split_aux_data aux;
601   bool ok;
602
603   case_nullify (&aux.prev_case);
604   aux.casefile = NULL;
605   aux.split = split;
606   aux.func_aux = func_aux;
607   aux.dataset = ds;
608
609   ok = internal_procedure (ds, multipass_split_case_func,
610                            multipass_split_end_func, &aux);
611   case_destroy (&aux.prev_case);
612
613   return ok;
614 }
615
616 /* Case callback used by multipass_procedure_with_splits(). */
617 static bool
618 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
619 {
620   struct multipass_split_aux_data *aux = aux_;
621   bool ok = true;
622
623   /* Start a new series if needed. */
624   if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
625     {
626       /* Record split values. */
627       case_destroy (&aux->prev_case);
628       case_clone (&aux->prev_case, c);
629
630       /* Pass any cases to split_func. */
631       if (aux->casefile != NULL)
632         ok = multipass_split_output (aux, ds);
633
634       /* Start a new casefile. */
635       aux->casefile = 
636         fastfile_create (dict_get_next_value_idx (ds->dict));
637     }
638
639   return casefile_append (aux->casefile, c) && ok;
640 }
641
642 /* End-of-file callback used by multipass_procedure_with_splits(). */
643 static bool
644 multipass_split_end_func (void *aux_, const struct dataset *ds)
645 {
646   struct multipass_split_aux_data *aux = aux_;
647   return (aux->casefile == NULL || multipass_split_output (aux, ds));
648 }
649
650 static bool
651 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
652 {
653   bool ok;
654   
655   assert (aux->casefile != NULL);
656   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
657   casefile_destroy (aux->casefile);
658   aux->casefile = NULL;
659
660   return ok;
661 }
662 \f
663 /* Discards all the current state in preparation for a data-input
664    command like DATA LIST or GET. */
665 void
666 discard_variables (struct dataset *ds)
667 {
668   dict_clear (ds->dict);
669   fh_set_default_handle (NULL);
670
671   ds->n_lag = 0;
672   
673   free_case_source (ds->proc_source);
674   ds->proc_source = NULL;
675
676   proc_cancel_all_transformations (ds);
677 }
678 \f
679 /* Returns the current set of permanent transformations,
680    and clears the permanent transformations.
681    For use by INPUT PROGRAM. */
682 struct trns_chain *
683 proc_capture_transformations (struct dataset *ds) 
684 {
685   struct trns_chain *chain;
686   
687   assert (ds->temporary_trns_chain == NULL);
688   chain = ds->permanent_trns_chain;
689   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
690   return chain;
691 }
692
693 /* Adds a transformation that processes a case with PROC and
694    frees itself with FREE to the current set of transformations.
695    The functions are passed AUX as auxiliary data. */
696 void
697 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
698 {
699   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
700 }
701
702 /* Adds a transformation that processes a case with PROC and
703    frees itself with FREE to the current set of transformations.
704    When parsing of the block of transformations is complete,
705    FINALIZE will be called.
706    The functions are passed AUX as auxiliary data. */
707 void
708 add_transformation_with_finalizer (struct dataset *ds, 
709                                    trns_finalize_func *finalize,
710                                    trns_proc_func *proc,
711                                    trns_free_func *free, void *aux)
712 {
713   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
714 }
715
716 /* Returns the index of the next transformation.
717    This value can be returned by a transformation procedure
718    function to indicate a "jump" to that transformation. */
719 size_t
720 next_transformation (const struct dataset *ds) 
721 {
722   return trns_chain_next (ds->cur_trns_chain);
723 }
724
725 /* Returns true if the next call to add_transformation() will add
726    a temporary transformation, false if it will add a permanent
727    transformation. */
728 bool
729 proc_in_temporary_transformations (const struct dataset *ds) 
730 {
731   return ds->temporary_trns_chain != NULL;
732 }
733
734 /* Marks the start of temporary transformations.
735    Further calls to add_transformation() will add temporary
736    transformations. */
737 void
738 proc_start_temporary_transformations (struct dataset *ds) 
739 {
740   if (!proc_in_temporary_transformations (ds))
741     {
742       add_case_limit_trns (ds);
743
744       ds->permanent_dict = dict_clone (ds->dict);
745       trns_chain_finalize (ds->permanent_trns_chain);
746       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
747     }
748 }
749
750 /* Converts all the temporary transformations, if any, to
751    permanent transformations.  Further transformations will be
752    permanent.
753    Returns true if anything changed, false otherwise. */
754 bool
755 proc_make_temporary_transformations_permanent (struct dataset *ds) 
756 {
757   if (proc_in_temporary_transformations (ds)) 
758     {
759       trns_chain_finalize (ds->temporary_trns_chain);
760       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
761       ds->temporary_trns_chain = NULL;
762
763       dict_destroy (ds->permanent_dict);
764       ds->permanent_dict = NULL;
765
766       return true;
767     }
768   else
769     return false;
770 }
771
772 /* Cancels all temporary transformations, if any.  Further
773    transformations will be permanent.
774    Returns true if anything changed, false otherwise. */
775 bool
776 proc_cancel_temporary_transformations (struct dataset *ds) 
777 {
778   if (proc_in_temporary_transformations (ds)) 
779     {
780       dict_destroy (ds->dict);
781       ds->dict = ds->permanent_dict;
782       ds->permanent_dict = NULL;
783
784       trns_chain_destroy (ds->temporary_trns_chain);
785       ds->temporary_trns_chain = NULL;
786
787       return true;
788     }
789   else
790     return false;
791 }
792
793 /* Cancels all transformations, if any.
794    Returns true if successful, false on I/O error. */
795 bool
796 proc_cancel_all_transformations (struct dataset *ds)
797 {
798   bool ok;
799   ok = trns_chain_destroy (ds->permanent_trns_chain);
800   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
801   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
802   ds->temporary_trns_chain = NULL;
803   return ok;
804 }
805 \f
806 /* Initializes procedure handling. */
807 struct dataset *
808 create_dataset (void)
809 {
810   struct dataset *ds = xzalloc (sizeof(*ds));
811   ds->dict = dict_create ();
812   proc_cancel_all_transformations (ds);
813   return ds;
814 }
815
816 /* Finishes up procedure handling. */
817 void
818 destroy_dataset (struct dataset *ds)
819 {
820   discard_variables (ds);
821   dict_destroy (ds->dict);
822   trns_chain_destroy (ds->permanent_trns_chain);
823   free (ds);
824 }
825
826 /* Sets SINK as the destination for procedure output from the
827    next procedure. */
828 void
829 proc_set_sink (struct dataset *ds, struct case_sink *sink) 
830 {
831   assert (ds->proc_sink == NULL);
832   ds->proc_sink = sink;
833 }
834
835 /* Sets SOURCE as the source for procedure input for the next
836    procedure. */
837 void
838 proc_set_source (struct dataset *ds, struct case_source *source) 
839 {
840   assert (ds->proc_source == NULL);
841   ds->proc_source = source;
842 }
843
844 /* Returns true if a source for the next procedure has been
845    configured, false otherwise. */
846 bool
847 proc_has_source (const struct dataset *ds) 
848 {
849   return ds->proc_source != NULL;
850 }
851
852 /* Returns the output from the previous procedure.
853    For use only immediately after executing a procedure.
854    The returned casefile is owned by the caller; it will not be
855    automatically used for the next procedure's input. */
856 struct casefile *
857 proc_capture_output (struct dataset *ds) 
858 {
859   struct casefile *casefile;
860
861   /* Try to make sure that this function is called immediately
862      after procedure() or a similar function. */
863   assert (ds->proc_source != NULL);
864   assert (case_source_is_class (ds->proc_source, &storage_source_class));
865   assert (trns_chain_is_empty (ds->permanent_trns_chain));
866   assert (!proc_in_temporary_transformations (ds));
867
868   casefile = storage_source_decapsulate (ds->proc_source);
869   ds->proc_source = NULL;
870
871   return casefile;
872 }
873 \f
874 static trns_proc_func case_limit_trns_proc;
875 static trns_free_func case_limit_trns_free;
876
877 /* Adds a transformation that limits the number of cases that may
878    pass through, if DS->DICT has a case limit. */
879 static void
880 add_case_limit_trns (struct dataset *ds) 
881 {
882   size_t case_limit = dict_get_case_limit (ds->dict);
883   if (case_limit != 0)
884     {
885       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
886       *cases_remaining = case_limit;
887       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
888                           cases_remaining);
889       dict_set_case_limit (ds->dict, 0);
890     }
891 }
892
893 /* Limits the maximum number of cases processed to
894    *CASES_REMAINING. */
895 static int
896 case_limit_trns_proc (void *cases_remaining_,
897                       struct ccase *c UNUSED, casenumber case_nr UNUSED) 
898 {
899   size_t *cases_remaining = cases_remaining_;
900   if (*cases_remaining > 0) 
901     {
902       (*cases_remaining)--;
903       return TRNS_CONTINUE;
904     }
905   else
906     return TRNS_DROP_CASE;
907 }
908
909 /* Frees the data associated with a case limit transformation. */
910 static bool
911 case_limit_trns_free (void *cases_remaining_) 
912 {
913   size_t *cases_remaining = cases_remaining_;
914   free (cases_remaining);
915   return true;
916 }
917 \f
918 static trns_proc_func filter_trns_proc;
919
920 /* Adds a temporary transformation to filter data according to
921    the variable specified on FILTER, if any. */
922 static void
923 add_filter_trns (struct dataset *ds) 
924 {
925   struct variable *filter_var = dict_get_filter (ds->dict);
926   if (filter_var != NULL) 
927     {
928       proc_start_temporary_transformations (ds);
929       add_transformation (ds, filter_trns_proc, NULL, filter_var);
930     }
931 }
932
933 /* FILTER transformation. */
934 static int
935 filter_trns_proc (void *filter_var_,
936                   struct ccase *c UNUSED, casenumber case_nr UNUSED) 
937   
938 {
939   struct variable *filter_var = filter_var_;
940   double f = case_num (c, filter_var);
941   return (f != 0.0 && !var_is_num_missing (filter_var, f)
942           ? TRNS_CONTINUE : TRNS_DROP_CASE);
943 }
944
945
946 struct dictionary *
947 dataset_dict (const struct dataset *ds)
948 {
949   return ds->dict;
950 }
951
952
953 void 
954 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
955 {
956   ds->dict = dict;
957 }
958
959 int 
960 dataset_n_lag (const struct dataset *ds)
961 {
962   return ds->n_lag;
963 }
964
965 void 
966 dataset_set_n_lag (struct dataset *ds, int n_lag)
967 {
968   ds->n_lag = n_lag;
969 }
970
971