Applied patch #5653, which adds callbacks to dataset whenever its dictionary or
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <data/fastfile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40
41 struct dataset {
42
43   /* An abstract factory which creates casefiles */
44   struct casefile_factory *cf_factory;
45
46   /* Callback which occurs when a procedure provides a new source for
47      the dataset */
48   replace_source_callback *replace_source ;
49
50   /* Callback which occurs whenever the DICT is replaced by a new one */
51   replace_dictionary_callback *replace_dict;
52
53   /* Cases are read from proc_source,
54      pass through permanent_trns_chain (which transforms them into
55      the format described by permanent_dict),
56      are written to proc_sink,
57      pass through temporary_trns_chain (which transforms them into
58      the format described by dict),
59      and are finally passed to the procedure. */
60   struct case_source *proc_source;
61   struct trns_chain *permanent_trns_chain;
62   struct dictionary *permanent_dict;
63   struct case_sink *proc_sink;
64   struct trns_chain *temporary_trns_chain;
65   struct dictionary *dict;
66
67   /* The transformation chain that the next transformation will be
68      added to. */
69   struct trns_chain *cur_trns_chain;
70
71   /* The compactor used to compact a case, if necessary;
72      otherwise a null pointer. */
73   struct dict_compactor *compactor;
74
75   /* Time at which proc was last invoked. */
76   time_t last_proc_invocation;
77
78   /* Lag queue. */
79   int n_lag;                    /* Number of cases to lag. */
80   int lag_count;                /* Number of cases in lag_queue so far. */
81   int lag_head;         /* Index where next case will be added. */
82   struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
83
84   /* Procedure data. */
85   bool is_open;               /* Procedure open? */
86   struct ccase trns_case;     /* Case used for transformations. */
87   struct ccase sink_case;     /* Case written to sink, if
88                                  compacting is necessary. */
89   size_t cases_written;       /* Cases output so far. */
90   bool ok;
91 }; /* struct dataset */
92
93
94 static void add_case_limit_trns (struct dataset *ds);
95 static void add_filter_trns (struct dataset *ds);
96
97 static bool internal_procedure (struct dataset *ds, case_func *,
98                                 end_func *,
99                                 void *aux);
100 static void update_last_proc_invocation (struct dataset *ds);
101 static void create_trns_case (struct ccase *, struct dictionary *);
102 static void open_active_file (struct dataset *ds);
103 static void lag_case (struct dataset *ds, const struct ccase *c);
104 static void clear_case (const struct dataset *ds, struct ccase *c);
105 static bool close_active_file (struct dataset *ds);
106 \f
107 /* Public functions. */
108
109 /* Returns the last time the data was read. */
110 time_t
111 time_of_last_procedure (struct dataset *ds) 
112 {
113   if (ds->last_proc_invocation == 0)
114     update_last_proc_invocation (ds);
115   return ds->last_proc_invocation;
116 }
117 \f
118 /* Regular procedure. */
119
120
121
122 /* Reads the data from the input program and writes it to a new
123    active file.  For each case we read from the input program, we
124    do the following:
125
126    1. Execute permanent transformations.  If these drop the case,
127       start the next case from step 1.
128
129    2. Write case to replacement active file.
130   
131    3. Execute temporary transformations.  If these drop the case,
132       start the next case from step 1.
133      
134    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
135
136    Returns true if successful, false if an I/O error occurred. */
137 bool
138 procedure (struct dataset *ds, case_func *cf, void *aux)
139 {
140   update_last_proc_invocation (ds);
141
142   /* Optimize the trivial case where we're not going to do
143      anything with the data, by not reading the data at all. */
144   if (cf == NULL
145       && case_source_is_class (ds->proc_source, &storage_source_class)
146       && ds->proc_sink == NULL
147       && (ds->temporary_trns_chain == NULL
148           || trns_chain_is_empty (ds->temporary_trns_chain))
149       && trns_chain_is_empty (ds->permanent_trns_chain))
150     {
151       ds->n_lag = 0;
152       dict_set_case_limit (ds->dict, 0);
153       dict_clear_vectors (ds->dict);
154       return true;
155     }
156
157   return internal_procedure (ds, cf, NULL, aux);
158 }
159 \f
160 /* Multipass procedure. */
161
162 struct multipass_aux_data
163   {
164     struct casefile *casefile;
165    
166     bool (*proc_func) (const struct casefile *, void *aux);
167     void *aux;
168   };
169
170 /* Case processing function for multipass_procedure(). */
171 static bool
172 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
173 {
174   struct multipass_aux_data *aux_data = aux_data_;
175   return casefile_append (aux_data->casefile, c);
176 }
177
178 /* End-of-file function for multipass_procedure(). */
179 static bool
180 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
181 {
182   struct multipass_aux_data *aux_data = aux_data_;
183   return (aux_data->proc_func == NULL
184           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
185 }
186
187 /* Procedure that allows multiple passes over the input data.
188    The entire active file is passed to PROC_FUNC, with the given
189    AUX as auxiliary data, as a unit. */
190 bool
191 multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
192 {
193   struct multipass_aux_data aux_data;
194   bool ok;
195
196   aux_data.casefile =
197     ds->cf_factory->create_casefile (ds->cf_factory,
198                                      dict_get_next_value_idx (ds->dict));
199
200   aux_data.proc_func = proc_func;
201   aux_data.aux = aux;
202
203   ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
204   ok = !casefile_error (aux_data.casefile) && ok;
205
206   casefile_destroy (aux_data.casefile);
207
208   return ok;
209 }
210 \f
211
212 /* Procedure implementation. */
213
214 /* Executes a procedure.
215    Passes each case to CASE_FUNC.
216    Calls END_FUNC after the last case.
217    Returns true if successful, false if an I/O error occurred (or
218    if CASE_FUNC or END_FUNC ever returned false). */
219 static bool
220 internal_procedure (struct dataset *ds, case_func *proc,
221                     end_func *end,
222                     void *aux)
223 {
224   struct ccase *c;
225   bool ok = true;
226
227   proc_open (ds);
228   while (ok && proc_read (ds, &c))
229     if (proc != NULL)
230       ok = proc (c, aux, ds) && ok;
231   if (end != NULL)
232     ok = end (aux, ds) && ok;
233
234   if ( proc_close (ds) && ok )
235     {
236       if ( ds->replace_source )
237         ds->replace_source (ds->proc_source);
238
239       return true;
240     }
241
242   return false;
243 }
244
245 /* Opens dataset DS for reading cases with proc_read.
246    proc_close must be called when done. */
247 void
248 proc_open (struct dataset *ds)
249 {
250   assert (ds->proc_source != NULL);
251   assert (!ds->is_open);
252
253   update_last_proc_invocation (ds);
254
255   open_active_file (ds);
256
257   ds->is_open = true;
258   create_trns_case (&ds->trns_case, ds->dict);
259   case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
260   ds->cases_written = 0;
261   ds->ok = true;
262 }
263
264 /* Reads the next case from dataset DS, which must have been
265    opened for reading with proc_open.
266    Returns true if successful, in which case a pointer to the
267    case is stored in *C.
268    Return false at end of file or if a read error occurs.  In
269    this case a null pointer is stored in *C. */
270 bool
271 proc_read (struct dataset *ds, struct ccase **c)
272 {
273   enum trns_result retval = TRNS_DROP_CASE;
274
275   assert (ds->is_open);
276   *c = NULL;
277   for (;;)
278     {
279       size_t case_nr;
280
281       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
282       if (retval == TRNS_ERROR)
283         ds->ok = false;
284       if (!ds->ok)
285         return false;
286
287       /* Read a case from proc_source. */
288       clear_case (ds, &ds->trns_case);
289       if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
290         return false;
291
292       /* Execute permanent transformations.  */
293       case_nr = ds->cases_written + 1;
294       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
295                                    &ds->trns_case, &case_nr);
296       if (retval != TRNS_CONTINUE)
297         continue;
298  
299       /* Write case to LAG queue. */
300       if (ds->n_lag)
301         lag_case (ds, &ds->trns_case);
302
303       /* Write case to replacement active file. */
304       ds->cases_written++;
305       if (ds->proc_sink->class->write != NULL)
306         {
307           if (ds->compactor != NULL)
308             {
309               dict_compactor_compact (ds->compactor, &ds->sink_case,
310                                       &ds->trns_case);
311               ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
312             }
313           else
314             ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
315         }
316  
317       /* Execute temporary transformations. */
318       if (ds->temporary_trns_chain != NULL)
319         {
320           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
321                                        &ds->trns_case, &ds->cases_written);
322           if (retval != TRNS_CONTINUE)
323             continue;
324         }
325
326       *c = &ds->trns_case;
327       return true;
328     }
329 }
330
331 /* Closes dataset DS for reading.
332    Returns true if successful, false if an I/O error occurred
333    while reading or closing the data set.
334    If DS has not been opened, returns true without doing
335    anything else. */
336 bool
337 proc_close (struct dataset *ds)
338 {
339   if (!ds->is_open)
340     return true;
341
342   /* Drain any remaining cases. */
343   while (ds->ok)
344     {
345       struct ccase *c;
346       if (!proc_read (ds, &c))
347         break;
348     }
349   ds->ok = free_case_source (ds->proc_source) && ds->ok;
350   ds->proc_source = NULL;
351
352   case_destroy (&ds->sink_case);
353   case_destroy (&ds->trns_case);
354
355   ds->ok = close_active_file (ds) && ds->ok;
356   ds->is_open = false;
357
358   return ds->ok;
359 }
360
361 /* Updates last_proc_invocation. */
362 static void
363 update_last_proc_invocation (struct dataset *ds)
364 {
365   ds->last_proc_invocation = time (NULL);
366 }
367
368 /* Creates and returns a case, initializing it from the vectors
369    that say which `value's need to be initialized just once, and
370    which ones need to be re-initialized before every case. */
371 static void
372 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
373 {
374   size_t var_cnt = dict_get_var_cnt (dict);
375   size_t i;
376
377   case_create (trns_case, dict_get_next_value_idx (dict));
378   for (i = 0; i < var_cnt; i++)
379     {
380       struct variable *v = dict_get_var (dict, i);
381       union value *value = case_data_rw (trns_case, v);
382
383       if (var_is_numeric (v))
384         value->f = var_get_leave (v) ? 0.0 : SYSMIS;
385       else
386         memset (value->s, ' ', var_get_width (v));
387     }
388 }
389
390 /* Makes all preparations for reading from the data source and writing
391    to the data sink. */
392 static void
393 open_active_file (struct dataset *ds)
394 {
395   add_case_limit_trns (ds);
396   add_filter_trns (ds);
397
398   /* Finalize transformations. */
399   trns_chain_finalize (ds->cur_trns_chain);
400
401   /* Make permanent_dict refer to the dictionary right before
402      data reaches the sink. */
403   if (ds->permanent_dict == NULL)
404     ds->permanent_dict = ds->dict;
405
406   /* Figure out whether to compact. */
407   ds->compactor =
408     (dict_compacting_would_shrink (ds->permanent_dict)
409      ? dict_make_compactor (ds->permanent_dict)
410      : NULL);
411
412   /* Prepare sink. */
413   if (ds->proc_sink == NULL)
414     ds->proc_sink = create_case_sink (&storage_sink_class,
415                                       ds->permanent_dict,
416                                       ds->cf_factory,
417                                       NULL);
418   if (ds->proc_sink->class->open != NULL)
419     ds->proc_sink->class->open (ds->proc_sink);
420
421   /* Allocate memory for lag queue. */
422   if (ds->n_lag > 0)
423     {
424       int i;
425  
426       ds->lag_count = 0;
427       ds->lag_head = 0;
428       ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
429       for (i = 0; i < ds->n_lag; i++)
430         case_nullify (&ds->lag_queue[i]);
431     }
432 }
433
434 /* Add C to the lag queue. */
435 static void
436 lag_case (struct dataset *ds, const struct ccase *c)
437 {
438   if (ds->lag_count < ds->n_lag)
439     ds->lag_count++;
440   case_destroy (&ds->lag_queue[ds->lag_head]);
441   case_clone (&ds->lag_queue[ds->lag_head], c);
442   if (++ds->lag_head >= ds->n_lag)
443     ds->lag_head = 0;
444 }
445
446 /* Clears the variables in C that need to be cleared between
447    processing cases.  */
448 static void
449 clear_case (const struct dataset *ds, struct ccase *c)
450 {
451   size_t var_cnt = dict_get_var_cnt (ds->dict);
452   size_t i;
453  
454   for (i = 0; i < var_cnt; i++)
455     {
456       struct variable *v = dict_get_var (ds->dict, i);
457       if (!var_get_leave (v))
458         {
459           if (var_is_numeric (v))
460             case_data_rw (c, v)->f = SYSMIS;
461           else
462             memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
463         }
464     }
465 }
466
467 /* Closes the active file. */
468 static bool
469 close_active_file (struct dataset *ds)
470 {
471   /* Free memory for lag queue, and turn off lagging. */
472   if (ds->n_lag > 0)
473     {
474       int i;
475      
476       for (i = 0; i < ds->n_lag; i++)
477         case_destroy (&ds->lag_queue[i]);
478       free (ds->lag_queue);
479       ds->n_lag = 0;
480     }
481  
482   /* Dictionary from before TEMPORARY becomes permanent. */
483   proc_cancel_temporary_transformations (ds);
484
485   /* Finish compacting. */
486   if (ds->compactor != NULL)
487     {
488       dict_compactor_destroy (ds->compactor);
489       dict_compact_values (ds->dict);
490       ds->compactor = NULL;
491     }
492    
493   /* Old data sink becomes new data source. */
494   if (ds->proc_sink->class->make_source != NULL)
495     ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
496   free_case_sink (ds->proc_sink);
497   ds->proc_sink = NULL;
498
499   dict_clear_vectors (ds->dict);
500   ds->permanent_dict = NULL;
501   return proc_cancel_all_transformations (ds);
502 }
503 \f
504 /* Returns a pointer to the lagged case from N_BEFORE cases before the
505    current one, or NULL if there haven't been that many cases yet. */
506 struct ccase *
507 lagged_case (const struct dataset *ds, int n_before)
508 {
509   assert (n_before >= 1 );
510   assert (n_before <= ds->n_lag);
511
512   if (n_before <= ds->lag_count)
513     {
514       int index = ds->lag_head - n_before;
515       if (index < 0)
516         index += ds->n_lag;
517       return &ds->lag_queue[index];
518     }
519   else
520     return NULL;
521 }
522 \f
523 /* Procedure that separates the data into SPLIT FILE groups. */
524
525 /* Represents auxiliary data for handling SPLIT FILE. */
526 struct split_aux_data
527   {
528     struct dataset *dataset;    /* The dataset */
529     struct ccase prev_case;     /* Data in previous case. */
530
531     /* Callback functions. */
532     begin_func *begin;
533     case_func *proc;
534     end_func *end;
535     void *func_aux;
536   };
537
538 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
539 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
540 static bool split_procedure_end_func (void *, const struct dataset *);
541
542 /* Like procedure(), but it automatically breaks the case stream
543    into SPLIT FILE break groups.  Before each group of cases with
544    identical SPLIT FILE variable values, BEGIN_FUNC is called
545    with the first case in the group.
546    Then PROC_FUNC is called for each case in the group (including
547    the first).
548    END_FUNC is called when the group is finished.  FUNC_AUX is
549    passed to each of the functions as auxiliary data.
550
551    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
552    and END_FUNC will be called at all.
553
554    If SPLIT FILE is not in effect, then there is one break group
555    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
556    will be called once.
557   
558    Returns true if successful, false if an I/O error occurred. */
559 bool
560 procedure_with_splits (struct dataset *ds,
561                        begin_func begin,
562                        case_func *proc,
563                        end_func *end,
564                        void *func_aux)
565 {
566   struct split_aux_data split_aux;
567   bool ok;
568
569   case_nullify (&split_aux.prev_case);
570   split_aux.begin = begin;
571   split_aux.proc = proc;
572   split_aux.end = end;
573   split_aux.func_aux = func_aux;
574   split_aux.dataset = ds;
575
576   ok = internal_procedure (ds, split_procedure_case_func,
577                            split_procedure_end_func, &split_aux);
578
579   case_destroy (&split_aux.prev_case);
580
581   return ok;
582 }
583
584 /* Case callback used by procedure_with_splits(). */
585 static bool
586 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
587 {
588   struct split_aux_data *split_aux = split_aux_;
589
590   /* Start a new series if needed. */
591   if (case_is_null (&split_aux->prev_case)
592       || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
593     {
594       if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
595         split_aux->end (split_aux->func_aux, ds);
596
597       case_destroy (&split_aux->prev_case);
598       case_clone (&split_aux->prev_case, c);
599
600       if (split_aux->begin != NULL)
601         split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
602     }
603
604   return (split_aux->proc == NULL
605           || split_aux->proc (c, split_aux->func_aux, ds));
606 }
607
608 /* End-of-file callback used by procedure_with_splits(). */
609 static bool
610 split_procedure_end_func (void *split_aux_, const struct dataset *ds)
611 {
612   struct split_aux_data *split_aux = split_aux_;
613
614   if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
615     split_aux->end (split_aux->func_aux, ds);
616   return true;
617 }
618
619 /* Compares the SPLIT FILE variables in cases A and B and returns
620    nonzero only if they differ. */
621 static int
622 equal_splits (const struct ccase *a, const struct ccase *b,
623               const struct dataset *ds)
624 {
625   return case_compare (a, b,
626                        dict_get_split_vars (ds->dict),
627                        dict_get_split_cnt (ds->dict)) == 0;
628 }
629 \f
630 /* Multipass procedure that separates the data into SPLIT FILE
631    groups. */
632
633 /* Represents auxiliary data for handling SPLIT FILE in a
634    multipass procedure. */
635 struct multipass_split_aux_data
636   {
637     struct dataset *dataset;    /* The dataset of the split */
638     struct ccase prev_case;     /* Data in previous case. */
639     struct casefile *casefile;  /* Accumulates data for a split. */
640     split_func *split;          /* Function to call with the accumulated
641                                    data. */
642     void *func_aux;             /* Auxiliary data. */
643   };
644
645 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
646 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
647 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
648
649 /* Returns true if successful, false if an I/O error occurred. */
650 bool
651 multipass_procedure_with_splits (struct dataset *ds,
652                                  split_func  *split,
653                                  void *func_aux)
654 {
655   struct multipass_split_aux_data aux;
656   bool ok;
657
658   case_nullify (&aux.prev_case);
659   aux.casefile = NULL;
660   aux.split = split;
661   aux.func_aux = func_aux;
662   aux.dataset = ds;
663
664   ok = internal_procedure (ds, multipass_split_case_func,
665                            multipass_split_end_func, &aux);
666   case_destroy (&aux.prev_case);
667
668   return ok;
669 }
670
671 /* Case callback used by multipass_procedure_with_splits(). */
672 static bool
673 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
674 {
675   struct multipass_split_aux_data *aux = aux_;
676   bool ok = true;
677
678   /* Start a new series if needed. */
679   if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
680     {
681       /* Record split values. */
682       case_destroy (&aux->prev_case);
683       case_clone (&aux->prev_case, c);
684
685       /* Pass any cases to split_func. */
686       if (aux->casefile != NULL)
687         ok = multipass_split_output (aux, ds);
688
689       /* Start a new casefile. */
690       aux->casefile =
691         ds->cf_factory->create_casefile (ds->cf_factory,
692                                          dict_get_next_value_idx (ds->dict));
693     }
694
695   return casefile_append (aux->casefile, c) && ok;
696 }
697
698 /* End-of-file callback used by multipass_procedure_with_splits(). */
699 static bool
700 multipass_split_end_func (void *aux_, const struct dataset *ds)
701 {
702   struct multipass_split_aux_data *aux = aux_;
703   return (aux->casefile == NULL || multipass_split_output (aux, ds));
704 }
705
706 static bool
707 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
708 {
709   bool ok;
710  
711   assert (aux->casefile != NULL);
712   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
713   casefile_destroy (aux->casefile);
714   aux->casefile = NULL;
715
716   return ok;
717 }
718 \f
719 /* Discards all the current state in preparation for a data-input
720    command like DATA LIST or GET. */
721 void
722 discard_variables (struct dataset *ds)
723 {
724   dict_clear (ds->dict);
725   fh_set_default_handle (NULL);
726
727   ds->n_lag = 0;
728  
729   free_case_source (ds->proc_source);
730   ds->proc_source = NULL;
731   if ( ds->replace_source )
732     ds->replace_source (ds->proc_source);
733
734
735   proc_cancel_all_transformations (ds);
736 }
737 \f
738 /* Returns the current set of permanent transformations,
739    and clears the permanent transformations.
740    For use by INPUT PROGRAM. */
741 struct trns_chain *
742 proc_capture_transformations (struct dataset *ds)
743 {
744   struct trns_chain *chain;
745  
746   assert (ds->temporary_trns_chain == NULL);
747   chain = ds->permanent_trns_chain;
748   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
749   return chain;
750 }
751
752 /* Adds a transformation that processes a case with PROC and
753    frees itself with FREE to the current set of transformations.
754    The functions are passed AUX as auxiliary data. */
755 void
756 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
757 {
758   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
759 }
760
761 /* Adds a transformation that processes a case with PROC and
762    frees itself with FREE to the current set of transformations.
763    When parsing of the block of transformations is complete,
764    FINALIZE will be called.
765    The functions are passed AUX as auxiliary data. */
766 void
767 add_transformation_with_finalizer (struct dataset *ds,
768                                    trns_finalize_func *finalize,
769                                    trns_proc_func *proc,
770                                    trns_free_func *free, void *aux)
771 {
772   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
773 }
774
775 /* Returns the index of the next transformation.
776    This value can be returned by a transformation procedure
777    function to indicate a "jump" to that transformation. */
778 size_t
779 next_transformation (const struct dataset *ds)
780 {
781   return trns_chain_next (ds->cur_trns_chain);
782 }
783
784 /* Returns true if the next call to add_transformation() will add
785    a temporary transformation, false if it will add a permanent
786    transformation. */
787 bool
788 proc_in_temporary_transformations (const struct dataset *ds)
789 {
790   return ds->temporary_trns_chain != NULL;
791 }
792
793 /* Marks the start of temporary transformations.
794    Further calls to add_transformation() will add temporary
795    transformations. */
796 void
797 proc_start_temporary_transformations (struct dataset *ds)
798 {
799   if (!proc_in_temporary_transformations (ds))
800     {
801       add_case_limit_trns (ds);
802
803       ds->permanent_dict = dict_clone (ds->dict);
804       trns_chain_finalize (ds->permanent_trns_chain);
805       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
806     }
807 }
808
809 /* Converts all the temporary transformations, if any, to
810    permanent transformations.  Further transformations will be
811    permanent.
812    Returns true if anything changed, false otherwise. */
813 bool
814 proc_make_temporary_transformations_permanent (struct dataset *ds)
815 {
816   if (proc_in_temporary_transformations (ds))
817     {
818       trns_chain_finalize (ds->temporary_trns_chain);
819       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
820       ds->temporary_trns_chain = NULL;
821
822       dict_destroy (ds->permanent_dict);
823       ds->permanent_dict = NULL;
824
825       return true;
826     }
827   else
828     return false;
829 }
830
831 /* Cancels all temporary transformations, if any.  Further
832    transformations will be permanent.
833    Returns true if anything changed, false otherwise. */
834 bool
835 proc_cancel_temporary_transformations (struct dataset *ds)
836 {
837   if (proc_in_temporary_transformations (ds))
838     {
839       dict_destroy (ds->dict);
840       ds->dict = ds->permanent_dict;
841       ds->permanent_dict = NULL;
842
843       trns_chain_destroy (ds->temporary_trns_chain);
844       ds->temporary_trns_chain = NULL;
845
846       return true;
847     }
848   else
849     return false;
850 }
851
852 /* Cancels all transformations, if any.
853    Returns true if successful, false on I/O error. */
854 bool
855 proc_cancel_all_transformations (struct dataset *ds)
856 {
857   bool ok;
858   ok = trns_chain_destroy (ds->permanent_trns_chain);
859   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
860   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
861   ds->temporary_trns_chain = NULL;
862   return ok;
863 }
864 \f
865 /* Initializes procedure handling. */
866 struct dataset *
867 create_dataset (struct casefile_factory *fact,
868                 replace_source_callback *rps,
869                 replace_dictionary_callback *rds
870                 )
871 {
872   struct dataset *ds = xzalloc (sizeof(*ds));
873   ds->dict = dict_create ();
874   ds->cf_factory = fact;
875   ds->replace_source = rps;
876   ds->replace_dict = rds;
877   proc_cancel_all_transformations (ds);
878   return ds;
879 }
880
881 /* Finishes up procedure handling. */
882 void
883 destroy_dataset (struct dataset *ds)
884 {
885   discard_variables (ds);
886   dict_destroy (ds->dict);
887   trns_chain_destroy (ds->permanent_trns_chain);
888   free (ds);
889 }
890
891 /* Sets SINK as the destination for procedure output from the
892    next procedure. */
893 void
894 proc_set_sink (struct dataset *ds, struct case_sink *sink)
895 {
896   assert (ds->proc_sink == NULL);
897   ds->proc_sink = sink;
898 }
899
900 /* Sets SOURCE as the source for procedure input for the next
901    procedure. */
902 void
903 proc_set_source (struct dataset *ds, struct case_source *source)
904 {
905   assert (ds->proc_source == NULL);
906   ds->proc_source = source;
907 }
908
909 /* Returns true if a source for the next procedure has been
910    configured, false otherwise. */
911 bool
912 proc_has_source (const struct dataset *ds)
913 {
914   return ds->proc_source != NULL;
915 }
916
917 /* Returns the output from the previous procedure.
918    For use only immediately after executing a procedure.
919    The returned casefile is owned by the caller; it will not be
920    automatically used for the next procedure's input. */
921 struct casefile *
922 proc_capture_output (struct dataset *ds)
923 {
924   struct casefile *casefile;
925
926   /* Try to make sure that this function is called immediately
927      after procedure() or a similar function. */
928   assert (ds->proc_source != NULL);
929   assert (case_source_is_class (ds->proc_source, &storage_source_class));
930   assert (trns_chain_is_empty (ds->permanent_trns_chain));
931   assert (!proc_in_temporary_transformations (ds));
932
933   casefile = storage_source_decapsulate (ds->proc_source);
934   ds->proc_source = NULL;
935
936   return casefile;
937 }
938 \f
939 static trns_proc_func case_limit_trns_proc;
940 static trns_free_func case_limit_trns_free;
941
942 /* Adds a transformation that limits the number of cases that may
943    pass through, if DS->DICT has a case limit. */
944 static void
945 add_case_limit_trns (struct dataset *ds)
946 {
947   size_t case_limit = dict_get_case_limit (ds->dict);
948   if (case_limit != 0)
949     {
950       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
951       *cases_remaining = case_limit;
952       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
953                           cases_remaining);
954       dict_set_case_limit (ds->dict, 0);
955     }
956 }
957
958 /* Limits the maximum number of cases processed to
959    *CASES_REMAINING. */
960 static int
961 case_limit_trns_proc (void *cases_remaining_,
962                       struct ccase *c UNUSED, casenumber case_nr UNUSED)
963 {
964   size_t *cases_remaining = cases_remaining_;
965   if (*cases_remaining > 0)
966     {
967       (*cases_remaining)--;
968       return TRNS_CONTINUE;
969     }
970   else
971     return TRNS_DROP_CASE;
972 }
973
974 /* Frees the data associated with a case limit transformation. */
975 static bool
976 case_limit_trns_free (void *cases_remaining_)
977 {
978   size_t *cases_remaining = cases_remaining_;
979   free (cases_remaining);
980   return true;
981 }
982 \f
983 static trns_proc_func filter_trns_proc;
984
985 /* Adds a temporary transformation to filter data according to
986    the variable specified on FILTER, if any. */
987 static void
988 add_filter_trns (struct dataset *ds)
989 {
990   struct variable *filter_var = dict_get_filter (ds->dict);
991   if (filter_var != NULL)
992     {
993       proc_start_temporary_transformations (ds);
994       add_transformation (ds, filter_trns_proc, NULL, filter_var);
995     }
996 }
997
998 /* FILTER transformation. */
999 static int
1000 filter_trns_proc (void *filter_var_,
1001                   struct ccase *c UNUSED, casenumber case_nr UNUSED)
1002
1003 {
1004   struct variable *filter_var = filter_var_;
1005   double f = case_num (c, filter_var);
1006   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
1007           ? TRNS_CONTINUE : TRNS_DROP_CASE);
1008 }
1009
1010
1011 struct dictionary *
1012 dataset_dict (const struct dataset *ds)
1013 {
1014   return ds->dict;
1015 }
1016
1017
1018 void
1019 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
1020 {
1021   dict_copy_callbacks (dict, ds->dict);
1022   ds->dict = dict;
1023
1024   if ( ds->replace_dict )
1025     ds->replace_dict (dict);
1026 }
1027
1028 int
1029 dataset_n_lag (const struct dataset *ds)
1030 {
1031   return ds->n_lag;
1032 }
1033
1034 void
1035 dataset_set_n_lag (struct dataset *ds, int n_lag)
1036 {
1037   ds->n_lag = n_lag;
1038 }
1039
1040
1041 struct casefile_factory *
1042 dataset_get_casefile_factory (const struct dataset *ds)
1043 {
1044   return ds->cf_factory;
1045 }
1046