Patch #6086. Adds "transformation pending" state.
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23
24 #include <data/case.h>
25 #include <data/caseinit.h>
26 #include <data/casereader.h>
27 #include <data/casereader-provider.h>
28 #include <data/casewriter.h>
29 #include <data/dictionary.h>
30 #include <data/file-handle-def.h>
31 #include <data/procedure.h>
32 #include <data/transformations.h>
33 #include <data/variable.h>
34 #include <libpspp/alloc.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
39
40
41 struct dataset {
42   /* Cases are read from source,
43      their transformation variables are initialized,
44      pass through permanent_trns_chain (which transforms them into
45      the format described by permanent_dict),
46      are written to sink,
47      pass through temporary_trns_chain (which transforms them into
48      the format described by dict),
49      and are finally passed to the procedure. */
50   struct casereader *source;
51   struct caseinit *caseinit;
52   struct trns_chain *permanent_trns_chain;
53   struct dictionary *permanent_dict;
54   struct casewriter *sink;
55   struct trns_chain *temporary_trns_chain;
56   struct dictionary *dict;
57
58   /* Callback which occurs when a procedure provides a new source for
59      the dataset */
60   replace_source_callback *replace_source ;
61
62   /* Callback which occurs whenever the DICT is replaced by a new one */
63   replace_dictionary_callback *replace_dict;
64
65   /* Callback which occurs whenever the transformation chain(s) have
66      been modified */
67   transformation_change_callback_func *xform_callback;
68   void *xform_callback_aux;
69
70   /* If true, cases are discarded instead of being written to
71      sink. */
72   bool discard_output;
73
74   /* The transformation chain that the next transformation will be
75      added to. */
76   struct trns_chain *cur_trns_chain;
77
78   /* The compactor used to compact a case, if necessary;
79      otherwise a null pointer. */
80   struct dict_compactor *compactor;
81
82   /* Time at which proc was last invoked. */
83   time_t last_proc_invocation;
84
85   /* Cases just before ("lagging") the current one. */
86   int n_lag;                    /* Number of cases to lag. */
87   struct deque lag;             /* Deque of lagged cases. */
88   struct ccase *lag_cases;      /* Lagged cases managed by deque. */
89
90   /* Procedure data. */
91   enum
92     {
93       PROC_COMMITTED,           /* No procedure in progress. */
94       PROC_OPEN,                /* proc_open called, casereader still open. */
95       PROC_CLOSED               /* casereader from proc_open destroyed,
96                                    but proc_commit not yet called. */
97     }
98   proc_state;
99   casenumber cases_written;       /* Cases output so far. */
100   bool ok;                    /* Error status. */
101 }; /* struct dataset */
102
103
104 static void add_case_limit_trns (struct dataset *ds);
105 static void add_filter_trns (struct dataset *ds);
106
107 static void update_last_proc_invocation (struct dataset *ds);
108 \f
109 /* Public functions. */
110
111 /* Returns the last time the data was read. */
112 time_t
113 time_of_last_procedure (struct dataset *ds)
114 {
115   if (ds->last_proc_invocation == 0)
116     update_last_proc_invocation (ds);
117   return ds->last_proc_invocation;
118 }
119 \f
120 /* Regular procedure. */
121
122 /* Executes any pending transformations, if necessary.
123    This is not identical to the EXECUTE command in that it won't
124    always read the source data.  This can be important when the
125    source data is given inline within BEGIN DATA...END FILE. */
126 bool
127 proc_execute (struct dataset *ds)
128 {
129   bool ok;
130
131   if ((ds->temporary_trns_chain == NULL
132        || trns_chain_is_empty (ds->temporary_trns_chain))
133       && trns_chain_is_empty (ds->permanent_trns_chain))
134     {
135       ds->n_lag = 0;
136       ds->discard_output = false;
137       dict_set_case_limit (ds->dict, 0);
138       dict_clear_vectors (ds->dict);
139       return true;
140     }
141
142   ok = casereader_destroy (proc_open (ds));
143   return proc_commit (ds) && ok;
144 }
145
146 static struct casereader_class proc_casereader_class;
147
148 /* Opens dataset DS for reading cases with proc_read.
149    proc_commit must be called when done. */
150 struct casereader *
151 proc_open (struct dataset *ds)
152 {
153   assert (ds->source != NULL);
154   assert (ds->proc_state == PROC_COMMITTED);
155
156   update_last_proc_invocation (ds);
157
158   caseinit_mark_for_init (ds->caseinit, ds->dict);
159
160   /* Finish up the collection of transformations. */
161   add_case_limit_trns (ds);
162   add_filter_trns (ds);
163   trns_chain_finalize (ds->cur_trns_chain);
164
165   /* Make permanent_dict refer to the dictionary right before
166      data reaches the sink. */
167   if (ds->permanent_dict == NULL)
168     ds->permanent_dict = ds->dict;
169
170   /* Prepare sink. */
171   if (!ds->discard_output)
172     {
173       ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
174                        ? dict_make_compactor (ds->permanent_dict)
175                        : NULL);
176       ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
177                                              ds->permanent_dict));
178     }
179   else
180     {
181       ds->compactor = NULL;
182       ds->sink = NULL;
183     }
184
185   /* Allocate memory for lagged cases. */
186   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
187
188   ds->proc_state = PROC_OPEN;
189   ds->cases_written = 0;
190   ds->ok = true;
191
192   /* FIXME: use taint in dataset in place of `ok'? */
193   /* FIXME: for trivial cases we can just return a clone of
194      ds->source? */
195   return casereader_create_sequential (NULL,
196                                        dict_get_next_value_idx (ds->dict),
197                                        CASENUMBER_MAX,
198                                        &proc_casereader_class, ds);
199 }
200
201 /* Returns true if a procedure is in progress, that is, if
202    proc_open has been called but proc_commit has not. */
203 bool
204 proc_is_open (const struct dataset *ds)
205 {
206   return ds->proc_state != PROC_COMMITTED;
207 }
208
209 /* "read" function for procedure casereader. */
210 static bool
211 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
212                       struct ccase *c)
213 {
214   struct dataset *ds = ds_;
215   enum trns_result retval = TRNS_DROP_CASE;
216
217   assert (ds->proc_state == PROC_OPEN);
218   for (;;)
219     {
220       casenumber case_nr;
221
222       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
223       if (retval == TRNS_ERROR)
224         ds->ok = false;
225       if (!ds->ok)
226         return false;
227
228       /* Read a case from source. */
229       if (!casereader_read (ds->source, c))
230         return false;
231       case_resize (c, dict_get_next_value_idx (ds->dict));
232       caseinit_init_vars (ds->caseinit, c);
233
234       /* Execute permanent transformations.  */
235       case_nr = ds->cases_written + 1;
236       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
237                                    c, case_nr);
238       caseinit_update_left_vars (ds->caseinit, c);
239       if (retval != TRNS_CONTINUE)
240         {
241           case_destroy (c);
242           continue;
243         }
244
245       /* Write case to collection of lagged cases. */
246       if (ds->n_lag > 0)
247         {
248           while (deque_count (&ds->lag) >= ds->n_lag)
249             case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
250           case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
251         }
252
253       /* Write case to replacement active file. */
254       ds->cases_written++;
255       if (ds->sink != NULL)
256         {
257           struct ccase tmp;
258           if (ds->compactor != NULL)
259             {
260               case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
261               dict_compactor_compact (ds->compactor, &tmp, c);
262             }
263           else
264             case_clone (&tmp, c);
265           casewriter_write (ds->sink, &tmp);
266         }
267
268       /* Execute temporary transformations. */
269       if (ds->temporary_trns_chain != NULL)
270         {
271           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
272                                        c, ds->cases_written);
273           if (retval != TRNS_CONTINUE)
274             {
275               case_destroy (c);
276               continue;
277             }
278         }
279
280       return true;
281     }
282 }
283
284 /* "destroy" function for procedure casereader. */
285 static void
286 proc_casereader_destroy (struct casereader *reader, void *ds_)
287 {
288   struct dataset *ds = ds_;
289   struct ccase c;
290
291   /* Make sure transformations happen for every input case, in
292      case they have side effects, and ensure that the replacement
293      active file gets all the cases it should. */
294   while (casereader_read (reader, &c))
295     case_destroy (&c);
296
297   ds->proc_state = PROC_CLOSED;
298   ds->ok = casereader_destroy (ds->source) && ds->ok;
299   ds->source = NULL;
300   proc_set_active_file_data (ds, NULL);
301 }
302
303 /* Must return false if the source casereader, a transformation,
304    or the sink casewriter signaled an error.  (If a temporary
305    transformation signals an error, then the return value is
306    false, but the replacement active file may still be
307    untainted.) */
308 bool
309 proc_commit (struct dataset *ds)
310 {
311   assert (ds->proc_state == PROC_CLOSED);
312   ds->proc_state = PROC_COMMITTED;
313
314   /* Free memory for lagged cases. */
315   while (!deque_is_empty (&ds->lag))
316     case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
317   free (ds->lag_cases);
318
319   /* Dictionary from before TEMPORARY becomes permanent. */
320   proc_cancel_temporary_transformations (ds);
321
322   if (!ds->discard_output)
323     {
324       /* Finish compacting. */
325       if (ds->compactor != NULL)
326         {
327           dict_compactor_destroy (ds->compactor);
328           dict_compact_values (ds->dict);
329           ds->compactor = NULL;
330         }
331
332       /* Old data sink becomes new data source. */
333       if (ds->sink != NULL)
334         ds->source = casewriter_make_reader (ds->sink);
335     }
336   else
337     {
338       ds->source = NULL;
339       ds->discard_output = false;
340     }
341   ds->sink = NULL;
342   if ( ds->replace_source) ds->replace_source (ds->source);
343
344   caseinit_clear (ds->caseinit);
345   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
346
347   dict_clear_vectors (ds->dict);
348   ds->permanent_dict = NULL;
349   return proc_cancel_all_transformations (ds) && ds->ok;
350 }
351
352 /* Casereader class for procedure execution. */
353 static struct casereader_class proc_casereader_class =
354   {
355     proc_casereader_read,
356     proc_casereader_destroy,
357     NULL,
358     NULL,
359   };
360
361 /* Updates last_proc_invocation. */
362 static void
363 update_last_proc_invocation (struct dataset *ds)
364 {
365   ds->last_proc_invocation = time (NULL);
366 }
367 \f
368 /* Returns a pointer to the lagged case from N_BEFORE cases before the
369    current one, or NULL if there haven't been that many cases yet. */
370 struct ccase *
371 lagged_case (const struct dataset *ds, int n_before)
372 {
373   assert (n_before >= 1);
374   assert (n_before <= ds->n_lag);
375
376   if (n_before <= deque_count (&ds->lag))
377     return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
378   else
379     return NULL;
380 }
381 \f
382 /* Returns the current set of permanent transformations,
383    and clears the permanent transformations.
384    For use by INPUT PROGRAM. */
385 struct trns_chain *
386 proc_capture_transformations (struct dataset *ds)
387 {
388   struct trns_chain *chain;
389
390   assert (ds->temporary_trns_chain == NULL);
391   chain = ds->permanent_trns_chain;
392   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
393
394   if ( ds->xform_callback)
395     ds->xform_callback (false, ds->xform_callback_aux);
396
397   return chain;
398 }
399
400 /* Adds a transformation that processes a case with PROC and
401    frees itself with FREE to the current set of transformations.
402    The functions are passed AUX as auxiliary data. */
403 void
404 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
405 {
406   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
407   if ( ds->xform_callback)
408     ds->xform_callback (true, ds->xform_callback_aux);
409 }
410
411 /* Adds a transformation that processes a case with PROC and
412    frees itself with FREE to the current set of transformations.
413    When parsing of the block of transformations is complete,
414    FINALIZE will be called.
415    The functions are passed AUX as auxiliary data. */
416 void
417 add_transformation_with_finalizer (struct dataset *ds,
418                                    trns_finalize_func *finalize,
419                                    trns_proc_func *proc,
420                                    trns_free_func *free, void *aux)
421 {
422   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
423
424   if ( ds->xform_callback)
425     ds->xform_callback (true, ds->xform_callback_aux);
426 }
427
428 /* Returns the index of the next transformation.
429    This value can be returned by a transformation procedure
430    function to indicate a "jump" to that transformation. */
431 size_t
432 next_transformation (const struct dataset *ds)
433 {
434   return trns_chain_next (ds->cur_trns_chain);
435 }
436
437 /* Returns true if the next call to add_transformation() will add
438    a temporary transformation, false if it will add a permanent
439    transformation. */
440 bool
441 proc_in_temporary_transformations (const struct dataset *ds)
442 {
443   return ds->temporary_trns_chain != NULL;
444 }
445
446 /* Marks the start of temporary transformations.
447    Further calls to add_transformation() will add temporary
448    transformations. */
449 void
450 proc_start_temporary_transformations (struct dataset *ds)
451 {
452   if (!proc_in_temporary_transformations (ds))
453     {
454       add_case_limit_trns (ds);
455
456       ds->permanent_dict = dict_clone (ds->dict);
457
458       trns_chain_finalize (ds->permanent_trns_chain);
459       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
460
461       if ( ds->xform_callback)
462         ds->xform_callback (true, ds->xform_callback_aux);
463     }
464 }
465
466 /* Converts all the temporary transformations, if any, to
467    permanent transformations.  Further transformations will be
468    permanent.
469    Returns true if anything changed, false otherwise. */
470 bool
471 proc_make_temporary_transformations_permanent (struct dataset *ds)
472 {
473   if (proc_in_temporary_transformations (ds))
474     {
475       trns_chain_finalize (ds->temporary_trns_chain);
476       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
477       ds->temporary_trns_chain = NULL;
478
479       dict_destroy (ds->permanent_dict);
480       ds->permanent_dict = NULL;
481
482       return true;
483     }
484   else
485     return false;
486 }
487
488 /* Cancels all temporary transformations, if any.  Further
489    transformations will be permanent.
490    Returns true if anything changed, false otherwise. */
491 bool
492 proc_cancel_temporary_transformations (struct dataset *ds)
493 {
494   if (proc_in_temporary_transformations (ds))
495     {
496       dict_destroy (ds->dict);
497       ds->dict = ds->permanent_dict;
498       ds->permanent_dict = NULL;
499       if (ds->replace_dict) ds->replace_dict (ds->dict);
500
501       trns_chain_destroy (ds->temporary_trns_chain);
502       ds->temporary_trns_chain = NULL;
503
504       if ( ds->xform_callback)
505         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
506                             ds->xform_callback_aux);
507
508       return true;
509     }
510   else
511     return false;
512 }
513
514 /* Cancels all transformations, if any.
515    Returns true if successful, false on I/O error. */
516 bool
517 proc_cancel_all_transformations (struct dataset *ds)
518 {
519   bool ok;
520   assert (ds->proc_state == PROC_COMMITTED);
521   ok = trns_chain_destroy (ds->permanent_trns_chain);
522   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
523   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
524   ds->temporary_trns_chain = NULL;
525   if ( ds->xform_callback)
526     ds->xform_callback (false, ds->xform_callback_aux);
527
528   return ok;
529 }
530 \f
531 /* Initializes procedure handling. */
532 struct dataset *
533 create_dataset (transformation_change_callback_func *cb, void *aux)
534 {
535   struct dataset *ds = xzalloc (sizeof(*ds));
536   ds->dict = dict_create ();
537   ds->caseinit = caseinit_create ();
538   ds->xform_callback = cb;
539   ds->xform_callback_aux = aux;
540   proc_cancel_all_transformations (ds);
541   return ds;
542 }
543
544
545 void
546 dataset_add_transform_change_callback (struct dataset *ds,
547                                        transformation_change_callback_func *cb,
548                                        void *aux)
549 {
550   ds->xform_callback = cb;
551   ds->xform_callback_aux = aux;
552 }
553
554 /* Finishes up procedure handling. */
555 void
556 destroy_dataset (struct dataset *ds)
557 {
558   proc_discard_active_file (ds);
559   dict_destroy (ds->dict);
560   caseinit_destroy (ds->caseinit);
561   trns_chain_destroy (ds->permanent_trns_chain);
562
563   if ( ds->xform_callback)
564     ds->xform_callback (false, ds->xform_callback_aux);
565   free (ds);
566 }
567
568 /* Causes output from the next procedure to be discarded, instead
569    of being preserved for use as input for the next procedure. */
570 void
571 proc_discard_output (struct dataset *ds)
572 {
573   ds->discard_output = true;
574 }
575
576 /* Discards the active file dictionary, data, and
577    transformations. */
578 void
579 proc_discard_active_file (struct dataset *ds)
580 {
581   assert (ds->proc_state == PROC_COMMITTED);
582
583   dict_clear (ds->dict);
584   fh_set_default_handle (NULL);
585
586   ds->n_lag = 0;
587
588   casereader_destroy (ds->source);
589   ds->source = NULL;
590   if ( ds->replace_source) ds->replace_source (NULL);
591
592   proc_cancel_all_transformations (ds);
593 }
594
595 /* Sets SOURCE as the source for procedure input for the next
596    procedure. */
597 void
598 proc_set_active_file (struct dataset *ds,
599                       struct casereader *source,
600                       struct dictionary *dict)
601 {
602   assert (ds->proc_state == PROC_COMMITTED);
603   assert (ds->dict != dict);
604
605   proc_discard_active_file (ds);
606
607   dict_destroy (ds->dict);
608   ds->dict = dict;
609   if ( ds->replace_dict) ds->replace_dict (dict);
610
611   proc_set_active_file_data (ds, source);
612 }
613
614 /* Replaces the active file's data by READER without replacing
615    the associated dictionary. */
616 bool
617 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
618 {
619   casereader_destroy (ds->source);
620   ds->source = reader;
621   if (ds->replace_source) ds->replace_source (reader);
622
623   caseinit_clear (ds->caseinit);
624   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
625
626   return reader == NULL || !casereader_error (reader);
627 }
628
629 /* Returns true if an active file data source is available, false
630    otherwise. */
631 bool
632 proc_has_active_file (const struct dataset *ds)
633 {
634   return ds->source != NULL;
635 }
636
637 /* Checks whether DS has a corrupted active file.  If so,
638    discards it and returns false.  If not, returns true without
639    doing anything. */
640 bool
641 dataset_end_of_command (struct dataset *ds)
642 {
643   if (ds->source != NULL)
644     {
645       if (casereader_error (ds->source))
646         {
647           proc_discard_active_file (ds);
648           return false;
649         }
650       else
651         {
652           const struct taint *taint = casereader_get_taint (ds->source);
653           taint_reset_successor_taint ((struct taint *) taint);
654           assert (!taint_has_tainted_successor (taint));
655         }
656     }
657   return true;
658 }
659 \f
660 static trns_proc_func case_limit_trns_proc;
661 static trns_free_func case_limit_trns_free;
662
663 /* Adds a transformation that limits the number of cases that may
664    pass through, if DS->DICT has a case limit. */
665 static void
666 add_case_limit_trns (struct dataset *ds)
667 {
668   size_t case_limit = dict_get_case_limit (ds->dict);
669   if (case_limit != 0)
670     {
671       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
672       *cases_remaining = case_limit;
673       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
674                           cases_remaining);
675       dict_set_case_limit (ds->dict, 0);
676     }
677 }
678
679 /* Limits the maximum number of cases processed to
680    *CASES_REMAINING. */
681 static int
682 case_limit_trns_proc (void *cases_remaining_,
683                       struct ccase *c UNUSED, casenumber case_nr UNUSED)
684 {
685   size_t *cases_remaining = cases_remaining_;
686   if (*cases_remaining > 0)
687     {
688       (*cases_remaining)--;
689       return TRNS_CONTINUE;
690     }
691   else
692     return TRNS_DROP_CASE;
693 }
694
695 /* Frees the data associated with a case limit transformation. */
696 static bool
697 case_limit_trns_free (void *cases_remaining_)
698 {
699   size_t *cases_remaining = cases_remaining_;
700   free (cases_remaining);
701   return true;
702 }
703 \f
704 static trns_proc_func filter_trns_proc;
705
706 /* Adds a temporary transformation to filter data according to
707    the variable specified on FILTER, if any. */
708 static void
709 add_filter_trns (struct dataset *ds)
710 {
711   struct variable *filter_var = dict_get_filter (ds->dict);
712   if (filter_var != NULL)
713     {
714       proc_start_temporary_transformations (ds);
715       add_transformation (ds, filter_trns_proc, NULL, filter_var);
716     }
717 }
718
719 /* FILTER transformation. */
720 static int
721 filter_trns_proc (void *filter_var_,
722                   struct ccase *c UNUSED, casenumber case_nr UNUSED)
723
724 {
725   struct variable *filter_var = filter_var_;
726   double f = case_num (c, filter_var);
727   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
728           ? TRNS_CONTINUE : TRNS_DROP_CASE);
729 }
730
731
732 struct dictionary *
733 dataset_dict (const struct dataset *ds)
734 {
735   return ds->dict;
736 }
737
738 const struct casereader *
739 dataset_source (const struct dataset *ds)
740 {
741   return ds->source;
742 }
743
744 void
745 dataset_need_lag (struct dataset *ds, int n_before)
746 {
747   ds->n_lag = MAX (ds->n_lag, n_before);
748 }