Patch #5672
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <data/fastfile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40
41 struct dataset {
42
43   /* An abstract factory which creates casefiles */
44   struct casefile_factory *cf_factory;
45
46   /* Callback which occurs when a procedure provides a new source for
47      the dataset */
48   replace_source_callback *replace_source ;
49
50   /* Callback which occurs whenever the DICT is replaced by a new one */
51   replace_dictionary_callback *replace_dict;
52
53   /* Cases are read from proc_source,
54      pass through permanent_trns_chain (which transforms them into
55      the format described by permanent_dict),
56      are written to proc_sink,
57      pass through temporary_trns_chain (which transforms them into
58      the format described by dict),
59      and are finally passed to the procedure. */
60   struct case_source *proc_source;
61   struct trns_chain *permanent_trns_chain;
62   struct dictionary *permanent_dict;
63   struct case_sink *proc_sink;
64   struct trns_chain *temporary_trns_chain;
65   struct dictionary *dict;
66
67   /* The transformation chain that the next transformation will be
68      added to. */
69   struct trns_chain *cur_trns_chain;
70
71   /* The compactor used to compact a case, if necessary;
72      otherwise a null pointer. */
73   struct dict_compactor *compactor;
74
75   /* Time at which proc was last invoked. */
76   time_t last_proc_invocation;
77
78   /* Lag queue. */
79   int n_lag;                    /* Number of cases to lag. */
80   int lag_count;                /* Number of cases in lag_queue so far. */
81   int lag_head;         /* Index where next case will be added. */
82   struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
83
84   /* Procedure data. */
85   bool is_open;               /* Procedure open? */
86   struct ccase trns_case;     /* Case used for transformations. */
87   struct ccase sink_case;     /* Case written to sink, if
88                                  compacting is necessary. */
89   size_t cases_written;       /* Cases output so far. */
90   bool ok;
91 }; /* struct dataset */
92
93
94 static void add_case_limit_trns (struct dataset *ds);
95 static void add_filter_trns (struct dataset *ds);
96
97 static bool internal_procedure (struct dataset *ds, case_func *,
98                                 end_func *,
99                                 void *aux);
100 static void update_last_proc_invocation (struct dataset *ds);
101 static void create_trns_case (struct ccase *, struct dictionary *);
102 static void open_active_file (struct dataset *ds);
103 static void lag_case (struct dataset *ds, const struct ccase *c);
104 static void clear_case (const struct dataset *ds, struct ccase *c);
105 static bool close_active_file (struct dataset *ds);
106 \f
107 /* Public functions. */
108
109 /* Returns the last time the data was read. */
110 time_t
111 time_of_last_procedure (struct dataset *ds)
112 {
113   if (ds->last_proc_invocation == 0)
114     update_last_proc_invocation (ds);
115   return ds->last_proc_invocation;
116 }
117 \f
118 /* Regular procedure. */
119
120
121
122 /* Reads the data from the input program and writes it to a new
123    active file.  For each case we read from the input program, we
124    do the following:
125
126    1. Execute permanent transformations.  If these drop the case,
127       start the next case from step 1.
128
129    2. Write case to replacement active file.
130
131    3. Execute temporary transformations.  If these drop the case,
132       start the next case from step 1.
133
134    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
135
136    Returns true if successful, false if an I/O error occurred. */
137 bool
138 procedure (struct dataset *ds, case_func *cf, void *aux)
139 {
140   update_last_proc_invocation (ds);
141
142   /* Optimize the trivial case where we're not going to do
143      anything with the data, by not reading the data at all. */
144   if (cf == NULL
145       && case_source_is_class (ds->proc_source, &storage_source_class)
146       && ds->proc_sink == NULL
147       && (ds->temporary_trns_chain == NULL
148           || trns_chain_is_empty (ds->temporary_trns_chain))
149       && trns_chain_is_empty (ds->permanent_trns_chain))
150     {
151       ds->n_lag = 0;
152       dict_set_case_limit (ds->dict, 0);
153       dict_clear_vectors (ds->dict);
154       return true;
155     }
156
157   return internal_procedure (ds, cf, NULL, aux);
158 }
159 \f
160 /* Multipass procedure. */
161
162 struct multipass_aux_data
163   {
164     struct casefile *casefile;
165
166     bool (*proc_func) (const struct casefile *, void *aux);
167     void *aux;
168   };
169
170 /* Case processing function for multipass_procedure(). */
171 static bool
172 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
173 {
174   struct multipass_aux_data *aux_data = aux_data_;
175   return casefile_append (aux_data->casefile, c);
176 }
177
178 /* End-of-file function for multipass_procedure(). */
179 static bool
180 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
181 {
182   struct multipass_aux_data *aux_data = aux_data_;
183   return (aux_data->proc_func == NULL
184           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
185 }
186
187 /* Procedure that allows multiple passes over the input data.
188    The entire active file is passed to PROC_FUNC, with the given
189    AUX as auxiliary data, as a unit. */
190 bool
191 multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
192 {
193   struct multipass_aux_data aux_data;
194   bool ok;
195
196   aux_data.casefile =
197     ds->cf_factory->create_casefile (ds->cf_factory,
198                                      dict_get_next_value_idx (ds->dict));
199
200   aux_data.proc_func = proc_func;
201   aux_data.aux = aux;
202
203   ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
204   ok = !casefile_error (aux_data.casefile) && ok;
205
206   casefile_destroy (aux_data.casefile);
207
208   return ok;
209 }
210 \f
211
212 /* Procedure implementation. */
213
214 /* Executes a procedure.
215    Passes each case to CASE_FUNC.
216    Calls END_FUNC after the last case.
217    Returns true if successful, false if an I/O error occurred (or
218    if CASE_FUNC or END_FUNC ever returned false). */
219 static bool
220 internal_procedure (struct dataset *ds, case_func *proc,
221                     end_func *end,
222                     void *aux)
223 {
224   struct ccase *c;
225   bool ok = true;
226
227   proc_open (ds);
228   while (ok && proc_read (ds, &c))
229     if (proc != NULL)
230       ok = proc (c, aux, ds) && ok;
231   if (end != NULL)
232     ok = end (aux, ds) && ok;
233
234   if ( proc_close (ds) && ok )
235     {
236
237       return true;
238     }
239
240   return false;
241 }
242
243 /* Opens dataset DS for reading cases with proc_read.
244    proc_close must be called when done. */
245 void
246 proc_open (struct dataset *ds)
247 {
248   assert (ds->proc_source != NULL);
249   assert (!ds->is_open);
250
251   update_last_proc_invocation (ds);
252
253   open_active_file (ds);
254
255   ds->is_open = true;
256   create_trns_case (&ds->trns_case, ds->dict);
257   case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
258   ds->cases_written = 0;
259   ds->ok = true;
260 }
261
262 /* Reads the next case from dataset DS, which must have been
263    opened for reading with proc_open.
264    Returns true if successful, in which case a pointer to the
265    case is stored in *C.
266    Return false at end of file or if a read error occurs.  In
267    this case a null pointer is stored in *C. */
268 bool
269 proc_read (struct dataset *ds, struct ccase **c)
270 {
271   enum trns_result retval = TRNS_DROP_CASE;
272
273   assert (ds->is_open);
274   *c = NULL;
275   for (;;)
276     {
277       size_t case_nr;
278
279       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
280       if (retval == TRNS_ERROR)
281         ds->ok = false;
282       if (!ds->ok)
283         return false;
284
285       /* Read a case from proc_source. */
286       clear_case (ds, &ds->trns_case);
287       if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
288         return false;
289
290       /* Execute permanent transformations.  */
291       case_nr = ds->cases_written + 1;
292       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
293                                    &ds->trns_case, &case_nr);
294       if (retval != TRNS_CONTINUE)
295         continue;
296
297       /* Write case to LAG queue. */
298       if (ds->n_lag)
299         lag_case (ds, &ds->trns_case);
300
301       /* Write case to replacement active file. */
302       ds->cases_written++;
303       if (ds->proc_sink->class->write != NULL)
304         {
305           if (ds->compactor != NULL)
306             {
307               dict_compactor_compact (ds->compactor, &ds->sink_case,
308                                       &ds->trns_case);
309               ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
310             }
311           else
312             ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
313         }
314
315       /* Execute temporary transformations. */
316       if (ds->temporary_trns_chain != NULL)
317         {
318           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
319                                        &ds->trns_case, &ds->cases_written);
320           if (retval != TRNS_CONTINUE)
321             continue;
322         }
323
324       *c = &ds->trns_case;
325       return true;
326     }
327 }
328
329 /* Closes dataset DS for reading.
330    Returns true if successful, false if an I/O error occurred
331    while reading or closing the data set.
332    If DS has not been opened, returns true without doing
333    anything else. */
334 bool
335 proc_close (struct dataset *ds)
336 {
337   if (!ds->is_open)
338     return true;
339
340   /* Drain any remaining cases. */
341   while (ds->ok)
342     {
343       struct ccase *c;
344       if (!proc_read (ds, &c))
345         break;
346     }
347   ds->ok = free_case_source (ds->proc_source) && ds->ok;
348   proc_set_source (ds, NULL);
349
350   case_destroy (&ds->sink_case);
351   case_destroy (&ds->trns_case);
352
353   ds->ok = close_active_file (ds) && ds->ok;
354   ds->is_open = false;
355
356   return ds->ok;
357 }
358
359 /* Updates last_proc_invocation. */
360 static void
361 update_last_proc_invocation (struct dataset *ds)
362 {
363   ds->last_proc_invocation = time (NULL);
364 }
365
366 /* Creates and returns a case, initializing it from the vectors
367    that say which `value's need to be initialized just once, and
368    which ones need to be re-initialized before every case. */
369 static void
370 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
371 {
372   size_t var_cnt = dict_get_var_cnt (dict);
373   size_t i;
374
375   case_create (trns_case, dict_get_next_value_idx (dict));
376   for (i = 0; i < var_cnt; i++)
377     {
378       struct variable *v = dict_get_var (dict, i);
379       union value *value = case_data_rw (trns_case, v);
380
381       if (var_is_numeric (v))
382         value->f = var_get_leave (v) ? 0.0 : SYSMIS;
383       else
384         memset (value->s, ' ', var_get_width (v));
385     }
386 }
387
388 /* Makes all preparations for reading from the data source and writing
389    to the data sink. */
390 static void
391 open_active_file (struct dataset *ds)
392 {
393   add_case_limit_trns (ds);
394   add_filter_trns (ds);
395
396   /* Finalize transformations. */
397   trns_chain_finalize (ds->cur_trns_chain);
398
399   /* Make permanent_dict refer to the dictionary right before
400      data reaches the sink. */
401   if (ds->permanent_dict == NULL)
402     ds->permanent_dict = ds->dict;
403
404   /* Figure out whether to compact. */
405   ds->compactor =
406     (dict_compacting_would_shrink (ds->permanent_dict)
407      ? dict_make_compactor (ds->permanent_dict)
408      : NULL);
409
410   /* Prepare sink. */
411   if (ds->proc_sink == NULL)
412     ds->proc_sink = create_case_sink (&storage_sink_class,
413                                       ds->permanent_dict,
414                                       ds->cf_factory,
415                                       NULL);
416   if (ds->proc_sink->class->open != NULL)
417     ds->proc_sink->class->open (ds->proc_sink);
418
419   /* Allocate memory for lag queue. */
420   if (ds->n_lag > 0)
421     {
422       int i;
423
424       ds->lag_count = 0;
425       ds->lag_head = 0;
426       ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
427       for (i = 0; i < ds->n_lag; i++)
428         case_nullify (&ds->lag_queue[i]);
429     }
430 }
431
432 /* Add C to the lag queue. */
433 static void
434 lag_case (struct dataset *ds, const struct ccase *c)
435 {
436   if (ds->lag_count < ds->n_lag)
437     ds->lag_count++;
438   case_destroy (&ds->lag_queue[ds->lag_head]);
439   case_clone (&ds->lag_queue[ds->lag_head], c);
440   if (++ds->lag_head >= ds->n_lag)
441     ds->lag_head = 0;
442 }
443
444 /* Clears the variables in C that need to be cleared between
445    processing cases.  */
446 static void
447 clear_case (const struct dataset *ds, struct ccase *c)
448 {
449   size_t var_cnt = dict_get_var_cnt (ds->dict);
450   size_t i;
451
452   for (i = 0; i < var_cnt; i++)
453     {
454       struct variable *v = dict_get_var (ds->dict, i);
455       if (!var_get_leave (v))
456         {
457           if (var_is_numeric (v))
458             case_data_rw (c, v)->f = SYSMIS;
459           else
460             memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
461         }
462     }
463 }
464
465 /* Closes the active file. */
466 static bool
467 close_active_file (struct dataset *ds)
468 {
469   /* Free memory for lag queue, and turn off lagging. */
470   if (ds->n_lag > 0)
471     {
472       int i;
473
474       for (i = 0; i < ds->n_lag; i++)
475         case_destroy (&ds->lag_queue[i]);
476       free (ds->lag_queue);
477       ds->n_lag = 0;
478     }
479
480   /* Dictionary from before TEMPORARY becomes permanent. */
481   proc_cancel_temporary_transformations (ds);
482
483   /* Finish compacting. */
484   if (ds->compactor != NULL)
485     {
486       dict_compactor_destroy (ds->compactor);
487       dict_compact_values (ds->dict);
488       ds->compactor = NULL;
489     }
490
491   /* Old data sink becomes new data source. */
492   if (ds->proc_sink->class->make_source != NULL)
493     proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
494   free_case_sink (ds->proc_sink);
495   ds->proc_sink = NULL;
496
497   dict_clear_vectors (ds->dict);
498   ds->permanent_dict = NULL;
499   return proc_cancel_all_transformations (ds);
500 }
501 \f
502 /* Returns a pointer to the lagged case from N_BEFORE cases before the
503    current one, or NULL if there haven't been that many cases yet. */
504 struct ccase *
505 lagged_case (const struct dataset *ds, int n_before)
506 {
507   assert (n_before >= 1 );
508   assert (n_before <= ds->n_lag);
509
510   if (n_before <= ds->lag_count)
511     {
512       int index = ds->lag_head - n_before;
513       if (index < 0)
514         index += ds->n_lag;
515       return &ds->lag_queue[index];
516     }
517   else
518     return NULL;
519 }
520 \f
521 /* Procedure that separates the data into SPLIT FILE groups. */
522
523 /* Represents auxiliary data for handling SPLIT FILE. */
524 struct split_aux_data
525   {
526     struct dataset *dataset;    /* The dataset */
527     struct ccase prev_case;     /* Data in previous case. */
528
529     /* Callback functions. */
530     begin_func *begin;
531     case_func *proc;
532     end_func *end;
533     void *func_aux;
534   };
535
536 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
537 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
538 static bool split_procedure_end_func (void *, const struct dataset *);
539
540 /* Like procedure(), but it automatically breaks the case stream
541    into SPLIT FILE break groups.  Before each group of cases with
542    identical SPLIT FILE variable values, BEGIN_FUNC is called
543    with the first case in the group.
544    Then PROC_FUNC is called for each case in the group (including
545    the first).
546    END_FUNC is called when the group is finished.  FUNC_AUX is
547    passed to each of the functions as auxiliary data.
548
549    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
550    and END_FUNC will be called at all.
551
552    If SPLIT FILE is not in effect, then there is one break group
553    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
554    will be called once.
555
556    Returns true if successful, false if an I/O error occurred. */
557 bool
558 procedure_with_splits (struct dataset *ds,
559                        begin_func begin,
560                        case_func *proc,
561                        end_func *end,
562                        void *func_aux)
563 {
564   struct split_aux_data split_aux;
565   bool ok;
566
567   case_nullify (&split_aux.prev_case);
568   split_aux.begin = begin;
569   split_aux.proc = proc;
570   split_aux.end = end;
571   split_aux.func_aux = func_aux;
572   split_aux.dataset = ds;
573
574   ok = internal_procedure (ds, split_procedure_case_func,
575                            split_procedure_end_func, &split_aux);
576
577   case_destroy (&split_aux.prev_case);
578
579   return ok;
580 }
581
582 /* Case callback used by procedure_with_splits(). */
583 static bool
584 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
585 {
586   struct split_aux_data *split_aux = split_aux_;
587
588   /* Start a new series if needed. */
589   if (case_is_null (&split_aux->prev_case)
590       || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
591     {
592       if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
593         split_aux->end (split_aux->func_aux, ds);
594
595       case_destroy (&split_aux->prev_case);
596       case_clone (&split_aux->prev_case, c);
597
598       if (split_aux->begin != NULL)
599         split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
600     }
601
602   return (split_aux->proc == NULL
603           || split_aux->proc (c, split_aux->func_aux, ds));
604 }
605
606 /* End-of-file callback used by procedure_with_splits(). */
607 static bool
608 split_procedure_end_func (void *split_aux_, const struct dataset *ds)
609 {
610   struct split_aux_data *split_aux = split_aux_;
611
612   if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
613     split_aux->end (split_aux->func_aux, ds);
614   return true;
615 }
616
617 /* Compares the SPLIT FILE variables in cases A and B and returns
618    nonzero only if they differ. */
619 static int
620 equal_splits (const struct ccase *a, const struct ccase *b,
621               const struct dataset *ds)
622 {
623   return case_compare (a, b,
624                        dict_get_split_vars (ds->dict),
625                        dict_get_split_cnt (ds->dict)) == 0;
626 }
627 \f
628 /* Multipass procedure that separates the data into SPLIT FILE
629    groups. */
630
631 /* Represents auxiliary data for handling SPLIT FILE in a
632    multipass procedure. */
633 struct multipass_split_aux_data
634   {
635     struct dataset *dataset;    /* The dataset of the split */
636     struct ccase prev_case;     /* Data in previous case. */
637     struct casefile *casefile;  /* Accumulates data for a split. */
638     split_func *split;          /* Function to call with the accumulated
639                                    data. */
640     void *func_aux;             /* Auxiliary data. */
641   };
642
643 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
644 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
645 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
646
647 /* Returns true if successful, false if an I/O error occurred. */
648 bool
649 multipass_procedure_with_splits (struct dataset *ds,
650                                  split_func  *split,
651                                  void *func_aux)
652 {
653   struct multipass_split_aux_data aux;
654   bool ok;
655
656   case_nullify (&aux.prev_case);
657   aux.casefile = NULL;
658   aux.split = split;
659   aux.func_aux = func_aux;
660   aux.dataset = ds;
661
662   ok = internal_procedure (ds, multipass_split_case_func,
663                            multipass_split_end_func, &aux);
664   case_destroy (&aux.prev_case);
665
666   return ok;
667 }
668
669 /* Case callback used by multipass_procedure_with_splits(). */
670 static bool
671 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
672 {
673   struct multipass_split_aux_data *aux = aux_;
674   bool ok = true;
675
676   /* Start a new series if needed. */
677   if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
678     {
679       /* Record split values. */
680       case_destroy (&aux->prev_case);
681       case_clone (&aux->prev_case, c);
682
683       /* Pass any cases to split_func. */
684       if (aux->casefile != NULL)
685         ok = multipass_split_output (aux, ds);
686
687       /* Start a new casefile. */
688       aux->casefile =
689         ds->cf_factory->create_casefile (ds->cf_factory,
690                                          dict_get_next_value_idx (ds->dict));
691     }
692
693   return casefile_append (aux->casefile, c) && ok;
694 }
695
696 /* End-of-file callback used by multipass_procedure_with_splits(). */
697 static bool
698 multipass_split_end_func (void *aux_, const struct dataset *ds)
699 {
700   struct multipass_split_aux_data *aux = aux_;
701   return (aux->casefile == NULL || multipass_split_output (aux, ds));
702 }
703
704 static bool
705 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
706 {
707   bool ok;
708
709   assert (aux->casefile != NULL);
710   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
711   casefile_destroy (aux->casefile);
712   aux->casefile = NULL;
713
714   return ok;
715 }
716 \f
717 /* Discards all the current state in preparation for a data-input
718    command like DATA LIST or GET. */
719 void
720 discard_variables (struct dataset *ds)
721 {
722   dict_clear (ds->dict);
723   fh_set_default_handle (NULL);
724
725   ds->n_lag = 0;
726
727   free_case_source (ds->proc_source);
728   proc_set_source (ds, NULL);
729
730   proc_cancel_all_transformations (ds);
731 }
732 \f
733 /* Returns the current set of permanent transformations,
734    and clears the permanent transformations.
735    For use by INPUT PROGRAM. */
736 struct trns_chain *
737 proc_capture_transformations (struct dataset *ds)
738 {
739   struct trns_chain *chain;
740
741   assert (ds->temporary_trns_chain == NULL);
742   chain = ds->permanent_trns_chain;
743   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
744   return chain;
745 }
746
747 /* Adds a transformation that processes a case with PROC and
748    frees itself with FREE to the current set of transformations.
749    The functions are passed AUX as auxiliary data. */
750 void
751 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
752 {
753   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
754 }
755
756 /* Adds a transformation that processes a case with PROC and
757    frees itself with FREE to the current set of transformations.
758    When parsing of the block of transformations is complete,
759    FINALIZE will be called.
760    The functions are passed AUX as auxiliary data. */
761 void
762 add_transformation_with_finalizer (struct dataset *ds,
763                                    trns_finalize_func *finalize,
764                                    trns_proc_func *proc,
765                                    trns_free_func *free, void *aux)
766 {
767   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
768 }
769
770 /* Returns the index of the next transformation.
771    This value can be returned by a transformation procedure
772    function to indicate a "jump" to that transformation. */
773 size_t
774 next_transformation (const struct dataset *ds)
775 {
776   return trns_chain_next (ds->cur_trns_chain);
777 }
778
779 /* Returns true if the next call to add_transformation() will add
780    a temporary transformation, false if it will add a permanent
781    transformation. */
782 bool
783 proc_in_temporary_transformations (const struct dataset *ds)
784 {
785   return ds->temporary_trns_chain != NULL;
786 }
787
788 /* Marks the start of temporary transformations.
789    Further calls to add_transformation() will add temporary
790    transformations. */
791 void
792 proc_start_temporary_transformations (struct dataset *ds)
793 {
794   if (!proc_in_temporary_transformations (ds))
795     {
796       add_case_limit_trns (ds);
797
798       ds->permanent_dict = dict_clone (ds->dict);
799
800       trns_chain_finalize (ds->permanent_trns_chain);
801       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
802     }
803 }
804
805 /* Converts all the temporary transformations, if any, to
806    permanent transformations.  Further transformations will be
807    permanent.
808    Returns true if anything changed, false otherwise. */
809 bool
810 proc_make_temporary_transformations_permanent (struct dataset *ds)
811 {
812   if (proc_in_temporary_transformations (ds))
813     {
814       trns_chain_finalize (ds->temporary_trns_chain);
815       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
816       ds->temporary_trns_chain = NULL;
817
818       dict_destroy (ds->permanent_dict);
819       ds->permanent_dict = NULL;
820
821       return true;
822     }
823   else
824     return false;
825 }
826
827 /* Cancels all temporary transformations, if any.  Further
828    transformations will be permanent.
829    Returns true if anything changed, false otherwise. */
830 bool
831 proc_cancel_temporary_transformations (struct dataset *ds)
832 {
833   if (proc_in_temporary_transformations (ds))
834     {
835       dataset_set_dict (ds, ds->permanent_dict);
836       ds->permanent_dict = NULL;
837
838       trns_chain_destroy (ds->temporary_trns_chain);
839       ds->temporary_trns_chain = NULL;
840
841       return true;
842     }
843   else
844     return false;
845 }
846
847 /* Cancels all transformations, if any.
848    Returns true if successful, false on I/O error. */
849 bool
850 proc_cancel_all_transformations (struct dataset *ds)
851 {
852   bool ok;
853   ok = trns_chain_destroy (ds->permanent_trns_chain);
854   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
855   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
856   ds->temporary_trns_chain = NULL;
857   return ok;
858 }
859 \f
860 /* Initializes procedure handling. */
861 struct dataset *
862 create_dataset (struct casefile_factory *fact,
863                 replace_source_callback *rps,
864                 replace_dictionary_callback *rds
865                 )
866 {
867   struct dataset *ds = xzalloc (sizeof(*ds));
868   ds->dict = dict_create ();
869   ds->cf_factory = fact;
870   ds->replace_source = rps;
871   ds->replace_dict = rds;
872   proc_cancel_all_transformations (ds);
873   return ds;
874 }
875
876 /* Finishes up procedure handling. */
877 void
878 destroy_dataset (struct dataset *ds)
879 {
880   discard_variables (ds);
881   dict_destroy (ds->dict);
882   trns_chain_destroy (ds->permanent_trns_chain);
883   free (ds);
884 }
885
886 /* Sets SINK as the destination for procedure output from the
887    next procedure. */
888 void
889 proc_set_sink (struct dataset *ds, struct case_sink *sink)
890 {
891   assert (ds->proc_sink == NULL);
892   ds->proc_sink = sink;
893 }
894
895 /* Sets SOURCE as the source for procedure input for the next
896    procedure. */
897 void
898 proc_set_source (struct dataset *ds, struct case_source *source)
899 {
900   ds->proc_source = source;
901
902   if ( ds->replace_source )
903     ds->replace_source (ds->proc_source);
904 }
905
906 /* Returns true if a source for the next procedure has been
907    configured, false otherwise. */
908 bool
909 proc_has_source (const struct dataset *ds)
910 {
911   return ds->proc_source != NULL;
912 }
913
914 /* Returns the output from the previous procedure.
915    For use only immediately after executing a procedure.
916    The returned casefile is owned by the caller; it will not be
917    automatically used for the next procedure's input. */
918 struct casefile *
919 proc_capture_output (struct dataset *ds)
920 {
921   struct casefile *casefile;
922
923   /* Try to make sure that this function is called immediately
924      after procedure() or a similar function. */
925   assert (ds->proc_source != NULL);
926   assert (case_source_is_class (ds->proc_source, &storage_source_class));
927   assert (trns_chain_is_empty (ds->permanent_trns_chain));
928   assert (!proc_in_temporary_transformations (ds));
929
930   casefile = storage_source_decapsulate (ds->proc_source);
931   proc_set_source (ds, NULL);
932
933   return casefile;
934 }
935 \f
936 static trns_proc_func case_limit_trns_proc;
937 static trns_free_func case_limit_trns_free;
938
939 /* Adds a transformation that limits the number of cases that may
940    pass through, if DS->DICT has a case limit. */
941 static void
942 add_case_limit_trns (struct dataset *ds)
943 {
944   size_t case_limit = dict_get_case_limit (ds->dict);
945   if (case_limit != 0)
946     {
947       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
948       *cases_remaining = case_limit;
949       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
950                           cases_remaining);
951       dict_set_case_limit (ds->dict, 0);
952     }
953 }
954
955 /* Limits the maximum number of cases processed to
956    *CASES_REMAINING. */
957 static int
958 case_limit_trns_proc (void *cases_remaining_,
959                       struct ccase *c UNUSED, casenumber case_nr UNUSED)
960 {
961   size_t *cases_remaining = cases_remaining_;
962   if (*cases_remaining > 0)
963     {
964       (*cases_remaining)--;
965       return TRNS_CONTINUE;
966     }
967   else
968     return TRNS_DROP_CASE;
969 }
970
971 /* Frees the data associated with a case limit transformation. */
972 static bool
973 case_limit_trns_free (void *cases_remaining_)
974 {
975   size_t *cases_remaining = cases_remaining_;
976   free (cases_remaining);
977   return true;
978 }
979 \f
980 static trns_proc_func filter_trns_proc;
981
982 /* Adds a temporary transformation to filter data according to
983    the variable specified on FILTER, if any. */
984 static void
985 add_filter_trns (struct dataset *ds)
986 {
987   struct variable *filter_var = dict_get_filter (ds->dict);
988   if (filter_var != NULL)
989     {
990       proc_start_temporary_transformations (ds);
991       add_transformation (ds, filter_trns_proc, NULL, filter_var);
992     }
993 }
994
995 /* FILTER transformation. */
996 static int
997 filter_trns_proc (void *filter_var_,
998                   struct ccase *c UNUSED, casenumber case_nr UNUSED)
999
1000 {
1001   struct variable *filter_var = filter_var_;
1002   double f = case_num (c, filter_var);
1003   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
1004           ? TRNS_CONTINUE : TRNS_DROP_CASE);
1005 }
1006
1007
1008 struct dictionary *
1009 dataset_dict (const struct dataset *ds)
1010 {
1011   return ds->dict;
1012 }
1013
1014
1015 /* Set or replace dataset DS's dictionary with DICT.
1016    The old dictionary is destroyed */
1017 void
1018 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
1019 {
1020   struct dictionary *old_dict = ds->dict;
1021
1022   dict_copy_callbacks (dict, ds->dict);
1023   ds->dict = dict;
1024
1025   if ( ds->replace_dict )
1026     ds->replace_dict (dict);
1027
1028   dict_destroy (old_dict);
1029 }
1030
1031 int
1032 dataset_n_lag (const struct dataset *ds)
1033 {
1034   return ds->n_lag;
1035 }
1036
1037 void
1038 dataset_set_n_lag (struct dataset *ds, int n_lag)
1039 {
1040   ds->n_lag = n_lag;
1041 }
1042
1043
1044 struct casefile_factory *
1045 dataset_get_casefile_factory (const struct dataset *ds)
1046 {
1047   return ds->cf_factory;
1048 }
1049