Encapsulated the static data of procedure.[ch] into a single object, to be
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26
27 #include <data/case-source.h>
28 #include <data/case-sink.h>
29 #include <data/case.h>
30 #include <data/casefile.h>
31 #include <data/fastfile.h>
32 #include <data/dictionary.h>
33 #include <data/file-handle-def.h>
34 #include <data/procedure.h>
35 #include <data/storage-stream.h>
36 #include <data/transformations.h>
37 #include <data/variable.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/misc.h>
40 #include <libpspp/str.h>
41
42 /* Procedure execution data. */
43 struct write_case_data
44   {
45     /* Function to call for each case. */
46     case_func_t case_func;
47     void *aux;
48
49     struct dataset *dataset;    /* The dataset concerned */
50     struct ccase trns_case;     /* Case used for transformations. */
51     struct ccase sink_case;     /* Case written to sink, if
52                                    compacting is necessary. */
53     size_t cases_written;       /* Cases output so far. */
54   };
55
56 struct dataset {
57   /* Cases are read from proc_source,
58      pass through permanent_trns_chain (which transforms them into
59      the format described by permanent_dict),
60      are written to proc_sink,
61      pass through temporary_trns_chain (which transforms them into
62      the format described by dict),
63      and are finally passed to the procedure. */
64   struct case_source *proc_source;
65   struct trns_chain *permanent_trns_chain;
66   struct dictionary *permanent_dict;
67   struct case_sink *proc_sink;
68   struct trns_chain *temporary_trns_chain;
69   struct dictionary *dict;
70
71   /* The transformation chain that the next transformation will be
72      added to. */
73   struct trns_chain *cur_trns_chain;
74
75   /* The compactor used to compact a case, if necessary;
76      otherwise a null pointer. */
77   struct dict_compactor *compactor;
78
79   /* Time at which proc was last invoked. */
80   time_t last_proc_invocation;
81
82   /* Lag queue. */
83   int n_lag;                    /* Number of cases to lag. */
84   int lag_count;                /* Number of cases in lag_queue so far. */
85   int lag_head;         /* Index where next case will be added. */
86   struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
87
88 }; /* struct dataset */
89
90
91 struct dataset *current_dataset;
92
93 static void add_case_limit_trns (struct dataset *ds);
94 static void add_filter_trns (struct dataset *ds);
95
96 static bool internal_procedure (struct dataset *ds, case_func_t,
97                                 bool (*end_func) (void *),
98                                 void *aux);
99 static void update_last_proc_invocation (struct dataset *ds);
100 static void create_trns_case (struct ccase *, struct dictionary *);
101 static void open_active_file (struct dataset *ds);
102 static bool write_case (struct write_case_data *wc_data);
103 static void lag_case (struct dataset *ds, const struct ccase *c);
104 static void clear_case (const struct dataset *ds, struct ccase *c);
105 static bool close_active_file (struct dataset *ds);
106 \f
107 /* Public functions. */
108
109 /* Returns the last time the data was read. */
110 time_t
111 time_of_last_procedure (struct dataset *ds) 
112 {
113   if (ds->last_proc_invocation == 0)
114     update_last_proc_invocation (ds);
115   return ds->last_proc_invocation;
116 }
117 \f
118 /* Regular procedure. */
119
120
121
122 /* Reads the data from the input program and writes it to a new
123    active file.  For each case we read from the input program, we
124    do the following:
125
126    1. Execute permanent transformations.  If these drop the case,
127       start the next case from step 1.
128
129    2. Write case to replacement active file.
130    
131    3. Execute temporary transformations.  If these drop the case,
132       start the next case from step 1.
133       
134    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
135
136    Returns true if successful, false if an I/O error occurred. */
137 bool
138 procedure (struct dataset *ds, case_func_t cf, void *aux)
139 {
140   return internal_procedure (ds, cf, NULL, aux);
141 }
142 \f
143 /* Multipass procedure. */
144
145 struct multipass_aux_data 
146   {
147     struct casefile *casefile;
148     
149     bool (*proc_func) (const struct casefile *, void *aux);
150     void *aux;
151   };
152
153 /* Case processing function for multipass_procedure(). */
154 static bool
155 multipass_case_func (const struct ccase *c, void *aux_data_) 
156 {
157   struct multipass_aux_data *aux_data = aux_data_;
158   return casefile_append (aux_data->casefile, c);
159 }
160
161 /* End-of-file function for multipass_procedure(). */
162 static bool
163 multipass_end_func (void *aux_data_) 
164 {
165   struct multipass_aux_data *aux_data = aux_data_;
166   return (aux_data->proc_func == NULL
167           || aux_data->proc_func (aux_data->casefile, aux_data->aux));
168 }
169
170 /* Procedure that allows multiple passes over the input data.
171    The entire active file is passed to PROC_FUNC, with the given
172    AUX as auxiliary data, as a unit. */
173 bool
174 multipass_procedure (struct dataset *ds, casefile_func_t proc_func,  void *aux) 
175 {
176   struct multipass_aux_data aux_data;
177   bool ok;
178
179   aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
180   aux_data.proc_func = proc_func;
181   aux_data.aux = aux;
182
183   ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
184   ok = !casefile_error (aux_data.casefile) && ok;
185
186   casefile_destroy (aux_data.casefile);
187
188   return ok;
189 }
190 \f
191 /* Procedure implementation. */
192
193
194 /* Executes a procedure.
195    Passes each case to CASE_FUNC.
196    Calls END_FUNC after the last case.
197    Returns true if successful, false if an I/O error occurred (or
198    if CASE_FUNC or END_FUNC ever returned false). */
199 static bool
200 internal_procedure (struct dataset *ds, case_func_t case_func,
201                     bool (*end_func) (void *),
202                     void *aux) 
203 {
204   struct write_case_data wc_data;
205   bool ok = true;
206
207   assert (ds->proc_source != NULL);
208
209   update_last_proc_invocation (ds);
210
211   /* Optimize the trivial case where we're not going to do
212      anything with the data, by not reading the data at all. */
213   if (case_func == NULL && end_func == NULL
214       && case_source_is_class (ds->proc_source, &storage_source_class)
215       && ds->proc_sink == NULL
216       && (ds->temporary_trns_chain == NULL
217           || trns_chain_is_empty (ds->temporary_trns_chain))
218       && trns_chain_is_empty (ds->permanent_trns_chain))
219     {
220       ds->n_lag = 0;
221       dict_set_case_limit (ds->dict, 0);
222       dict_clear_vectors (ds->dict);
223       return true;
224     }
225   
226   open_active_file (ds);
227   
228   wc_data.case_func = case_func;
229   wc_data.aux = aux;
230   wc_data.dataset = ds;
231   create_trns_case (&wc_data.trns_case, ds->dict);
232   case_create (&wc_data.sink_case,
233                dict_get_compacted_value_cnt (ds->dict));
234   wc_data.cases_written = 0;
235
236   ok = ds->proc_source->class->read (ds->proc_source,
237                                  &wc_data.trns_case,
238                                  write_case, &wc_data) && ok;
239   if (end_func != NULL)
240     ok = end_func (aux) && ok;
241
242   case_destroy (&wc_data.sink_case);
243   case_destroy (&wc_data.trns_case);
244
245   ok = close_active_file (ds) && ok;
246
247   return ok;
248 }
249
250 /* Updates last_proc_invocation. */
251 static void
252 update_last_proc_invocation (struct dataset *ds) 
253 {
254   ds->last_proc_invocation = time (NULL);
255 }
256
257 /* Creates and returns a case, initializing it from the vectors
258    that say which `value's need to be initialized just once, and
259    which ones need to be re-initialized before every case. */
260 static void
261 create_trns_case (struct ccase *trns_case, struct dictionary *dict)
262 {
263   size_t var_cnt = dict_get_var_cnt (dict);
264   size_t i;
265
266   case_create (trns_case, dict_get_next_value_idx (dict));
267   for (i = 0; i < var_cnt; i++) 
268     {
269       struct variable *v = dict_get_var (dict, i);
270       union value *value = case_data_rw (trns_case, v->fv);
271
272       if (v->type == NUMERIC)
273         value->f = v->leave ? 0.0 : SYSMIS;
274       else
275         memset (value->s, ' ', v->width);
276     }
277 }
278
279 /* Makes all preparations for reading from the data source and writing
280    to the data sink. */
281 static void
282 open_active_file (struct dataset *ds)
283 {
284   add_case_limit_trns (ds);
285   add_filter_trns (ds);
286
287   /* Finalize transformations. */
288   trns_chain_finalize (ds->cur_trns_chain);
289
290   /* Make permanent_dict refer to the dictionary right before
291      data reaches the sink. */
292   if (ds->permanent_dict == NULL)
293     ds->permanent_dict = ds->dict;
294
295   /* Figure out whether to compact. */
296   ds->compactor = 
297     (dict_compacting_would_shrink (ds->permanent_dict)
298      ? dict_make_compactor (ds->permanent_dict)
299      : NULL);
300
301   /* Prepare sink. */
302   if (ds->proc_sink == NULL)
303     ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
304   if (ds->proc_sink->class->open != NULL)
305     ds->proc_sink->class->open (ds->proc_sink);
306
307   /* Allocate memory for lag queue. */
308   if (ds->n_lag > 0)
309     {
310       int i;
311   
312       ds->lag_count = 0;
313       ds->lag_head = 0;
314       ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
315       for (i = 0; i < ds->n_lag; i++)
316         case_nullify (&ds->lag_queue[i]);
317     }
318 }
319
320 /* Transforms trns_case and writes it to the replacement active
321    file if advisable.  Returns true if more cases can be
322    accepted, false otherwise.  Do not call this function again
323    after it has returned false once.  */
324 static bool
325 write_case (struct write_case_data *wc_data)
326 {
327   enum trns_result retval;
328   size_t case_nr;
329
330   struct dataset *ds = wc_data->dataset;
331   
332   /* Execute permanent transformations.  */
333   case_nr = wc_data->cases_written + 1;
334   retval = trns_chain_execute (ds->permanent_trns_chain,
335                                &wc_data->trns_case, &case_nr);
336   if (retval != TRNS_CONTINUE)
337     goto done;
338
339   /* Write case to LAG queue. */
340   if (ds->n_lag)
341     lag_case (ds, &wc_data->trns_case);
342
343   /* Write case to replacement active file. */
344   wc_data->cases_written++;
345   if (ds->proc_sink->class->write != NULL) 
346     {
347       if (ds->compactor != NULL) 
348         {
349           dict_compactor_compact (ds->compactor, &wc_data->sink_case,
350                                   &wc_data->trns_case);
351           ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
352         }
353       else
354         ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
355     }
356   
357   /* Execute temporary transformations. */
358   if (ds->temporary_trns_chain != NULL) 
359     {
360       retval = trns_chain_execute (ds->temporary_trns_chain,
361                                    &wc_data->trns_case,
362                                    &wc_data->cases_written);
363       if (retval != TRNS_CONTINUE)
364         goto done;
365     }
366
367   /* Pass case to procedure. */
368   if (wc_data->case_func != NULL)
369     if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
370       retval = TRNS_ERROR;
371
372  done:
373   clear_case (ds, &wc_data->trns_case);
374   return retval != TRNS_ERROR;
375 }
376
377 /* Add C to the lag queue. */
378 static void
379 lag_case (struct dataset *ds, const struct ccase *c)
380 {
381   if (ds->lag_count < ds->n_lag)
382     ds->lag_count++;
383   case_destroy (&ds->lag_queue[ds->lag_head]);
384   case_clone (&ds->lag_queue[ds->lag_head], c);
385   if (++ds->lag_head >= ds->n_lag)
386     ds->lag_head = 0;
387 }
388
389 /* Clears the variables in C that need to be cleared between
390    processing cases.  */
391 static void
392 clear_case (const struct dataset *ds, struct ccase *c)
393 {
394   size_t var_cnt = dict_get_var_cnt (ds->dict);
395   size_t i;
396   
397   for (i = 0; i < var_cnt; i++) 
398     {
399       struct variable *v = dict_get_var (ds->dict, i);
400       if (!v->leave) 
401         {
402           if (v->type == NUMERIC)
403             case_data_rw (c, v->fv)->f = SYSMIS;
404           else
405             memset (case_data_rw (c, v->fv)->s, ' ', v->width);
406         } 
407     }
408 }
409
410 /* Closes the active file. */
411 static bool
412 close_active_file (struct dataset *ds)
413 {
414   /* Free memory for lag queue, and turn off lagging. */
415   if (ds->n_lag > 0)
416     {
417       int i;
418       
419       for (i = 0; i < ds->n_lag; i++)
420         case_destroy (&ds->lag_queue[i]);
421       free (ds->lag_queue);
422       ds->n_lag = 0;
423     }
424   
425   /* Dictionary from before TEMPORARY becomes permanent. */
426   proc_cancel_temporary_transformations (ds);
427
428   /* Finish compacting. */
429   if (ds->compactor != NULL) 
430     {
431       dict_compactor_destroy (ds->compactor);
432       dict_compact_values (ds->dict);
433       ds->compactor = NULL;
434     }
435     
436   /* Free data source. */
437   free_case_source (ds->proc_source);
438   ds->proc_source = NULL;
439
440   /* Old data sink becomes new data source. */
441   if (ds->proc_sink->class->make_source != NULL)
442     ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
443   free_case_sink (ds->proc_sink);
444   ds->proc_sink = NULL;
445
446   dict_clear_vectors (ds->dict);
447   ds->permanent_dict = NULL;
448   return proc_cancel_all_transformations (ds);
449 }
450 \f
451 /* Returns a pointer to the lagged case from N_BEFORE cases before the
452    current one, or NULL if there haven't been that many cases yet. */
453 struct ccase *
454 lagged_case (const struct dataset *ds, int n_before)
455 {
456   assert (n_before >= 1 );
457   assert (n_before <= ds->n_lag);
458
459   if (n_before <= ds->lag_count)
460     {
461       int index = ds->lag_head - n_before;
462       if (index < 0)
463         index += ds->n_lag;
464       return &ds->lag_queue[index];
465     }
466   else
467     return NULL;
468 }
469 \f
470 /* Procedure that separates the data into SPLIT FILE groups. */
471
472 /* Represents auxiliary data for handling SPLIT FILE. */
473 struct split_aux_data 
474   {
475     struct dataset *dataset;    /* The dataset */
476     struct ccase prev_case;     /* Data in previous case. */
477
478     /* Callback functions. */
479     begin_func_t begin_func ; 
480     case_func_t proc_func ;
481     void (*end_func) (void *);
482     void *func_aux;
483   };
484
485 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
486 static bool split_procedure_case_func (const struct ccase *c, void *);
487 static bool split_procedure_end_func (void *);
488
489 /* Like procedure(), but it automatically breaks the case stream
490    into SPLIT FILE break groups.  Before each group of cases with
491    identical SPLIT FILE variable values, BEGIN_FUNC is called
492    with the first case in the group.
493    Then PROC_FUNC is called for each case in the group (including
494    the first).
495    END_FUNC is called when the group is finished.  FUNC_AUX is
496    passed to each of the functions as auxiliary data.
497
498    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
499    and END_FUNC will be called at all. 
500
501    If SPLIT FILE is not in effect, then there is one break group
502    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
503    will be called once.
504    
505    Returns true if successful, false if an I/O error occurred. */
506 bool
507 procedure_with_splits (struct dataset *ds,
508                        begin_func_t begin_func, 
509                        case_func_t proc_func,
510                        void (*end_func) (void *aux),
511                        void *func_aux) 
512 {
513   struct split_aux_data split_aux;
514   bool ok;
515
516   case_nullify (&split_aux.prev_case);
517   split_aux.begin_func = begin_func;
518   split_aux.proc_func = proc_func;
519   split_aux.end_func = end_func;
520   split_aux.func_aux = func_aux;
521   split_aux.dataset = ds;
522
523   ok = internal_procedure (ds, split_procedure_case_func,
524                            split_procedure_end_func, &split_aux);
525
526   case_destroy (&split_aux.prev_case);
527
528   return ok;
529 }
530
531 /* Case callback used by procedure_with_splits(). */
532 static bool
533 split_procedure_case_func (const struct ccase *c, void *split_aux_) 
534 {
535   struct split_aux_data *split_aux = split_aux_;
536
537   /* Start a new series if needed. */
538   if (case_is_null (&split_aux->prev_case)
539       || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
540     {
541       if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
542         split_aux->end_func (split_aux->func_aux);
543
544       case_destroy (&split_aux->prev_case);
545       case_clone (&split_aux->prev_case, c);
546
547       if (split_aux->begin_func != NULL)
548         split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
549
550     }
551
552   return (split_aux->proc_func == NULL
553           || split_aux->proc_func (c, split_aux->func_aux));
554 }
555
556 /* End-of-file callback used by procedure_with_splits(). */
557 static bool
558 split_procedure_end_func (void *split_aux_) 
559 {
560   struct split_aux_data *split_aux = split_aux_;
561
562   if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
563     split_aux->end_func (split_aux->func_aux);
564   return true;
565 }
566
567 /* Compares the SPLIT FILE variables in cases A and B and returns
568    nonzero only if they differ. */
569 static int
570 equal_splits (const struct ccase *a, const struct ccase *b, 
571               const struct dataset *ds) 
572 {
573   return case_compare (a, b,
574                        dict_get_split_vars (ds->dict),
575                        dict_get_split_cnt (ds->dict)) == 0;
576 }
577 \f
578 /* Multipass procedure that separates the data into SPLIT FILE
579    groups. */
580
581 /* Represents auxiliary data for handling SPLIT FILE in a
582    multipass procedure. */
583 struct multipass_split_aux_data 
584   {
585     struct dataset *dataset;    /* The dataset of the split */
586     struct ccase prev_case;     /* Data in previous case. */
587     struct casefile *casefile;  /* Accumulates data for a split. */
588
589     /* Function to call with the accumulated data. */
590     bool (*split_func) (const struct ccase *first, const struct casefile *,
591                         void *);
592     void *func_aux;                            /* Auxiliary data. */ 
593   };
594
595 static bool multipass_split_case_func (const struct ccase *c, void *aux_);
596 static bool multipass_split_end_func (void *aux_);
597 static bool multipass_split_output (struct multipass_split_aux_data *);
598
599 /* Returns true if successful, false if an I/O error occurred. */
600 bool
601 multipass_procedure_with_splits (struct dataset *ds, 
602                                  bool (*split_func) (const struct ccase *first,
603                                                      const struct casefile *,
604                                                      void *aux),
605                                  void *func_aux)
606 {
607   struct multipass_split_aux_data aux;
608   bool ok;
609
610   case_nullify (&aux.prev_case);
611   aux.casefile = NULL;
612   aux.split_func = split_func;
613   aux.func_aux = func_aux;
614   aux.dataset = ds;
615
616   ok = internal_procedure (ds, multipass_split_case_func,
617                            multipass_split_end_func, &aux);
618   case_destroy (&aux.prev_case);
619
620   return ok;
621 }
622
623 /* Case callback used by multipass_procedure_with_splits(). */
624 static bool
625 multipass_split_case_func (const struct ccase *c, void *aux_)
626 {
627   struct multipass_split_aux_data *aux = aux_;
628   struct dataset *ds = aux->dataset;
629   bool ok = true;
630
631   /* Start a new series if needed. */
632   if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
633     {
634       /* Record split values. */
635       case_destroy (&aux->prev_case);
636       case_clone (&aux->prev_case, c);
637
638       /* Pass any cases to split_func. */
639       if (aux->casefile != NULL)
640         ok = multipass_split_output (aux);
641
642       /* Start a new casefile. */
643       aux->casefile = 
644         fastfile_create (dict_get_next_value_idx (ds->dict));
645     }
646
647   return casefile_append (aux->casefile, c) && ok;
648 }
649
650 /* End-of-file callback used by multipass_procedure_with_splits(). */
651 static bool
652 multipass_split_end_func (void *aux_)
653 {
654   struct multipass_split_aux_data *aux = aux_;
655   return (aux->casefile == NULL || multipass_split_output (aux));
656 }
657
658 static bool
659 multipass_split_output (struct multipass_split_aux_data *aux)
660 {
661   bool ok;
662   
663   assert (aux->casefile != NULL);
664   ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
665   casefile_destroy (aux->casefile);
666   aux->casefile = NULL;
667
668   return ok;
669 }
670 \f
671 /* Discards all the current state in preparation for a data-input
672    command like DATA LIST or GET. */
673 void
674 discard_variables (struct dataset *ds)
675 {
676   dict_clear (ds->dict);
677   fh_set_default_handle (NULL);
678
679   ds->n_lag = 0;
680   
681   free_case_source (ds->proc_source);
682   ds->proc_source = NULL;
683
684   proc_cancel_all_transformations (ds);
685 }
686 \f
687 /* Returns the current set of permanent transformations,
688    and clears the permanent transformations.
689    For use by INPUT PROGRAM. */
690 struct trns_chain *
691 proc_capture_transformations (struct dataset *ds) 
692 {
693   struct trns_chain *chain;
694   
695   assert (ds->temporary_trns_chain == NULL);
696   chain = ds->permanent_trns_chain;
697   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
698   return chain;
699 }
700
701 /* Adds a transformation that processes a case with PROC and
702    frees itself with FREE to the current set of transformations.
703    The functions are passed AUX as auxiliary data. */
704 void
705 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
706 {
707   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
708 }
709
710 /* Adds a transformation that processes a case with PROC and
711    frees itself with FREE to the current set of transformations.
712    When parsing of the block of transformations is complete,
713    FINALIZE will be called.
714    The functions are passed AUX as auxiliary data. */
715 void
716 add_transformation_with_finalizer (struct dataset *ds, 
717                                    trns_finalize_func *finalize,
718                                    trns_proc_func *proc,
719                                    trns_free_func *free, void *aux)
720 {
721   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
722 }
723
724 /* Returns the index of the next transformation.
725    This value can be returned by a transformation procedure
726    function to indicate a "jump" to that transformation. */
727 size_t
728 next_transformation (const struct dataset *ds) 
729 {
730   return trns_chain_next (ds->cur_trns_chain);
731 }
732
733 /* Returns true if the next call to add_transformation() will add
734    a temporary transformation, false if it will add a permanent
735    transformation. */
736 bool
737 proc_in_temporary_transformations (const struct dataset *ds) 
738 {
739   return ds->temporary_trns_chain != NULL;
740 }
741
742 /* Marks the start of temporary transformations.
743    Further calls to add_transformation() will add temporary
744    transformations. */
745 void
746 proc_start_temporary_transformations (struct dataset *ds) 
747 {
748   if (!proc_in_temporary_transformations (ds))
749     {
750       add_case_limit_trns (ds);
751
752       ds->permanent_dict = dict_clone (ds->dict);
753       trns_chain_finalize (ds->permanent_trns_chain);
754       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
755     }
756 }
757
758 /* Converts all the temporary transformations, if any, to
759    permanent transformations.  Further transformations will be
760    permanent.
761    Returns true if anything changed, false otherwise. */
762 bool
763 proc_make_temporary_transformations_permanent (struct dataset *ds) 
764 {
765   if (proc_in_temporary_transformations (ds)) 
766     {
767       trns_chain_finalize (ds->temporary_trns_chain);
768       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
769       ds->temporary_trns_chain = NULL;
770
771       dict_destroy (ds->permanent_dict);
772       ds->permanent_dict = NULL;
773
774       return true;
775     }
776   else
777     return false;
778 }
779
780 /* Cancels all temporary transformations, if any.  Further
781    transformations will be permanent.
782    Returns true if anything changed, false otherwise. */
783 bool
784 proc_cancel_temporary_transformations (struct dataset *ds) 
785 {
786   if (proc_in_temporary_transformations (ds)) 
787     {
788       dict_destroy (ds->dict);
789       ds->dict = ds->permanent_dict;
790       ds->permanent_dict = NULL;
791
792       trns_chain_destroy (ds->temporary_trns_chain);
793       ds->temporary_trns_chain = NULL;
794
795       return true;
796     }
797   else
798     return false;
799 }
800
801 /* Cancels all transformations, if any.
802    Returns true if successful, false on I/O error. */
803 bool
804 proc_cancel_all_transformations (struct dataset *ds)
805 {
806   bool ok;
807   ok = trns_chain_destroy (ds->permanent_trns_chain);
808   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
809   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
810   ds->temporary_trns_chain = NULL;
811   return ok;
812 }
813 \f
814 /* Initializes procedure handling. */
815 struct dataset *
816 create_dataset (void)
817 {
818   struct dataset *ds = xzalloc (sizeof(*ds));
819   ds->dict = dict_create ();
820   proc_cancel_all_transformations (ds);
821   return ds;
822 }
823
824 /* Finishes up procedure handling. */
825 void
826 destroy_dataset (struct dataset *ds)
827 {
828   discard_variables (ds);
829   dict_destroy (ds->dict);
830   free (ds);
831 }
832
833 /* Sets SINK as the destination for procedure output from the
834    next procedure. */
835 void
836 proc_set_sink (struct dataset *ds, struct case_sink *sink) 
837 {
838   assert (ds->proc_sink == NULL);
839   ds->proc_sink = sink;
840 }
841
842 /* Sets SOURCE as the source for procedure input for the next
843    procedure. */
844 void
845 proc_set_source (struct dataset *ds, struct case_source *source) 
846 {
847   assert (ds->proc_source == NULL);
848   ds->proc_source = source;
849 }
850
851 /* Returns true if a source for the next procedure has been
852    configured, false otherwise. */
853 bool
854 proc_has_source (const struct dataset *ds) 
855 {
856   return ds->proc_source != NULL;
857 }
858
859 /* Returns the output from the previous procedure.
860    For use only immediately after executing a procedure.
861    The returned casefile is owned by the caller; it will not be
862    automatically used for the next procedure's input. */
863 struct casefile *
864 proc_capture_output (struct dataset *ds) 
865 {
866   struct casefile *casefile;
867
868   /* Try to make sure that this function is called immediately
869      after procedure() or a similar function. */
870   assert (ds->proc_source != NULL);
871   assert (case_source_is_class (ds->proc_source, &storage_source_class));
872   assert (trns_chain_is_empty (ds->permanent_trns_chain));
873   assert (!proc_in_temporary_transformations (ds));
874
875   casefile = storage_source_decapsulate (ds->proc_source);
876   ds->proc_source = NULL;
877
878   return casefile;
879 }
880 \f
881 static trns_proc_func case_limit_trns_proc;
882 static trns_free_func case_limit_trns_free;
883
884 /* Adds a transformation that limits the number of cases that may
885    pass through, if DS->DICT has a case limit. */
886 static void
887 add_case_limit_trns (struct dataset *ds) 
888 {
889   size_t case_limit = dict_get_case_limit (ds->dict);
890   if (case_limit != 0)
891     {
892       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
893       *cases_remaining = case_limit;
894       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
895                           cases_remaining);
896       dict_set_case_limit (ds->dict, 0);
897     }
898 }
899
900 /* Limits the maximum number of cases processed to
901    *CASES_REMAINING. */
902 static int
903 case_limit_trns_proc (void *cases_remaining_,
904                       struct ccase *c UNUSED, casenum_t case_nr UNUSED) 
905 {
906   size_t *cases_remaining = cases_remaining_;
907   if (*cases_remaining > 0) 
908     {
909       (*cases_remaining)--;
910       return TRNS_CONTINUE;
911     }
912   else
913     return TRNS_DROP_CASE;
914 }
915
916 /* Frees the data associated with a case limit transformation. */
917 static bool
918 case_limit_trns_free (void *cases_remaining_) 
919 {
920   size_t *cases_remaining = cases_remaining_;
921   free (cases_remaining);
922   return true;
923 }
924 \f
925 static trns_proc_func filter_trns_proc;
926
927 /* Adds a temporary transformation to filter data according to
928    the variable specified on FILTER, if any. */
929 static void
930 add_filter_trns (struct dataset *ds) 
931 {
932   struct variable *filter_var = dict_get_filter (ds->dict);
933   if (filter_var != NULL) 
934     {
935       proc_start_temporary_transformations (ds);
936       add_transformation (ds, filter_trns_proc, NULL, filter_var);
937     }
938 }
939
940 /* FILTER transformation. */
941 static int
942 filter_trns_proc (void *filter_var_,
943                   struct ccase *c UNUSED, casenum_t case_nr UNUSED) 
944   
945 {
946   struct variable *filter_var = filter_var_;
947   double f = case_num (c, filter_var->fv);
948   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
949           ? TRNS_CONTINUE : TRNS_DROP_CASE);
950 }
951
952
953 struct dictionary *
954 dataset_dict (const struct dataset *ds)
955 {
956   return ds->dict;
957 }
958
959
960 void 
961 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
962 {
963   ds->dict = dict;
964 }
965
966 int 
967 dataset_n_lag (const struct dataset *ds)
968 {
969   return ds->n_lag;
970 }
971
972 void 
973 dataset_set_n_lag (struct dataset *ds, int n_lag)
974 {
975   ds->n_lag = n_lag;
976 }
977
978