29dffbcaf8303dd9daf607d19eec7cfd76a1590a
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include <data/case-source.h>
27 #include <data/case-sink.h>
28 #include <data/case.h>
29 #include <data/casefile.h>
30 #include <data/fastfile.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/storage-stream.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40
41 struct dataset {
42
43   /* An abstract factory which creates casefiles */
44   struct casefile_factory *cf_factory;
45
46   /* Cases are read from proc_source,
47      pass through permanent_trns_chain (which transforms them into
48      the format described by permanent_dict),
49      are written to proc_sink,
50      pass through temporary_trns_chain (which transforms them into
51      the format described by dict),
52      and are finally passed to the procedure. */
53   struct case_source *proc_source;
54   struct trns_chain *permanent_trns_chain;
55   struct dictionary *permanent_dict;
56   struct case_sink *proc_sink;
57   struct trns_chain *temporary_trns_chain;
58   struct dictionary *dict;
59
60   /* The transformation chain that the next transformation will be
61      added to. */
62   struct trns_chain *cur_trns_chain;
63
64   /* The compactor used to compact a case, if necessary;
65      otherwise a null pointer. */
66   struct dict_compactor *compactor;
67
68   /* Time at which proc was last invoked. */
69   time_t last_proc_invocation;
70
71   /* Lag queue. */
72   int n_lag;                    /* Number of cases to lag. */
73   int lag_count;                /* Number of cases in lag_queue so far. */
74   int lag_head;         /* Index where next case will be added. */
75   struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
76
77   /* Procedure data. */
78   bool is_open;               /* Procedure open? */
79   struct ccase trns_case;     /* Case used for transformations. */
80   struct ccase sink_case;     /* Case written to sink, if
81                                  compacting is necessary. */
82   size_t cases_written;       /* Cases output so far. */
83   bool ok;
84 }; /* struct dataset */
85
86
87 static void add_case_limit_trns (struct dataset *ds);
88 static void add_filter_trns (struct dataset *ds);
89
90 static bool internal_procedure (struct dataset *ds, case_func *,
91                                 end_func *,
92                                 void *aux);
93 static void update_last_proc_invocation (struct dataset *ds);
94 static void create_trns_case (struct ccase *, struct dictionary *);
95 static void open_active_file (struct dataset *ds);
96 static void lag_case (struct dataset *ds, const struct ccase *c);
97 static void clear_case (const struct dataset *ds, struct ccase *c);
98 static bool close_active_file (struct dataset *ds);
99 \f
100 /* Public functions. */
101
102 /* Returns the last time the data was read. */
103 time_t
104 time_of_last_procedure (struct dataset *ds) 
105 {
106   if (ds->last_proc_invocation == 0)
107     update_last_proc_invocation (ds);
108   return ds->last_proc_invocation;
109 }
110 \f
111 /* Regular procedure. */
112
113
114
115 /* Reads the data from the input program and writes it to a new
116    active file.  For each case we read from the input program, we
117    do the following:
118
119    1. Execute permanent transformations.  If these drop the case,
120       start the next case from step 1.
121
122    2. Write case to replacement active file.
123    
124    3. Execute temporary transformations.  If these drop the case,
125       start the next case from step 1.
126       
127    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
128
129    Returns true if successful, false if an I/O error occurred. */
130 bool
131 procedure (struct dataset *ds, case_func *cf, void *aux)
132 {
133   update_last_proc_invocation (ds);
134
135   /* Optimize the trivial case where we're not going to do
136      anything with the data, by not reading the data at all. */
137   if (cf == NULL
138       && case_source_is_class (ds->proc_source, &storage_source_class)
139       && ds->proc_sink == NULL
140       && (ds->temporary_trns_chain == NULL
141           || trns_chain_is_empty (ds->temporary_trns_chain))
142       && trns_chain_is_empty (ds->permanent_trns_chain))
143     {
144       ds->n_lag = 0;
145       dict_set_case_limit (ds->dict, 0);
146       dict_clear_vectors (ds->dict);
147       return true;
148     }
149
150   return internal_procedure (ds, cf, NULL, aux);
151 }
152 \f
153 /* Multipass procedure. */
154
155 struct multipass_aux_data 
156   {
157     struct casefile *casefile;
158     
159     bool (*proc_func) (const struct casefile *, void *aux);
160     void *aux;
161   };
162
163 /* Case processing function for multipass_procedure(). */
164 static bool
165 multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) 
166 {
167   struct multipass_aux_data *aux_data = aux_data_;
168   return casefile_append (aux_data->casefile, c);
169 }
170
171 /* End-of-file function for multipass_procedure(). */
172 static bool
173 multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) 
174 {
175   struct multipass_aux_data *aux_data = aux_data_;
176   return (aux_data->proc_func == NULL
177           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
178 }
179
180 /* Procedure that allows multiple passes over the input data.
181    The entire active file is passed to PROC_FUNC, with the given
182    AUX as auxiliary data, as a unit. */
183 bool
184 multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux) 
185 {
186   struct multipass_aux_data aux_data;
187   bool ok;
188
189   aux_data.casefile =
190     ds->cf_factory->create_casefile (ds->cf_factory,
191                                      dict_get_next_value_idx (ds->dict));
192
193   aux_data.proc_func = proc_func;
194   aux_data.aux = aux;
195
196   ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
197   ok = !casefile_error (aux_data.casefile) && ok;
198
199   casefile_destroy (aux_data.casefile);
200
201   return ok;
202 }
203 \f
204 /* Procedure implementation. */
205
206 /* Executes a procedure.
207    Passes each case to CASE_FUNC.
208    Calls END_FUNC after the last case.
209    Returns true if successful, false if an I/O error occurred (or
210    if CASE_FUNC or END_FUNC ever returned false). */
211 static bool
212 internal_procedure (struct dataset *ds, case_func *proc,
213                     end_func *end,
214                     void *aux) 
215 {
216   struct ccase *c;
217   bool ok = true;
218   
219   proc_open (ds);
220   while (ok && proc_read (ds, &c))
221     if (proc != NULL)
222       ok = proc (c, aux, ds) && ok;
223   if (end != NULL)
224     ok = end (aux, ds) && ok;
225   return proc_close (ds) && ok;
226 }
227
228 /* Opens dataset DS for reading cases with proc_read.
229    proc_close must be called when done. */
230 void
231 proc_open (struct dataset *ds)
232 {
233   assert (ds->proc_source != NULL);
234   assert (!ds->is_open);
235
236   update_last_proc_invocation (ds);
237
238   open_active_file (ds);
239
240   ds->is_open = true;
241   create_trns_case (&ds->trns_case, ds->dict);
242   case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
243   ds->cases_written = 0;
244   ds->ok = true;
245 }
246
247 /* Reads the next case from dataset DS, which must have been
248    opened for reading with proc_open.
249    Returns true if successful, in which case a pointer to the
250    case is stored in *C.
251    Return false at end of file or if a read error occurs.  In
252    this case a null pointer is stored in *C. */
253 bool
254 proc_read (struct dataset *ds, struct ccase **c) 
255 {
256   enum trns_result retval = TRNS_DROP_CASE;
257
258   assert (ds->is_open);
259   *c = NULL;
260   for (;;) 
261     {
262       size_t case_nr;
263
264       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
265       if (retval == TRNS_ERROR)
266         ds->ok = false;
267       if (!ds->ok)
268         return false;
269
270       /* Read a case from proc_source. */
271       clear_case (ds, &ds->trns_case);
272       if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
273         return false;
274
275       /* Execute permanent transformations.  */
276       case_nr = ds->cases_written + 1;
277       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
278                                    &ds->trns_case, &case_nr);
279       if (retval != TRNS_CONTINUE)
280         continue;
281   
282       /* Write case to LAG queue. */
283       if (ds->n_lag)
284         lag_case (ds, &ds->trns_case);
285
286       /* Write case to replacement active file. */
287       ds->cases_written++;
288       if (ds->proc_sink->class->write != NULL) 
289         {
290           if (ds->compactor != NULL) 
291             {
292               dict_compactor_compact (ds->compactor, &ds->sink_case,
293                                       &ds->trns_case);
294               ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
295             }
296           else
297             ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
298         }
299   
300       /* Execute temporary transformations. */
301       if (ds->temporary_trns_chain != NULL) 
302         {
303           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
304                                        &ds->trns_case, &ds->cases_written);
305           if (retval != TRNS_CONTINUE)
306             continue;
307         }
308
309       *c = &ds->trns_case;
310       return true; 
311     }
312 }
313
314 /* Closes dataset DS for reading.
315    Returns true if successful, false if an I/O error occurred
316    while reading or closing the data set.
317    If DS has not been opened, returns true without doing
318    anything else. */
319 bool
320 proc_close (struct dataset *ds) 
321 {
322   if (!ds->is_open)
323     return true;
324
325   /* Drain any remaining cases. */
326   while (ds->ok) 
327     {
328       struct ccase *c;
329       if (!proc_read (ds, &c))
330         break; 
331     }
332   ds->ok = free_case_source (ds->proc_source) && ds->ok;
333   ds->proc_source = NULL;
334
335   case_destroy (&ds->sink_case);
336   case_destroy (&ds->trns_case);
337
338   ds->ok = close_active_file (ds) && ds->ok;
339   ds->is_open = false;
340
341   return ds->ok;
342 }
343
344 /* Updates last_proc_invocation. */
345 static void
346 update_last_proc_invocation (struct dataset *ds) 
347 {
348   ds->last_proc_invocation = time (NULL);
349 }
350
351 /* Creates and returns a case, initializing it from the vectors
352    that say which `value's need to be initialized just once, and
353    which ones need to be re-initialized before every case. */
354 static void
355 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
356 {
357   size_t var_cnt = dict_get_var_cnt (dict);
358   size_t i;
359
360   case_create (trns_case, dict_get_next_value_idx (dict));
361   for (i = 0; i < var_cnt; i++) 
362     {
363       struct variable *v = dict_get_var (dict, i);
364       union value *value = case_data_rw (trns_case, v);
365
366       if (var_is_numeric (v))
367         value->f = var_get_leave (v) ? 0.0 : SYSMIS;
368       else
369         memset (value->s, ' ', var_get_width (v));
370     }
371 }
372
373 /* Makes all preparations for reading from the data source and writing
374    to the data sink. */
375 static void
376 open_active_file (struct dataset *ds)
377 {
378   add_case_limit_trns (ds);
379   add_filter_trns (ds);
380
381   /* Finalize transformations. */
382   trns_chain_finalize (ds->cur_trns_chain);
383
384   /* Make permanent_dict refer to the dictionary right before
385      data reaches the sink. */
386   if (ds->permanent_dict == NULL)
387     ds->permanent_dict = ds->dict;
388
389   /* Figure out whether to compact. */
390   ds->compactor = 
391     (dict_compacting_would_shrink (ds->permanent_dict)
392      ? dict_make_compactor (ds->permanent_dict)
393      : NULL);
394
395   /* Prepare sink. */
396   if (ds->proc_sink == NULL)
397     ds->proc_sink = create_case_sink (&storage_sink_class,
398                                       ds->permanent_dict,
399                                       ds->cf_factory,
400                                       NULL);
401   if (ds->proc_sink->class->open != NULL)
402     ds->proc_sink->class->open (ds->proc_sink);
403
404   /* Allocate memory for lag queue. */
405   if (ds->n_lag > 0)
406     {
407       int i;
408   
409       ds->lag_count = 0;
410       ds->lag_head = 0;
411       ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
412       for (i = 0; i < ds->n_lag; i++)
413         case_nullify (&ds->lag_queue[i]);
414     }
415 }
416
417 /* Add C to the lag queue. */
418 static void
419 lag_case (struct dataset *ds, const struct ccase *c)
420 {
421   if (ds->lag_count < ds->n_lag)
422     ds->lag_count++;
423   case_destroy (&ds->lag_queue[ds->lag_head]);
424   case_clone (&ds->lag_queue[ds->lag_head], c);
425   if (++ds->lag_head >= ds->n_lag)
426     ds->lag_head = 0;
427 }
428
429 /* Clears the variables in C that need to be cleared between
430    processing cases.  */
431 static void
432 clear_case (const struct dataset *ds, struct ccase *c)
433 {
434   size_t var_cnt = dict_get_var_cnt (ds->dict);
435   size_t i;
436   
437   for (i = 0; i < var_cnt; i++) 
438     {
439       struct variable *v = dict_get_var (ds->dict, i);
440       if (!var_get_leave (v)) 
441         {
442           if (var_is_numeric (v))
443             case_data_rw (c, v)->f = SYSMIS; 
444           else
445             memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
446         } 
447     }
448 }
449
450 /* Closes the active file. */
451 static bool
452 close_active_file (struct dataset *ds)
453 {
454   /* Free memory for lag queue, and turn off lagging. */
455   if (ds->n_lag > 0)
456     {
457       int i;
458       
459       for (i = 0; i < ds->n_lag; i++)
460         case_destroy (&ds->lag_queue[i]);
461       free (ds->lag_queue);
462       ds->n_lag = 0;
463     }
464   
465   /* Dictionary from before TEMPORARY becomes permanent. */
466   proc_cancel_temporary_transformations (ds);
467
468   /* Finish compacting. */
469   if (ds->compactor != NULL) 
470     {
471       dict_compactor_destroy (ds->compactor);
472       dict_compact_values (ds->dict);
473       ds->compactor = NULL;
474     }
475     
476   /* Old data sink becomes new data source. */
477   if (ds->proc_sink->class->make_source != NULL)
478     ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
479   free_case_sink (ds->proc_sink);
480   ds->proc_sink = NULL;
481
482   dict_clear_vectors (ds->dict);
483   ds->permanent_dict = NULL;
484   return proc_cancel_all_transformations (ds);
485 }
486 \f
487 /* Returns a pointer to the lagged case from N_BEFORE cases before the
488    current one, or NULL if there haven't been that many cases yet. */
489 struct ccase *
490 lagged_case (const struct dataset *ds, int n_before)
491 {
492   assert (n_before >= 1 );
493   assert (n_before <= ds->n_lag);
494
495   if (n_before <= ds->lag_count)
496     {
497       int index = ds->lag_head - n_before;
498       if (index < 0)
499         index += ds->n_lag;
500       return &ds->lag_queue[index];
501     }
502   else
503     return NULL;
504 }
505 \f
506 /* Procedure that separates the data into SPLIT FILE groups. */
507
508 /* Represents auxiliary data for handling SPLIT FILE. */
509 struct split_aux_data 
510   {
511     struct dataset *dataset;    /* The dataset */
512     struct ccase prev_case;     /* Data in previous case. */
513
514     /* Callback functions. */
515     begin_func *begin; 
516     case_func *proc;
517     end_func *end;
518     void *func_aux;
519   };
520
521 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
522 static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
523 static bool split_procedure_end_func (void *, const struct dataset *);
524
525 /* Like procedure(), but it automatically breaks the case stream
526    into SPLIT FILE break groups.  Before each group of cases with
527    identical SPLIT FILE variable values, BEGIN_FUNC is called
528    with the first case in the group.
529    Then PROC_FUNC is called for each case in the group (including
530    the first).
531    END_FUNC is called when the group is finished.  FUNC_AUX is
532    passed to each of the functions as auxiliary data.
533
534    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
535    and END_FUNC will be called at all. 
536
537    If SPLIT FILE is not in effect, then there is one break group
538    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
539    will be called once.
540    
541    Returns true if successful, false if an I/O error occurred. */
542 bool
543 procedure_with_splits (struct dataset *ds,
544                        begin_func begin, 
545                        case_func *proc,
546                        end_func *end,
547                        void *func_aux) 
548 {
549   struct split_aux_data split_aux;
550   bool ok;
551
552   case_nullify (&split_aux.prev_case);
553   split_aux.begin = begin;
554   split_aux.proc = proc;
555   split_aux.end = end;
556   split_aux.func_aux = func_aux;
557   split_aux.dataset = ds;
558
559   ok = internal_procedure (ds, split_procedure_case_func,
560                            split_procedure_end_func, &split_aux);
561
562   case_destroy (&split_aux.prev_case);
563
564   return ok;
565 }
566
567 /* Case callback used by procedure_with_splits(). */
568 static bool
569 split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) 
570 {
571   struct split_aux_data *split_aux = split_aux_;
572
573   /* Start a new series if needed. */
574   if (case_is_null (&split_aux->prev_case)
575       || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
576     {
577       if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
578         split_aux->end (split_aux->func_aux, ds);
579
580       case_destroy (&split_aux->prev_case);
581       case_clone (&split_aux->prev_case, c);
582
583       if (split_aux->begin != NULL)
584         split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
585     }
586
587   return (split_aux->proc == NULL
588           || split_aux->proc (c, split_aux->func_aux, ds));
589 }
590
591 /* End-of-file callback used by procedure_with_splits(). */
592 static bool
593 split_procedure_end_func (void *split_aux_, const struct dataset *ds) 
594 {
595   struct split_aux_data *split_aux = split_aux_;
596
597   if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
598     split_aux->end (split_aux->func_aux, ds);
599   return true;
600 }
601
602 /* Compares the SPLIT FILE variables in cases A and B and returns
603    nonzero only if they differ. */
604 static int
605 equal_splits (const struct ccase *a, const struct ccase *b, 
606               const struct dataset *ds) 
607 {
608   return case_compare (a, b,
609                        dict_get_split_vars (ds->dict),
610                        dict_get_split_cnt (ds->dict)) == 0;
611 }
612 \f
613 /* Multipass procedure that separates the data into SPLIT FILE
614    groups. */
615
616 /* Represents auxiliary data for handling SPLIT FILE in a
617    multipass procedure. */
618 struct multipass_split_aux_data 
619   {
620     struct dataset *dataset;    /* The dataset of the split */
621     struct ccase prev_case;     /* Data in previous case. */
622     struct casefile *casefile;  /* Accumulates data for a split. */
623     split_func *split;          /* Function to call with the accumulated 
624                                    data. */
625     void *func_aux;             /* Auxiliary data. */ 
626   };
627
628 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
629 static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
630 static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
631
632 /* Returns true if successful, false if an I/O error occurred. */
633 bool
634 multipass_procedure_with_splits (struct dataset *ds, 
635                                  split_func  *split,
636                                  void *func_aux)
637 {
638   struct multipass_split_aux_data aux;
639   bool ok;
640
641   case_nullify (&aux.prev_case);
642   aux.casefile = NULL;
643   aux.split = split;
644   aux.func_aux = func_aux;
645   aux.dataset = ds;
646
647   ok = internal_procedure (ds, multipass_split_case_func,
648                            multipass_split_end_func, &aux);
649   case_destroy (&aux.prev_case);
650
651   return ok;
652 }
653
654 /* Case callback used by multipass_procedure_with_splits(). */
655 static bool
656 multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
657 {
658   struct multipass_split_aux_data *aux = aux_;
659   bool ok = true;
660
661   /* Start a new series if needed. */
662   if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
663     {
664       /* Record split values. */
665       case_destroy (&aux->prev_case);
666       case_clone (&aux->prev_case, c);
667
668       /* Pass any cases to split_func. */
669       if (aux->casefile != NULL)
670         ok = multipass_split_output (aux, ds);
671
672       /* Start a new casefile. */
673       aux->casefile = 
674         ds->cf_factory->create_casefile (ds->cf_factory,
675                                          dict_get_next_value_idx (ds->dict));
676     }
677
678   return casefile_append (aux->casefile, c) && ok;
679 }
680
681 /* End-of-file callback used by multipass_procedure_with_splits(). */
682 static bool
683 multipass_split_end_func (void *aux_, const struct dataset *ds)
684 {
685   struct multipass_split_aux_data *aux = aux_;
686   return (aux->casefile == NULL || multipass_split_output (aux, ds));
687 }
688
689 static bool
690 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
691 {
692   bool ok;
693   
694   assert (aux->casefile != NULL);
695   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
696   casefile_destroy (aux->casefile);
697   aux->casefile = NULL;
698
699   return ok;
700 }
701 \f
702 /* Discards all the current state in preparation for a data-input
703    command like DATA LIST or GET. */
704 void
705 discard_variables (struct dataset *ds)
706 {
707   dict_clear (ds->dict);
708   fh_set_default_handle (NULL);
709
710   ds->n_lag = 0;
711   
712   free_case_source (ds->proc_source);
713   ds->proc_source = NULL;
714
715   proc_cancel_all_transformations (ds);
716 }
717 \f
718 /* Returns the current set of permanent transformations,
719    and clears the permanent transformations.
720    For use by INPUT PROGRAM. */
721 struct trns_chain *
722 proc_capture_transformations (struct dataset *ds) 
723 {
724   struct trns_chain *chain;
725   
726   assert (ds->temporary_trns_chain == NULL);
727   chain = ds->permanent_trns_chain;
728   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
729   return chain;
730 }
731
732 /* Adds a transformation that processes a case with PROC and
733    frees itself with FREE to the current set of transformations.
734    The functions are passed AUX as auxiliary data. */
735 void
736 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
737 {
738   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
739 }
740
741 /* Adds a transformation that processes a case with PROC and
742    frees itself with FREE to the current set of transformations.
743    When parsing of the block of transformations is complete,
744    FINALIZE will be called.
745    The functions are passed AUX as auxiliary data. */
746 void
747 add_transformation_with_finalizer (struct dataset *ds, 
748                                    trns_finalize_func *finalize,
749                                    trns_proc_func *proc,
750                                    trns_free_func *free, void *aux)
751 {
752   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
753 }
754
755 /* Returns the index of the next transformation.
756    This value can be returned by a transformation procedure
757    function to indicate a "jump" to that transformation. */
758 size_t
759 next_transformation (const struct dataset *ds) 
760 {
761   return trns_chain_next (ds->cur_trns_chain);
762 }
763
764 /* Returns true if the next call to add_transformation() will add
765    a temporary transformation, false if it will add a permanent
766    transformation. */
767 bool
768 proc_in_temporary_transformations (const struct dataset *ds) 
769 {
770   return ds->temporary_trns_chain != NULL;
771 }
772
773 /* Marks the start of temporary transformations.
774    Further calls to add_transformation() will add temporary
775    transformations. */
776 void
777 proc_start_temporary_transformations (struct dataset *ds) 
778 {
779   if (!proc_in_temporary_transformations (ds))
780     {
781       add_case_limit_trns (ds);
782
783       ds->permanent_dict = dict_clone (ds->dict);
784       trns_chain_finalize (ds->permanent_trns_chain);
785       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
786     }
787 }
788
789 /* Converts all the temporary transformations, if any, to
790    permanent transformations.  Further transformations will be
791    permanent.
792    Returns true if anything changed, false otherwise. */
793 bool
794 proc_make_temporary_transformations_permanent (struct dataset *ds) 
795 {
796   if (proc_in_temporary_transformations (ds)) 
797     {
798       trns_chain_finalize (ds->temporary_trns_chain);
799       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
800       ds->temporary_trns_chain = NULL;
801
802       dict_destroy (ds->permanent_dict);
803       ds->permanent_dict = NULL;
804
805       return true;
806     }
807   else
808     return false;
809 }
810
811 /* Cancels all temporary transformations, if any.  Further
812    transformations will be permanent.
813    Returns true if anything changed, false otherwise. */
814 bool
815 proc_cancel_temporary_transformations (struct dataset *ds) 
816 {
817   if (proc_in_temporary_transformations (ds)) 
818     {
819       dict_destroy (ds->dict);
820       ds->dict = ds->permanent_dict;
821       ds->permanent_dict = NULL;
822
823       trns_chain_destroy (ds->temporary_trns_chain);
824       ds->temporary_trns_chain = NULL;
825
826       return true;
827     }
828   else
829     return false;
830 }
831
832 /* Cancels all transformations, if any.
833    Returns true if successful, false on I/O error. */
834 bool
835 proc_cancel_all_transformations (struct dataset *ds)
836 {
837   bool ok;
838   ok = trns_chain_destroy (ds->permanent_trns_chain);
839   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
840   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
841   ds->temporary_trns_chain = NULL;
842   return ok;
843 }
844 \f
845 /* Initializes procedure handling. */
846 struct dataset *
847 create_dataset (struct casefile_factory *fact)
848 {
849   struct dataset *ds = xzalloc (sizeof(*ds));
850   ds->dict = dict_create ();
851   ds->cf_factory = fact;
852   proc_cancel_all_transformations (ds);
853   return ds;
854 }
855
856 /* Finishes up procedure handling. */
857 void
858 destroy_dataset (struct dataset *ds)
859 {
860   discard_variables (ds);
861   dict_destroy (ds->dict);
862   trns_chain_destroy (ds->permanent_trns_chain);
863   free (ds);
864 }
865
866 /* Sets SINK as the destination for procedure output from the
867    next procedure. */
868 void
869 proc_set_sink (struct dataset *ds, struct case_sink *sink) 
870 {
871   assert (ds->proc_sink == NULL);
872   ds->proc_sink = sink;
873 }
874
875 /* Sets SOURCE as the source for procedure input for the next
876    procedure. */
877 void
878 proc_set_source (struct dataset *ds, struct case_source *source) 
879 {
880   assert (ds->proc_source == NULL);
881   ds->proc_source = source;
882 }
883
884 /* Returns true if a source for the next procedure has been
885    configured, false otherwise. */
886 bool
887 proc_has_source (const struct dataset *ds) 
888 {
889   return ds->proc_source != NULL;
890 }
891
892 /* Returns the output from the previous procedure.
893    For use only immediately after executing a procedure.
894    The returned casefile is owned by the caller; it will not be
895    automatically used for the next procedure's input. */
896 struct casefile *
897 proc_capture_output (struct dataset *ds) 
898 {
899   struct casefile *casefile;
900
901   /* Try to make sure that this function is called immediately
902      after procedure() or a similar function. */
903   assert (ds->proc_source != NULL);
904   assert (case_source_is_class (ds->proc_source, &storage_source_class));
905   assert (trns_chain_is_empty (ds->permanent_trns_chain));
906   assert (!proc_in_temporary_transformations (ds));
907
908   casefile = storage_source_decapsulate (ds->proc_source);
909   ds->proc_source = NULL;
910
911   return casefile;
912 }
913 \f
914 static trns_proc_func case_limit_trns_proc;
915 static trns_free_func case_limit_trns_free;
916
917 /* Adds a transformation that limits the number of cases that may
918    pass through, if DS->DICT has a case limit. */
919 static void
920 add_case_limit_trns (struct dataset *ds) 
921 {
922   size_t case_limit = dict_get_case_limit (ds->dict);
923   if (case_limit != 0)
924     {
925       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
926       *cases_remaining = case_limit;
927       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
928                           cases_remaining);
929       dict_set_case_limit (ds->dict, 0);
930     }
931 }
932
933 /* Limits the maximum number of cases processed to
934    *CASES_REMAINING. */
935 static int
936 case_limit_trns_proc (void *cases_remaining_,
937                       struct ccase *c UNUSED, casenumber case_nr UNUSED) 
938 {
939   size_t *cases_remaining = cases_remaining_;
940   if (*cases_remaining > 0) 
941     {
942       (*cases_remaining)--;
943       return TRNS_CONTINUE;
944     }
945   else
946     return TRNS_DROP_CASE;
947 }
948
949 /* Frees the data associated with a case limit transformation. */
950 static bool
951 case_limit_trns_free (void *cases_remaining_) 
952 {
953   size_t *cases_remaining = cases_remaining_;
954   free (cases_remaining);
955   return true;
956 }
957 \f
958 static trns_proc_func filter_trns_proc;
959
960 /* Adds a temporary transformation to filter data according to
961    the variable specified on FILTER, if any. */
962 static void
963 add_filter_trns (struct dataset *ds) 
964 {
965   struct variable *filter_var = dict_get_filter (ds->dict);
966   if (filter_var != NULL) 
967     {
968       proc_start_temporary_transformations (ds);
969       add_transformation (ds, filter_trns_proc, NULL, filter_var);
970     }
971 }
972
973 /* FILTER transformation. */
974 static int
975 filter_trns_proc (void *filter_var_,
976                   struct ccase *c UNUSED, casenumber case_nr UNUSED) 
977   
978 {
979   struct variable *filter_var = filter_var_;
980   double f = case_num (c, filter_var);
981   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
982           ? TRNS_CONTINUE : TRNS_DROP_CASE);
983 }
984
985
986 struct dictionary *
987 dataset_dict (const struct dataset *ds)
988 {
989   return ds->dict;
990 }
991
992
993 void 
994 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
995 {
996   ds->dict = dict;
997 }
998
999 int 
1000 dataset_n_lag (const struct dataset *ds)
1001 {
1002   return ds->n_lag;
1003 }
1004
1005 void 
1006 dataset_set_n_lag (struct dataset *ds, int n_lag)
1007 {
1008   ds->n_lag = n_lag;
1009 }
1010
1011
1012 struct casefile_factory *
1013 dataset_get_casefile_factory (const struct dataset *ds)
1014 {
1015   return ds->cf_factory;
1016 }
1017