Add a deque, implemented as a circular queue, to libpspp.
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casedeque.h>
30 #include <data/casefile.h>
31 #include <data/fastfile.h>
32 #include <data/dictionary.h>
33 #include <data/file-handle-def.h>
34 #include <data/procedure.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/variable.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/misc.h>
40 #include <libpspp/str.h>
41
42 struct dataset {
43
44   /* An abstract factory which creates casefiles */
45   struct casefile_factory *cf_factory;
46
47   /* Callback which occurs when a procedure provides a new source for
48      the dataset */
49   replace_source_callback *replace_source ;
50
51   /* Callback which occurs whenever the DICT is replaced by a new one */
52   replace_dictionary_callback *replace_dict;
53
54   /* Cases are read from proc_source,
55      pass through permanent_trns_chain (which transforms them into
56      the format described by permanent_dict),
57      are written to proc_sink,
58      pass through temporary_trns_chain (which transforms them into
59      the format described by dict),
60      and are finally passed to the procedure. */
61   struct case_source *proc_source;
62   struct trns_chain *permanent_trns_chain;
63   struct dictionary *permanent_dict;
64   struct case_sink *proc_sink;
65   struct trns_chain *temporary_trns_chain;
66   struct dictionary *dict;
67
68   /* The transformation chain that the next transformation will be
69      added to. */
70   struct trns_chain *cur_trns_chain;
71
72   /* The compactor used to compact a case, if necessary;
73      otherwise a null pointer. */
74   struct dict_compactor *compactor;
75
76   /* Time at which proc was last invoked. */
77   time_t last_proc_invocation;
78
79   /* Cases just before ("lagging") the current one. */
80   int n_lag;                    /* Number of cases to lag. */
81   struct casedeque lagged_cases; /* Lagged cases. */
82
83   /* Procedure data. */
84   bool is_open;               /* Procedure open? */
85   struct ccase trns_case;     /* Case used for transformations. */
86   struct ccase sink_case;     /* Case written to sink, if
87                                  compacting is necessary. */
88   size_t cases_written;       /* Cases output so far. */
89   bool ok;
90 }; /* struct dataset */
91
92
93 static void add_case_limit_trns (struct dataset *ds);
94 static void add_filter_trns (struct dataset *ds);
95
96 static bool internal_procedure (struct dataset *ds, case_func *,
97                                 end_func *,
98                                 void *aux);
99 static void update_last_proc_invocation (struct dataset *ds);
100 static void create_trns_case (struct ccase *, struct dictionary *);
101 static void open_active_file (struct dataset *ds);
102 static void clear_case (const struct dataset *ds, struct ccase *c);
103 static bool close_active_file (struct dataset *ds);
104 \f
105 /* Public functions. */
106
107 /* Returns the last time the data was read. */
108 time_t
109 time_of_last_procedure (struct dataset *ds)
110 {
111   if (ds->last_proc_invocation == 0)
112     update_last_proc_invocation (ds);
113   return ds->last_proc_invocation;
114 }
115 \f
116 /* Regular procedure. */
117
118
119
120 /* Reads the data from the input program and writes it to a new
121    active file.  For each case we read from the input program, we
122    do the following:
123
124    1. Execute permanent transformations.  If these drop the case,
125       start the next case from step 1.
126
127    2. Write case to replacement active file.
128
129    3. Execute temporary transformations.  If these drop the case,
130       start the next case from step 1.
131
132    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
133
134    Returns true if successful, false if an I/O error occurred. */
135 bool
136 procedure (struct dataset *ds, case_func *cf, void *aux)
137 {
138   update_last_proc_invocation (ds);
139
140   /* Optimize the trivial case where we're not going to do
141      anything with the data, by not reading the data at all. */
142   if (cf == NULL
143       && case_source_is_class (ds->proc_source, &storage_source_class)
144       && ds->proc_sink == NULL
145       && (ds->temporary_trns_chain == NULL
146           || trns_chain_is_empty (ds->temporary_trns_chain))
147       && trns_chain_is_empty (ds->permanent_trns_chain))
148     {
149       ds->n_lag = 0;
150       dict_set_case_limit (ds->dict, 0);
151       dict_clear_vectors (ds->dict);
152       return true;
153     }
154
155   return internal_procedure (ds, cf, NULL, aux);
156 }
157 \f
158 /* Multipass procedure. */
159
160 struct multipass_aux_data
161   {
162     struct casefile *casefile;
163
164     bool (*proc_func) (const struct casefile *, void *aux);
165     void *aux;
166   };
167
168 /* Case processing function for multipass_procedure(). */
169 static bool
170 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
171 {
172   struct multipass_aux_data *aux_data = aux_data_;
173   return casefile_append (aux_data->casefile, c);
174 }
175
176 /* End-of-file function for multipass_procedure(). */
177 static bool
178 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
179 {
180   struct multipass_aux_data *aux_data = aux_data_;
181   return (aux_data->proc_func == NULL
182           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
183 }
184
185 /* Procedure that allows multiple passes over the input data.
186    The entire active file is passed to PROC_FUNC, with the given
187    AUX as auxiliary data, as a unit. */
188 bool
189 multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
190 {
191   struct multipass_aux_data aux_data;
192   bool ok;
193
194   aux_data.casefile =
195     ds->cf_factory->create_casefile (ds->cf_factory,
196                                      dict_get_next_value_idx (ds->dict));
197
198   aux_data.proc_func = proc_func;
199   aux_data.aux = aux;
200
201   ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
202   ok = !casefile_error (aux_data.casefile) && ok;
203
204   casefile_destroy (aux_data.casefile);
205
206   return ok;
207 }
208 \f
209
210 /* Procedure implementation. */
211
212 /* Executes a procedure.
213    Passes each case to CASE_FUNC.
214    Calls END_FUNC after the last case.
215    Returns true if successful, false if an I/O error occurred (or
216    if CASE_FUNC or END_FUNC ever returned false). */
217 static bool
218 internal_procedure (struct dataset *ds, case_func *proc,
219                     end_func *end,
220                     void *aux)
221 {
222   struct ccase *c;
223   bool ok = true;
224
225   proc_open (ds);
226   while (ok && proc_read (ds, &c))
227     if (proc != NULL)
228       ok = proc (c, aux, ds) && ok;
229   if (end != NULL)
230     ok = end (aux, ds) && ok;
231
232   if ( proc_close (ds) && ok )
233     {
234
235       return true;
236     }
237
238   return false;
239 }
240
241 /* Opens dataset DS for reading cases with proc_read.
242    proc_close must be called when done. */
243 void
244 proc_open (struct dataset *ds)
245 {
246   assert (ds->proc_source != NULL);
247   assert (!ds->is_open);
248
249   update_last_proc_invocation (ds);
250
251   open_active_file (ds);
252
253   ds->is_open = true;
254   create_trns_case (&ds->trns_case, ds->dict);
255   case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
256   ds->cases_written = 0;
257   ds->ok = true;
258 }
259
260 /* Reads the next case from dataset DS, which must have been
261    opened for reading with proc_open.
262    Returns true if successful, in which case a pointer to the
263    case is stored in *C.
264    Return false at end of file or if a read error occurs.  In
265    this case a null pointer is stored in *C. */
266 bool
267 proc_read (struct dataset *ds, struct ccase **c)
268 {
269   enum trns_result retval = TRNS_DROP_CASE;
270
271   assert (ds->is_open);
272   *c = NULL;
273   for (;;)
274     {
275       size_t case_nr;
276
277       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
278       if (retval == TRNS_ERROR)
279         ds->ok = false;
280       if (!ds->ok)
281         return false;
282
283       /* Read a case from proc_source. */
284       clear_case (ds, &ds->trns_case);
285       if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
286         return false;
287
288       /* Execute permanent transformations.  */
289       case_nr = ds->cases_written + 1;
290       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
291                                    &ds->trns_case, &case_nr);
292       if (retval != TRNS_CONTINUE)
293         continue;
294
295       /* Write case to collection of lagged cases. */
296       if (ds->n_lag > 0) 
297         {
298           while (casedeque_count (&ds->lagged_cases) >= ds->n_lag)
299             case_destroy (casedeque_pop_back (&ds->lagged_cases));
300           case_clone (casedeque_push_front (&ds->lagged_cases),
301                       &ds->trns_case);
302         }
303
304       /* Write case to replacement active file. */
305       ds->cases_written++;
306       if (ds->proc_sink->class->write != NULL)
307         {
308           if (ds->compactor != NULL)
309             {
310               dict_compactor_compact (ds->compactor, &ds->sink_case,
311                                       &ds->trns_case);
312               ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
313             }
314           else
315             ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
316         }
317
318       /* Execute temporary transformations. */
319       if (ds->temporary_trns_chain != NULL)
320         {
321           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
322                                        &ds->trns_case, &ds->cases_written);
323           if (retval != TRNS_CONTINUE)
324             continue;
325         }
326
327       *c = &ds->trns_case;
328       return true;
329     }
330 }
331
332 /* Closes dataset DS for reading.
333    Returns true if successful, false if an I/O error occurred
334    while reading or closing the data set.
335    If DS has not been opened, returns true without doing
336    anything else. */
337 bool
338 proc_close (struct dataset *ds)
339 {
340   if (!ds->is_open)
341     return true;
342
343   /* Drain any remaining cases. */
344   while (ds->ok)
345     {
346       struct ccase *c;
347       if (!proc_read (ds, &c))
348         break;
349     }
350   ds->ok = free_case_source (ds->proc_source) && ds->ok;
351   proc_set_source (ds, NULL);
352
353   case_destroy (&ds->sink_case);
354   case_destroy (&ds->trns_case);
355
356   ds->ok = close_active_file (ds) && ds->ok;
357   ds->is_open = false;
358
359   return ds->ok;
360 }
361
362 /* Updates last_proc_invocation. */
363 static void
364 update_last_proc_invocation (struct dataset *ds)
365 {
366   ds->last_proc_invocation = time (NULL);
367 }
368
369 /* Creates and returns a case, initializing it from the vectors
370    that say which `value's need to be initialized just once, and
371    which ones need to be re-initialized before every case. */
372 static void
373 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
374 {
375   size_t var_cnt = dict_get_var_cnt (dict);
376   size_t i;
377
378   case_create (trns_case, dict_get_next_value_idx (dict));
379   for (i = 0; i < var_cnt; i++)
380     {
381       struct variable *v = dict_get_var (dict, i);
382       union value *value = case_data_rw (trns_case, v);
383
384       if (var_is_numeric (v))
385         value->f = var_get_leave (v) ? 0.0 : SYSMIS;
386       else
387         memset (value->s, ' ', var_get_width (v));
388     }
389 }
390
391 /* Makes all preparations for reading from the data source and writing
392    to the data sink. */
393 static void
394 open_active_file (struct dataset *ds)
395 {
396   add_case_limit_trns (ds);
397   add_filter_trns (ds);
398
399   /* Finalize transformations. */
400   trns_chain_finalize (ds->cur_trns_chain);
401
402   /* Make permanent_dict refer to the dictionary right before
403      data reaches the sink. */
404   if (ds->permanent_dict == NULL)
405     ds->permanent_dict = ds->dict;
406
407   /* Figure out whether to compact. */
408   ds->compactor =
409     (dict_compacting_would_shrink (ds->permanent_dict)
410      ? dict_make_compactor (ds->permanent_dict)
411      : NULL);
412
413   /* Prepare sink. */
414   if (ds->proc_sink == NULL)
415     ds->proc_sink = create_case_sink (&storage_sink_class,
416                                       ds->permanent_dict,
417                                       ds->cf_factory,
418                                       NULL);
419   if (ds->proc_sink->class->open != NULL)
420     ds->proc_sink->class->open (ds->proc_sink);
421
422   /* Allocate memory for lagged cases. */
423   casedeque_init (&ds->lagged_cases, ds->n_lag);
424 }
425
426 /* Clears the variables in C that need to be cleared between
427    processing cases.  */
428 static void
429 clear_case (const struct dataset *ds, struct ccase *c)
430 {
431   size_t var_cnt = dict_get_var_cnt (ds->dict);
432   size_t i;
433
434   for (i = 0; i < var_cnt; i++)
435     {
436       struct variable *v = dict_get_var (ds->dict, i);
437       if (!var_get_leave (v))
438         {
439           if (var_is_numeric (v))
440             case_data_rw (c, v)->f = SYSMIS;
441           else
442             memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
443         }
444     }
445 }
446
447 /* Closes the active file. */
448 static bool
449 close_active_file (struct dataset *ds)
450 {
451   /* Free memory for lagged cases. */
452   while (!casedeque_is_empty (&ds->lagged_cases))
453     case_destroy (casedeque_pop_back (&ds->lagged_cases));
454   casedeque_destroy (&ds->lagged_cases);
455
456   /* Dictionary from before TEMPORARY becomes permanent. */
457   proc_cancel_temporary_transformations (ds);
458
459   /* Finish compacting. */
460   if (ds->compactor != NULL)
461     {
462       dict_compactor_destroy (ds->compactor);
463       dict_compact_values (ds->dict);
464       ds->compactor = NULL;
465     }
466
467   /* Old data sink becomes new data source. */
468   if (ds->proc_sink->class->make_source != NULL)
469     proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
470   free_case_sink (ds->proc_sink);
471   ds->proc_sink = NULL;
472
473   dict_clear_vectors (ds->dict);
474   ds->permanent_dict = NULL;
475   return proc_cancel_all_transformations (ds);
476 }
477 \f
478 /* Returns a pointer to the lagged case from N_BEFORE cases before the
479    current one, or NULL if there haven't been that many cases yet. */
480 struct ccase *
481 lagged_case (const struct dataset *ds, int n_before)
482 {
483   assert (n_before >= 1);
484   assert (n_before <= ds->n_lag);
485
486   if (n_before <= casedeque_count (&ds->lagged_cases))
487     return casedeque_front (&ds->lagged_cases, n_before - 1);
488   else
489     return NULL;
490 }
491 \f
492 /* Procedure that separates the data into SPLIT FILE groups. */
493
494 /* Represents auxiliary data for handling SPLIT FILE. */
495 struct split_aux_data
496   {
497     struct dataset *dataset;    /* The dataset */
498     struct ccase prev_case;     /* Data in previous case. */
499
500     /* Callback functions. */
501     begin_func *begin;
502     case_func *proc;
503     end_func *end;
504     void *func_aux;
505   };
506
507 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
508 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
509 static bool split_procedure_end_func (void *, const struct dataset *);
510
511 /* Like procedure(), but it automatically breaks the case stream
512    into SPLIT FILE break groups.  Before each group of cases with
513    identical SPLIT FILE variable values, BEGIN_FUNC is called
514    with the first case in the group.
515    Then PROC_FUNC is called for each case in the group (including
516    the first).
517    END_FUNC is called when the group is finished.  FUNC_AUX is
518    passed to each of the functions as auxiliary data.
519
520    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
521    and END_FUNC will be called at all.
522
523    If SPLIT FILE is not in effect, then there is one break group
524    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
525    will be called once.
526
527    Returns true if successful, false if an I/O error occurred. */
528 bool
529 procedure_with_splits (struct dataset *ds,
530                        begin_func begin,
531                        case_func *proc,
532                        end_func *end,
533                        void *func_aux)
534 {
535   struct split_aux_data split_aux;
536   bool ok;
537
538   case_nullify (&split_aux.prev_case);
539   split_aux.begin = begin;
540   split_aux.proc = proc;
541   split_aux.end = end;
542   split_aux.func_aux = func_aux;
543   split_aux.dataset = ds;
544
545   ok = internal_procedure (ds, split_procedure_case_func,
546                            split_procedure_end_func, &split_aux);
547
548   case_destroy (&split_aux.prev_case);
549
550   return ok;
551 }
552
553 /* Case callback used by procedure_with_splits(). */
554 static bool
555 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
556 {
557   struct split_aux_data *split_aux = split_aux_;
558
559   /* Start a new series if needed. */
560   if (case_is_null (&split_aux->prev_case)
561       || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
562     {
563       if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
564         split_aux->end (split_aux->func_aux, ds);
565
566       case_destroy (&split_aux->prev_case);
567       case_clone (&split_aux->prev_case, c);
568
569       if (split_aux->begin != NULL)
570         split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
571     }
572
573   return (split_aux->proc == NULL
574           || split_aux->proc (c, split_aux->func_aux, ds));
575 }
576
577 /* End-of-file callback used by procedure_with_splits(). */
578 static bool
579 split_procedure_end_func (void *split_aux_, const struct dataset *ds)
580 {
581   struct split_aux_data *split_aux = split_aux_;
582
583   if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
584     split_aux->end (split_aux->func_aux, ds);
585   return true;
586 }
587
588 /* Compares the SPLIT FILE variables in cases A and B and returns
589    nonzero only if they differ. */
590 static int
591 equal_splits (const struct ccase *a, const struct ccase *b,
592               const struct dataset *ds)
593 {
594   return case_compare (a, b,
595                        dict_get_split_vars (ds->dict),
596                        dict_get_split_cnt (ds->dict)) == 0;
597 }
598 \f
599 /* Multipass procedure that separates the data into SPLIT FILE
600    groups. */
601
602 /* Represents auxiliary data for handling SPLIT FILE in a
603    multipass procedure. */
604 struct multipass_split_aux_data
605   {
606     struct dataset *dataset;    /* The dataset of the split */
607     struct ccase prev_case;     /* Data in previous case. */
608     struct casefile *casefile;  /* Accumulates data for a split. */
609     split_func *split;          /* Function to call with the accumulated
610                                    data. */
611     void *func_aux;             /* Auxiliary data. */
612   };
613
614 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
615 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
616 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
617
618 /* Returns true if successful, false if an I/O error occurred. */
619 bool
620 multipass_procedure_with_splits (struct dataset *ds,
621                                  split_func  *split,
622                                  void *func_aux)
623 {
624   struct multipass_split_aux_data aux;
625   bool ok;
626
627   case_nullify (&aux.prev_case);
628   aux.casefile = NULL;
629   aux.split = split;
630   aux.func_aux = func_aux;
631   aux.dataset = ds;
632
633   ok = internal_procedure (ds, multipass_split_case_func,
634                            multipass_split_end_func, &aux);
635   case_destroy (&aux.prev_case);
636
637   return ok;
638 }
639
640 /* Case callback used by multipass_procedure_with_splits(). */
641 static bool
642 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
643 {
644   struct multipass_split_aux_data *aux = aux_;
645   bool ok = true;
646
647   /* Start a new series if needed. */
648   if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
649     {
650       /* Record split values. */
651       case_destroy (&aux->prev_case);
652       case_clone (&aux->prev_case, c);
653
654       /* Pass any cases to split_func. */
655       if (aux->casefile != NULL)
656         ok = multipass_split_output (aux, ds);
657
658       /* Start a new casefile. */
659       aux->casefile =
660         ds->cf_factory->create_casefile (ds->cf_factory,
661                                          dict_get_next_value_idx (ds->dict));
662     }
663
664   return casefile_append (aux->casefile, c) && ok;
665 }
666
667 /* End-of-file callback used by multipass_procedure_with_splits(). */
668 static bool
669 multipass_split_end_func (void *aux_, const struct dataset *ds)
670 {
671   struct multipass_split_aux_data *aux = aux_;
672   return (aux->casefile == NULL || multipass_split_output (aux, ds));
673 }
674
675 static bool
676 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
677 {
678   bool ok;
679
680   assert (aux->casefile != NULL);
681   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
682   casefile_destroy (aux->casefile);
683   aux->casefile = NULL;
684
685   return ok;
686 }
687 \f
688 /* Discards all the current state in preparation for a data-input
689    command like DATA LIST or GET. */
690 void
691 discard_variables (struct dataset *ds)
692 {
693   dict_clear (ds->dict);
694   fh_set_default_handle (NULL);
695
696   ds->n_lag = 0;
697
698   free_case_source (ds->proc_source);
699   proc_set_source (ds, NULL);
700
701   proc_cancel_all_transformations (ds);
702 }
703 \f
704 /* Returns the current set of permanent transformations,
705    and clears the permanent transformations.
706    For use by INPUT PROGRAM. */
707 struct trns_chain *
708 proc_capture_transformations (struct dataset *ds)
709 {
710   struct trns_chain *chain;
711
712   assert (ds->temporary_trns_chain == NULL);
713   chain = ds->permanent_trns_chain;
714   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
715   return chain;
716 }
717
718 /* Adds a transformation that processes a case with PROC and
719    frees itself with FREE to the current set of transformations.
720    The functions are passed AUX as auxiliary data. */
721 void
722 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
723 {
724   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
725 }
726
727 /* Adds a transformation that processes a case with PROC and
728    frees itself with FREE to the current set of transformations.
729    When parsing of the block of transformations is complete,
730    FINALIZE will be called.
731    The functions are passed AUX as auxiliary data. */
732 void
733 add_transformation_with_finalizer (struct dataset *ds,
734                                    trns_finalize_func *finalize,
735                                    trns_proc_func *proc,
736                                    trns_free_func *free, void *aux)
737 {
738   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
739 }
740
741 /* Returns the index of the next transformation.
742    This value can be returned by a transformation procedure
743    function to indicate a "jump" to that transformation. */
744 size_t
745 next_transformation (const struct dataset *ds)
746 {
747   return trns_chain_next (ds->cur_trns_chain);
748 }
749
750 /* Returns true if the next call to add_transformation() will add
751    a temporary transformation, false if it will add a permanent
752    transformation. */
753 bool
754 proc_in_temporary_transformations (const struct dataset *ds)
755 {
756   return ds->temporary_trns_chain != NULL;
757 }
758
759 /* Marks the start of temporary transformations.
760    Further calls to add_transformation() will add temporary
761    transformations. */
762 void
763 proc_start_temporary_transformations (struct dataset *ds)
764 {
765   if (!proc_in_temporary_transformations (ds))
766     {
767       add_case_limit_trns (ds);
768
769       ds->permanent_dict = dict_clone (ds->dict);
770
771       trns_chain_finalize (ds->permanent_trns_chain);
772       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
773     }
774 }
775
776 /* Converts all the temporary transformations, if any, to
777    permanent transformations.  Further transformations will be
778    permanent.
779    Returns true if anything changed, false otherwise. */
780 bool
781 proc_make_temporary_transformations_permanent (struct dataset *ds)
782 {
783   if (proc_in_temporary_transformations (ds))
784     {
785       trns_chain_finalize (ds->temporary_trns_chain);
786       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
787       ds->temporary_trns_chain = NULL;
788
789       dict_destroy (ds->permanent_dict);
790       ds->permanent_dict = NULL;
791
792       return true;
793     }
794   else
795     return false;
796 }
797
798 /* Cancels all temporary transformations, if any.  Further
799    transformations will be permanent.
800    Returns true if anything changed, false otherwise. */
801 bool
802 proc_cancel_temporary_transformations (struct dataset *ds)
803 {
804   if (proc_in_temporary_transformations (ds))
805     {
806       dataset_set_dict (ds, ds->permanent_dict);
807       ds->permanent_dict = NULL;
808
809       trns_chain_destroy (ds->temporary_trns_chain);
810       ds->temporary_trns_chain = NULL;
811
812       return true;
813     }
814   else
815     return false;
816 }
817
818 /* Cancels all transformations, if any.
819    Returns true if successful, false on I/O error. */
820 bool
821 proc_cancel_all_transformations (struct dataset *ds)
822 {
823   bool ok;
824   ok = trns_chain_destroy (ds->permanent_trns_chain);
825   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
826   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
827   ds->temporary_trns_chain = NULL;
828   return ok;
829 }
830 \f
831 /* Initializes procedure handling. */
832 struct dataset *
833 create_dataset (struct casefile_factory *fact,
834                 replace_source_callback *rps,
835                 replace_dictionary_callback *rds
836                 )
837 {
838   struct dataset *ds = xzalloc (sizeof(*ds));
839   ds->dict = dict_create ();
840   ds->cf_factory = fact;
841   ds->replace_source = rps;
842   ds->replace_dict = rds;
843   proc_cancel_all_transformations (ds);
844   return ds;
845 }
846
847 /* Finishes up procedure handling. */
848 void
849 destroy_dataset (struct dataset *ds)
850 {
851   discard_variables (ds);
852   dict_destroy (ds->dict);
853   trns_chain_destroy (ds->permanent_trns_chain);
854   free (ds);
855 }
856
857 /* Sets SINK as the destination for procedure output from the
858    next procedure. */
859 void
860 proc_set_sink (struct dataset *ds, struct case_sink *sink)
861 {
862   assert (ds->proc_sink == NULL);
863   ds->proc_sink = sink;
864 }
865
866 /* Sets SOURCE as the source for procedure input for the next
867    procedure. */
868 void
869 proc_set_source (struct dataset *ds, struct case_source *source)
870 {
871   ds->proc_source = source;
872
873   if ( ds->replace_source )
874     ds->replace_source (ds->proc_source);
875 }
876
877 /* Returns true if a source for the next procedure has been
878    configured, false otherwise. */
879 bool
880 proc_has_source (const struct dataset *ds)
881 {
882   return ds->proc_source != NULL;
883 }
884
885 /* Returns the output from the previous procedure.
886    For use only immediately after executing a procedure.
887    The returned casefile is owned by the caller; it will not be
888    automatically used for the next procedure's input. */
889 struct casefile *
890 proc_capture_output (struct dataset *ds)
891 {
892   struct casefile *casefile;
893
894   /* Try to make sure that this function is called immediately
895      after procedure() or a similar function. */
896   assert (ds->proc_source != NULL);
897   assert (case_source_is_class (ds->proc_source, &storage_source_class));
898   assert (trns_chain_is_empty (ds->permanent_trns_chain));
899   assert (!proc_in_temporary_transformations (ds));
900
901   casefile = storage_source_decapsulate (ds->proc_source);
902   proc_set_source (ds, NULL);
903
904   return casefile;
905 }
906 \f
907 static trns_proc_func case_limit_trns_proc;
908 static trns_free_func case_limit_trns_free;
909
910 /* Adds a transformation that limits the number of cases that may
911    pass through, if DS->DICT has a case limit. */
912 static void
913 add_case_limit_trns (struct dataset *ds)
914 {
915   size_t case_limit = dict_get_case_limit (ds->dict);
916   if (case_limit != 0)
917     {
918       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
919       *cases_remaining = case_limit;
920       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
921                           cases_remaining);
922       dict_set_case_limit (ds->dict, 0);
923     }
924 }
925
926 /* Limits the maximum number of cases processed to
927    *CASES_REMAINING. */
928 static int
929 case_limit_trns_proc (void *cases_remaining_,
930                       struct ccase *c UNUSED, casenumber case_nr UNUSED)
931 {
932   size_t *cases_remaining = cases_remaining_;
933   if (*cases_remaining > 0)
934     {
935       (*cases_remaining)--;
936       return TRNS_CONTINUE;
937     }
938   else
939     return TRNS_DROP_CASE;
940 }
941
942 /* Frees the data associated with a case limit transformation. */
943 static bool
944 case_limit_trns_free (void *cases_remaining_)
945 {
946   size_t *cases_remaining = cases_remaining_;
947   free (cases_remaining);
948   return true;
949 }
950 \f
951 static trns_proc_func filter_trns_proc;
952
953 /* Adds a temporary transformation to filter data according to
954    the variable specified on FILTER, if any. */
955 static void
956 add_filter_trns (struct dataset *ds)
957 {
958   struct variable *filter_var = dict_get_filter (ds->dict);
959   if (filter_var != NULL)
960     {
961       proc_start_temporary_transformations (ds);
962       add_transformation (ds, filter_trns_proc, NULL, filter_var);
963     }
964 }
965
966 /* FILTER transformation. */
967 static int
968 filter_trns_proc (void *filter_var_,
969                   struct ccase *c UNUSED, casenumber case_nr UNUSED)
970
971 {
972   struct variable *filter_var = filter_var_;
973   double f = case_num (c, filter_var);
974   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
975           ? TRNS_CONTINUE : TRNS_DROP_CASE);
976 }
977
978
979 struct dictionary *
980 dataset_dict (const struct dataset *ds)
981 {
982   return ds->dict;
983 }
984
985
986 /* Set or replace dataset DS's dictionary with DICT.
987    The old dictionary is destroyed */
988 void
989 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
990 {
991   struct dictionary *old_dict = ds->dict;
992
993   dict_copy_callbacks (dict, ds->dict);
994   ds->dict = dict;
995
996   if ( ds->replace_dict )
997     ds->replace_dict (dict);
998
999   dict_destroy (old_dict);
1000 }
1001
1002 void 
1003 dataset_need_lag (struct dataset *ds, int n_before)
1004 {
1005   ds->n_lag = MAX (ds->n_lag, n_before);
1006 }
1007
1008 struct casefile_factory *
1009 dataset_get_casefile_factory (const struct dataset *ds)
1010 {
1011   return ds->cf_factory;
1012 }
1013