6a891c9baff9d14f47f90350827d5406c65c19a8
[pspp] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23
24 #include <data/case.h>
25 #include <data/case-map.h>
26 #include <data/caseinit.h>
27 #include <data/casereader.h>
28 #include <data/casereader-provider.h>
29 #include <data/casewriter.h>
30 #include <data/dictionary.h>
31 #include <data/file-handle-def.h>
32 #include <data/procedure.h>
33 #include <data/transformations.h>
34 #include <data/variable.h>
35 #include <libpspp/alloc.h>
36 #include <libpspp/deque.h>
37 #include <libpspp/misc.h>
38 #include <libpspp/str.h>
39 #include <libpspp/taint.h>
40
41
42 struct dataset {
43   /* Cases are read from source,
44      their transformation variables are initialized,
45      pass through permanent_trns_chain (which transforms them into
46      the format described by permanent_dict),
47      are written to sink,
48      pass through temporary_trns_chain (which transforms them into
49      the format described by dict),
50      and are finally passed to the procedure. */
51   struct casereader *source;
52   struct caseinit *caseinit;
53   struct trns_chain *permanent_trns_chain;
54   struct dictionary *permanent_dict;
55   struct casewriter *sink;
56   struct trns_chain *temporary_trns_chain;
57   struct dictionary *dict;
58
59   /* Callback which occurs when a procedure provides a new source for
60      the dataset */
61   replace_source_callback *replace_source ;
62
63   /* Callback which occurs whenever the DICT is replaced by a new one */
64   replace_dictionary_callback *replace_dict;
65
66   /* Callback which occurs whenever the transformation chain(s) have
67      been modified */
68   transformation_change_callback_func *xform_callback;
69   void *xform_callback_aux;
70
71   /* If true, cases are discarded instead of being written to
72      sink. */
73   bool discard_output;
74
75   /* The transformation chain that the next transformation will be
76      added to. */
77   struct trns_chain *cur_trns_chain;
78
79   /* The case map used to compact a case, if necessary;
80      otherwise a null pointer. */
81   struct case_map *compactor;
82
83   /* Time at which proc was last invoked. */
84   time_t last_proc_invocation;
85
86   /* Cases just before ("lagging") the current one. */
87   int n_lag;                    /* Number of cases to lag. */
88   struct deque lag;             /* Deque of lagged cases. */
89   struct ccase *lag_cases;      /* Lagged cases managed by deque. */
90
91   /* Procedure data. */
92   enum
93     {
94       PROC_COMMITTED,           /* No procedure in progress. */
95       PROC_OPEN,                /* proc_open called, casereader still open. */
96       PROC_CLOSED               /* casereader from proc_open destroyed,
97                                    but proc_commit not yet called. */
98     }
99   proc_state;
100   casenumber cases_written;       /* Cases output so far. */
101   bool ok;                    /* Error status. */
102 }; /* struct dataset */
103
104
105 static void add_case_limit_trns (struct dataset *ds);
106 static void add_filter_trns (struct dataset *ds);
107
108 static void update_last_proc_invocation (struct dataset *ds);
109 \f
110 /* Public functions. */
111
112 /* Returns the last time the data was read. */
113 time_t
114 time_of_last_procedure (struct dataset *ds)
115 {
116   if (ds->last_proc_invocation == 0)
117     update_last_proc_invocation (ds);
118   return ds->last_proc_invocation;
119 }
120 \f
121 /* Regular procedure. */
122
123 /* Executes any pending transformations, if necessary.
124    This is not identical to the EXECUTE command in that it won't
125    always read the source data.  This can be important when the
126    source data is given inline within BEGIN DATA...END FILE. */
127 bool
128 proc_execute (struct dataset *ds)
129 {
130   bool ok;
131
132   if ((ds->temporary_trns_chain == NULL
133        || trns_chain_is_empty (ds->temporary_trns_chain))
134       && trns_chain_is_empty (ds->permanent_trns_chain))
135     {
136       ds->n_lag = 0;
137       ds->discard_output = false;
138       dict_set_case_limit (ds->dict, 0);
139       dict_clear_vectors (ds->dict);
140       return true;
141     }
142
143   ok = casereader_destroy (proc_open (ds));
144   return proc_commit (ds) && ok;
145 }
146
147 static struct casereader_class proc_casereader_class;
148
149 /* Opens dataset DS for reading cases with proc_read.
150    proc_commit must be called when done. */
151 struct casereader *
152 proc_open (struct dataset *ds)
153 {
154   assert (ds->source != NULL);
155   assert (ds->proc_state == PROC_COMMITTED);
156
157   update_last_proc_invocation (ds);
158
159   caseinit_mark_for_init (ds->caseinit, ds->dict);
160
161   /* Finish up the collection of transformations. */
162   add_case_limit_trns (ds);
163   add_filter_trns (ds);
164   trns_chain_finalize (ds->cur_trns_chain);
165
166   /* Make permanent_dict refer to the dictionary right before
167      data reaches the sink. */
168   if (ds->permanent_dict == NULL)
169     ds->permanent_dict = ds->dict;
170
171   /* Prepare sink. */
172   if (!ds->discard_output)
173     {
174       struct dictionary *pd = ds->permanent_dict;
175       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
176       bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
177       ds->compactor = (should_compact
178                        ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
179                        : NULL);
180       ds->sink = autopaging_writer_create (compacted_value_cnt);
181     }
182   else
183     {
184       ds->compactor = NULL;
185       ds->sink = NULL;
186     }
187
188   /* Allocate memory for lagged cases. */
189   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
190
191   ds->proc_state = PROC_OPEN;
192   ds->cases_written = 0;
193   ds->ok = true;
194
195   /* FIXME: use taint in dataset in place of `ok'? */
196   /* FIXME: for trivial cases we can just return a clone of
197      ds->source? */
198   return casereader_create_sequential (NULL,
199                                        dict_get_next_value_idx (ds->dict),
200                                        CASENUMBER_MAX,
201                                        &proc_casereader_class, ds);
202 }
203
204 /* Returns true if a procedure is in progress, that is, if
205    proc_open has been called but proc_commit has not. */
206 bool
207 proc_is_open (const struct dataset *ds)
208 {
209   return ds->proc_state != PROC_COMMITTED;
210 }
211
212 /* "read" function for procedure casereader. */
213 static bool
214 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
215                       struct ccase *c)
216 {
217   struct dataset *ds = ds_;
218   enum trns_result retval = TRNS_DROP_CASE;
219
220   assert (ds->proc_state == PROC_OPEN);
221   for (;;)
222     {
223       casenumber case_nr;
224
225       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
226       if (retval == TRNS_ERROR)
227         ds->ok = false;
228       if (!ds->ok)
229         return false;
230
231       /* Read a case from source. */
232       if (!casereader_read (ds->source, c))
233         return false;
234       case_resize (c, dict_get_next_value_idx (ds->dict));
235       caseinit_init_vars (ds->caseinit, c);
236
237       /* Execute permanent transformations.  */
238       case_nr = ds->cases_written + 1;
239       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
240                                    c, case_nr);
241       caseinit_update_left_vars (ds->caseinit, c);
242       if (retval != TRNS_CONTINUE)
243         {
244           case_destroy (c);
245           continue;
246         }
247
248       /* Write case to collection of lagged cases. */
249       if (ds->n_lag > 0)
250         {
251           while (deque_count (&ds->lag) >= ds->n_lag)
252             case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
253           case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
254         }
255
256       /* Write case to replacement active file. */
257       ds->cases_written++;
258       if (ds->sink != NULL)
259         {
260           struct ccase tmp;
261           if (ds->compactor != NULL)
262             case_map_execute (ds->compactor, c, &tmp);
263           else
264             case_clone (&tmp, c);
265           casewriter_write (ds->sink, &tmp);
266         }
267
268       /* Execute temporary transformations. */
269       if (ds->temporary_trns_chain != NULL)
270         {
271           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
272                                        c, ds->cases_written);
273           if (retval != TRNS_CONTINUE)
274             {
275               case_destroy (c);
276               continue;
277             }
278         }
279
280       return true;
281     }
282 }
283
284 /* "destroy" function for procedure casereader. */
285 static void
286 proc_casereader_destroy (struct casereader *reader, void *ds_)
287 {
288   struct dataset *ds = ds_;
289   struct ccase c;
290
291   /* Make sure transformations happen for every input case, in
292      case they have side effects, and ensure that the replacement
293      active file gets all the cases it should. */
294   while (casereader_read (reader, &c))
295     case_destroy (&c);
296
297   ds->proc_state = PROC_CLOSED;
298   ds->ok = casereader_destroy (ds->source) && ds->ok;
299   ds->source = NULL;
300   proc_set_active_file_data (ds, NULL);
301 }
302
303 /* Must return false if the source casereader, a transformation,
304    or the sink casewriter signaled an error.  (If a temporary
305    transformation signals an error, then the return value is
306    false, but the replacement active file may still be
307    untainted.) */
308 bool
309 proc_commit (struct dataset *ds)
310 {
311   assert (ds->proc_state == PROC_CLOSED);
312   ds->proc_state = PROC_COMMITTED;
313
314   /* Free memory for lagged cases. */
315   while (!deque_is_empty (&ds->lag))
316     case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
317   free (ds->lag_cases);
318
319   /* Dictionary from before TEMPORARY becomes permanent. */
320   proc_cancel_temporary_transformations (ds);
321
322   if (!ds->discard_output)
323     {
324       /* Finish compacting. */
325       if (ds->compactor != NULL)
326         {
327           case_map_destroy (ds->compactor);
328           ds->compactor = NULL;
329
330           dict_delete_scratch_vars (ds->dict);
331           dict_compact_values (ds->dict);
332         }
333
334       /* Old data sink becomes new data source. */
335       if (ds->sink != NULL)
336         ds->source = casewriter_make_reader (ds->sink);
337     }
338   else
339     {
340       ds->source = NULL;
341       ds->discard_output = false;
342     }
343   ds->sink = NULL;
344   if ( ds->replace_source) ds->replace_source (ds->source);
345
346   caseinit_clear (ds->caseinit);
347   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
348
349   dict_clear_vectors (ds->dict);
350   ds->permanent_dict = NULL;
351   return proc_cancel_all_transformations (ds) && ds->ok;
352 }
353
354 /* Casereader class for procedure execution. */
355 static struct casereader_class proc_casereader_class =
356   {
357     proc_casereader_read,
358     proc_casereader_destroy,
359     NULL,
360     NULL,
361   };
362
363 /* Updates last_proc_invocation. */
364 static void
365 update_last_proc_invocation (struct dataset *ds)
366 {
367   ds->last_proc_invocation = time (NULL);
368 }
369 \f
370 /* Returns a pointer to the lagged case from N_BEFORE cases before the
371    current one, or NULL if there haven't been that many cases yet. */
372 struct ccase *
373 lagged_case (const struct dataset *ds, int n_before)
374 {
375   assert (n_before >= 1);
376   assert (n_before <= ds->n_lag);
377
378   if (n_before <= deque_count (&ds->lag))
379     return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
380   else
381     return NULL;
382 }
383 \f
384 /* Returns the current set of permanent transformations,
385    and clears the permanent transformations.
386    For use by INPUT PROGRAM. */
387 struct trns_chain *
388 proc_capture_transformations (struct dataset *ds)
389 {
390   struct trns_chain *chain;
391
392   assert (ds->temporary_trns_chain == NULL);
393   chain = ds->permanent_trns_chain;
394   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
395
396   if ( ds->xform_callback)
397     ds->xform_callback (false, ds->xform_callback_aux);
398
399   return chain;
400 }
401
402 /* Adds a transformation that processes a case with PROC and
403    frees itself with FREE to the current set of transformations.
404    The functions are passed AUX as auxiliary data. */
405 void
406 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
407 {
408   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
409   if ( ds->xform_callback)
410     ds->xform_callback (true, ds->xform_callback_aux);
411 }
412
413 /* Adds a transformation that processes a case with PROC and
414    frees itself with FREE to the current set of transformations.
415    When parsing of the block of transformations is complete,
416    FINALIZE will be called.
417    The functions are passed AUX as auxiliary data. */
418 void
419 add_transformation_with_finalizer (struct dataset *ds,
420                                    trns_finalize_func *finalize,
421                                    trns_proc_func *proc,
422                                    trns_free_func *free, void *aux)
423 {
424   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
425
426   if ( ds->xform_callback)
427     ds->xform_callback (true, ds->xform_callback_aux);
428 }
429
430 /* Returns the index of the next transformation.
431    This value can be returned by a transformation procedure
432    function to indicate a "jump" to that transformation. */
433 size_t
434 next_transformation (const struct dataset *ds)
435 {
436   return trns_chain_next (ds->cur_trns_chain);
437 }
438
439 /* Returns true if the next call to add_transformation() will add
440    a temporary transformation, false if it will add a permanent
441    transformation. */
442 bool
443 proc_in_temporary_transformations (const struct dataset *ds)
444 {
445   return ds->temporary_trns_chain != NULL;
446 }
447
448 /* Marks the start of temporary transformations.
449    Further calls to add_transformation() will add temporary
450    transformations. */
451 void
452 proc_start_temporary_transformations (struct dataset *ds)
453 {
454   if (!proc_in_temporary_transformations (ds))
455     {
456       add_case_limit_trns (ds);
457
458       ds->permanent_dict = dict_clone (ds->dict);
459
460       trns_chain_finalize (ds->permanent_trns_chain);
461       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
462
463       if ( ds->xform_callback)
464         ds->xform_callback (true, ds->xform_callback_aux);
465     }
466 }
467
468 /* Converts all the temporary transformations, if any, to
469    permanent transformations.  Further transformations will be
470    permanent.
471    Returns true if anything changed, false otherwise. */
472 bool
473 proc_make_temporary_transformations_permanent (struct dataset *ds)
474 {
475   if (proc_in_temporary_transformations (ds))
476     {
477       trns_chain_finalize (ds->temporary_trns_chain);
478       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
479       ds->temporary_trns_chain = NULL;
480
481       dict_destroy (ds->permanent_dict);
482       ds->permanent_dict = NULL;
483
484       return true;
485     }
486   else
487     return false;
488 }
489
490 /* Cancels all temporary transformations, if any.  Further
491    transformations will be permanent.
492    Returns true if anything changed, false otherwise. */
493 bool
494 proc_cancel_temporary_transformations (struct dataset *ds)
495 {
496   if (proc_in_temporary_transformations (ds))
497     {
498       dict_destroy (ds->dict);
499       ds->dict = ds->permanent_dict;
500       ds->permanent_dict = NULL;
501       if (ds->replace_dict) ds->replace_dict (ds->dict);
502
503       trns_chain_destroy (ds->temporary_trns_chain);
504       ds->temporary_trns_chain = NULL;
505
506       if ( ds->xform_callback)
507         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
508                             ds->xform_callback_aux);
509
510       return true;
511     }
512   else
513     return false;
514 }
515
516 /* Cancels all transformations, if any.
517    Returns true if successful, false on I/O error. */
518 bool
519 proc_cancel_all_transformations (struct dataset *ds)
520 {
521   bool ok;
522   assert (ds->proc_state == PROC_COMMITTED);
523   ok = trns_chain_destroy (ds->permanent_trns_chain);
524   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
525   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
526   ds->temporary_trns_chain = NULL;
527   if ( ds->xform_callback)
528     ds->xform_callback (false, ds->xform_callback_aux);
529
530   return ok;
531 }
532 \f
533 /* Initializes procedure handling. */
534 struct dataset *
535 create_dataset (transformation_change_callback_func *cb, void *aux)
536 {
537   struct dataset *ds = xzalloc (sizeof(*ds));
538   ds->dict = dict_create ();
539   ds->caseinit = caseinit_create ();
540   ds->xform_callback = cb;
541   ds->xform_callback_aux = aux;
542   proc_cancel_all_transformations (ds);
543   return ds;
544 }
545
546
547 void
548 dataset_add_transform_change_callback (struct dataset *ds,
549                                        transformation_change_callback_func *cb,
550                                        void *aux)
551 {
552   ds->xform_callback = cb;
553   ds->xform_callback_aux = aux;
554 }
555
556 /* Finishes up procedure handling. */
557 void
558 destroy_dataset (struct dataset *ds)
559 {
560   proc_discard_active_file (ds);
561   dict_destroy (ds->dict);
562   caseinit_destroy (ds->caseinit);
563   trns_chain_destroy (ds->permanent_trns_chain);
564
565   if ( ds->xform_callback)
566     ds->xform_callback (false, ds->xform_callback_aux);
567   free (ds);
568 }
569
570 /* Causes output from the next procedure to be discarded, instead
571    of being preserved for use as input for the next procedure. */
572 void
573 proc_discard_output (struct dataset *ds)
574 {
575   ds->discard_output = true;
576 }
577
578 /* Discards the active file dictionary, data, and
579    transformations. */
580 void
581 proc_discard_active_file (struct dataset *ds)
582 {
583   assert (ds->proc_state == PROC_COMMITTED);
584
585   dict_clear (ds->dict);
586   fh_set_default_handle (NULL);
587
588   ds->n_lag = 0;
589
590   casereader_destroy (ds->source);
591   ds->source = NULL;
592   if ( ds->replace_source) ds->replace_source (NULL);
593
594   proc_cancel_all_transformations (ds);
595 }
596
597 /* Sets SOURCE as the source for procedure input for the next
598    procedure. */
599 void
600 proc_set_active_file (struct dataset *ds,
601                       struct casereader *source,
602                       struct dictionary *dict)
603 {
604   assert (ds->proc_state == PROC_COMMITTED);
605   assert (ds->dict != dict);
606
607   proc_discard_active_file (ds);
608
609   dict_destroy (ds->dict);
610   ds->dict = dict;
611   if ( ds->replace_dict) ds->replace_dict (dict);
612
613   proc_set_active_file_data (ds, source);
614 }
615
616 /* Replaces the active file's data by READER without replacing
617    the associated dictionary. */
618 bool
619 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
620 {
621   casereader_destroy (ds->source);
622   ds->source = reader;
623   if (ds->replace_source) ds->replace_source (reader);
624
625   caseinit_clear (ds->caseinit);
626   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
627
628   return reader == NULL || !casereader_error (reader);
629 }
630
631 /* Returns true if an active file data source is available, false
632    otherwise. */
633 bool
634 proc_has_active_file (const struct dataset *ds)
635 {
636   return ds->source != NULL;
637 }
638
639 /* Checks whether DS has a corrupted active file.  If so,
640    discards it and returns false.  If not, returns true without
641    doing anything. */
642 bool
643 dataset_end_of_command (struct dataset *ds)
644 {
645   if (ds->source != NULL)
646     {
647       if (casereader_error (ds->source))
648         {
649           proc_discard_active_file (ds);
650           return false;
651         }
652       else
653         {
654           const struct taint *taint = casereader_get_taint (ds->source);
655           taint_reset_successor_taint ((struct taint *) taint);
656           assert (!taint_has_tainted_successor (taint));
657         }
658     }
659   return true;
660 }
661 \f
662 static trns_proc_func case_limit_trns_proc;
663 static trns_free_func case_limit_trns_free;
664
665 /* Adds a transformation that limits the number of cases that may
666    pass through, if DS->DICT has a case limit. */
667 static void
668 add_case_limit_trns (struct dataset *ds)
669 {
670   size_t case_limit = dict_get_case_limit (ds->dict);
671   if (case_limit != 0)
672     {
673       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
674       *cases_remaining = case_limit;
675       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
676                           cases_remaining);
677       dict_set_case_limit (ds->dict, 0);
678     }
679 }
680
681 /* Limits the maximum number of cases processed to
682    *CASES_REMAINING. */
683 static int
684 case_limit_trns_proc (void *cases_remaining_,
685                       struct ccase *c UNUSED, casenumber case_nr UNUSED)
686 {
687   size_t *cases_remaining = cases_remaining_;
688   if (*cases_remaining > 0)
689     {
690       (*cases_remaining)--;
691       return TRNS_CONTINUE;
692     }
693   else
694     return TRNS_DROP_CASE;
695 }
696
697 /* Frees the data associated with a case limit transformation. */
698 static bool
699 case_limit_trns_free (void *cases_remaining_)
700 {
701   size_t *cases_remaining = cases_remaining_;
702   free (cases_remaining);
703   return true;
704 }
705 \f
706 static trns_proc_func filter_trns_proc;
707
708 /* Adds a temporary transformation to filter data according to
709    the variable specified on FILTER, if any. */
710 static void
711 add_filter_trns (struct dataset *ds)
712 {
713   struct variable *filter_var = dict_get_filter (ds->dict);
714   if (filter_var != NULL)
715     {
716       proc_start_temporary_transformations (ds);
717       add_transformation (ds, filter_trns_proc, NULL, filter_var);
718     }
719 }
720
721 /* FILTER transformation. */
722 static int
723 filter_trns_proc (void *filter_var_,
724                   struct ccase *c UNUSED, casenumber case_nr UNUSED)
725
726 {
727   struct variable *filter_var = filter_var_;
728   double f = case_num (c, filter_var);
729   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
730           ? TRNS_CONTINUE : TRNS_DROP_CASE);
731 }
732
733
734 struct dictionary *
735 dataset_dict (const struct dataset *ds)
736 {
737   return ds->dict;
738 }
739
740 const struct casereader *
741 dataset_source (const struct dataset *ds)
742 {
743   return ds->source;
744 }
745
746 void
747 dataset_need_lag (struct dataset *ds, int n_before)
748 {
749   ds->n_lag = MAX (ds->n_lag, n_before);
750 }