Actually implement the new procedure code and adapt all of its clients
[pspp] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include <data/case.h>
27 #include <data/caseinit.h>
28 #include <data/casereader.h>
29 #include <data/casereader-provider.h>
30 #include <data/casewriter.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/transformations.h>
35 #include <data/variable.h>
36 #include <libpspp/alloc.h>
37 #include <libpspp/deque.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40 #include <libpspp/taint.h>
41
42 struct dataset {
43   /* Cases are read from source,
44      their transformation variables are initialized,
45      pass through permanent_trns_chain (which transforms them into
46      the format described by permanent_dict),
47      are written to sink,
48      pass through temporary_trns_chain (which transforms them into
49      the format described by dict),
50      and are finally passed to the procedure. */
51   struct casereader *source;
52   struct caseinit *caseinit;
53   struct trns_chain *permanent_trns_chain;
54   struct dictionary *permanent_dict;
55   struct casewriter *sink;
56   struct trns_chain *temporary_trns_chain;
57   struct dictionary *dict;
58
59   /* Callback which occurs when a procedure provides a new source for
60      the dataset */
61   replace_source_callback *replace_source ;
62
63   /* Callback which occurs whenever the DICT is replaced by a new one */
64   replace_dictionary_callback *replace_dict;
65
66   /* If true, cases are discarded instead of being written to
67      sink. */
68   bool discard_output;
69
70   /* The transformation chain that the next transformation will be
71      added to. */
72   struct trns_chain *cur_trns_chain;
73
74   /* The compactor used to compact a case, if necessary;
75      otherwise a null pointer. */
76   struct dict_compactor *compactor;
77
78   /* Time at which proc was last invoked. */
79   time_t last_proc_invocation;
80
81   /* Cases just before ("lagging") the current one. */
82   int n_lag;                    /* Number of cases to lag. */
83   struct deque lag;             /* Deque of lagged cases. */
84   struct ccase *lag_cases;      /* Lagged cases managed by deque. */
85
86   /* Procedure data. */
87   enum 
88     {
89       PROC_COMMITTED,
90       PROC_OPEN,
91       PROC_CLOSED 
92     }
93   proc_state;
94   size_t cases_written;       /* Cases output so far. */
95   bool ok;                    /* Error status. */
96 }; /* struct dataset */
97
98
99 static void add_case_limit_trns (struct dataset *ds);
100 static void add_filter_trns (struct dataset *ds);
101
102 static void update_last_proc_invocation (struct dataset *ds);
103 \f
104 /* Public functions. */
105
106 /* Returns the last time the data was read. */
107 time_t
108 time_of_last_procedure (struct dataset *ds)
109 {
110   if (ds->last_proc_invocation == 0)
111     update_last_proc_invocation (ds);
112   return ds->last_proc_invocation;
113 }
114 \f
115 /* Regular procedure. */
116
117 /* Executes any pending transformations, if necessary.
118    This is not identical to the EXECUTE command in that it won't
119    always read the source data.  This can be important when the
120    source data is given inline within BEGIN DATA...END FILE. */
121 bool
122 proc_execute (struct dataset *ds)
123 {
124   bool ok;
125
126   if ((ds->temporary_trns_chain == NULL
127        || trns_chain_is_empty (ds->temporary_trns_chain))
128       && trns_chain_is_empty (ds->permanent_trns_chain))
129     {
130       ds->n_lag = 0;
131       ds->discard_output = false;
132       dict_set_case_limit (ds->dict, 0);
133       dict_clear_vectors (ds->dict);
134       return true;
135     }
136
137   ok = casereader_destroy (proc_open (ds));
138   return proc_commit (ds) && ok;
139 }
140
141 static struct casereader_class proc_casereader_class;
142
143 /* Opens dataset DS for reading cases with proc_read.
144    proc_commit must be called when done. */
145 struct casereader *
146 proc_open (struct dataset *ds)
147 {
148   assert (ds->source != NULL);
149   assert (ds->proc_state == PROC_COMMITTED);
150
151   update_last_proc_invocation (ds);
152
153   caseinit_mark_for_init (ds->caseinit, ds->dict);
154
155   /* Finish up the collection of transformations. */
156   add_case_limit_trns (ds);
157   add_filter_trns (ds);
158   trns_chain_finalize (ds->cur_trns_chain);
159
160   /* Make permanent_dict refer to the dictionary right before
161      data reaches the sink. */
162   if (ds->permanent_dict == NULL)
163     ds->permanent_dict = ds->dict;
164
165   /* Prepare sink. */
166   if (!ds->discard_output) 
167     {
168       ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
169                        ? dict_make_compactor (ds->permanent_dict)
170                        : NULL);
171       ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
172                                              ds->permanent_dict)); 
173     }
174   else 
175     {
176       ds->compactor = NULL;
177       ds->sink = NULL;
178     }
179
180   /* Allocate memory for lagged cases. */
181   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
182
183   ds->proc_state = PROC_OPEN;
184   ds->cases_written = 0;
185   ds->ok = true;
186
187   /* FIXME: use taint in dataset in place of `ok'? */
188   /* FIXME: for trivial cases we can just return a clone of
189      ds->source? */
190   return casereader_create_sequential (NULL,
191                                        dict_get_next_value_idx (ds->dict),
192                                        CASENUMBER_MAX,
193                                        &proc_casereader_class, ds);
194 }
195
196 bool
197 proc_is_open (const struct dataset *ds) 
198 {
199   return ds->proc_state != PROC_COMMITTED;
200 }
201
202 /* Reads the next case from dataset DS, which must have been
203    opened for reading with proc_open.
204    Returns true if successful, in which case a pointer to the
205    case is stored in *C.
206    Return false at end of file or if a read error occurs.  In
207    this case a null pointer is stored in *C. */
208 static bool
209 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
210                       struct ccase *c) 
211 {
212   struct dataset *ds = ds_;
213   enum trns_result retval = TRNS_DROP_CASE;
214
215   assert (ds->proc_state == PROC_OPEN);
216   for (;;) 
217     {
218       size_t case_nr;
219
220       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
221       if (retval == TRNS_ERROR)
222         ds->ok = false;
223       if (!ds->ok)
224         return false;
225
226       /* Read a case from source. */
227       if (!casereader_read (ds->source, c))
228         return false;
229       case_resize (c, dict_get_next_value_idx (ds->dict));
230       caseinit_init_reinit_vars (ds->caseinit, c);
231       caseinit_init_left_vars (ds->caseinit, c);
232
233       /* Execute permanent transformations.  */
234       case_nr = ds->cases_written + 1;
235       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
236                                    c, &case_nr);
237       caseinit_update_left_vars (ds->caseinit, c);
238       if (retval != TRNS_CONTINUE) 
239         {
240           case_destroy (c);
241           continue; 
242         }
243   
244       /* Write case to collection of lagged cases. */
245       if (ds->n_lag > 0) 
246         {
247           while (deque_count (&ds->lag) >= ds->n_lag)
248             case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
249           case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
250         }
251
252       /* Write case to replacement active file. */
253       ds->cases_written++;
254       if (ds->sink != NULL) 
255         {
256           struct ccase tmp;
257           if (ds->compactor != NULL) 
258             {
259               case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
260               dict_compactor_compact (ds->compactor, &tmp, c);
261             }
262           else
263             case_clone (&tmp, c);
264           casewriter_write (ds->sink, &tmp);
265         }
266
267       /* Execute temporary transformations. */
268       if (ds->temporary_trns_chain != NULL)
269         {
270           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
271                                        c, &ds->cases_written);
272           if (retval != TRNS_CONTINUE)
273             {
274               case_destroy (c);
275               continue;
276             }
277         }
278
279       return true;
280     }
281 }
282
283 /* Closes dataset DS for reading.
284    Returns true if successful, false if an I/O error occurred
285    while reading or closing the data set.
286    If DS has not been opened, returns true without doing
287    anything else. */
288 static void
289 proc_casereader_destroy (struct casereader *reader, void *ds_)
290 {
291   struct dataset *ds = ds_;
292   struct ccase c;
293
294   /* Make sure transformations happen for every input case, in
295      case they have side effects, and ensure that the replacement
296      active file gets all the cases it should. */
297   while (casereader_read (reader, &c))
298     case_destroy (&c);
299
300   ds->proc_state = PROC_CLOSED;
301   ds->ok = casereader_destroy (ds->source) && ds->ok;
302   ds->source = NULL;
303   proc_set_active_file_data (ds, NULL);
304 }
305
306 /* Must return false if the source casereader, a transformation,
307    or the sink casewriter signaled an error.  (If a temporary
308    transformation signals an error, then the return value is
309    false, but the replacement active file may still be
310    untainted.) */
311 bool
312 proc_commit (struct dataset *ds) 
313 {
314   assert (ds->proc_state == PROC_CLOSED);
315   ds->proc_state = PROC_COMMITTED;
316
317   /* Free memory for lagged cases. */
318   while (!deque_is_empty (&ds->lag))
319     case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
320   free (ds->lag_cases);
321
322   /* Dictionary from before TEMPORARY becomes permanent. */
323   proc_cancel_temporary_transformations (ds);
324
325   if (!ds->discard_output) 
326     {
327       /* Finish compacting. */
328       if (ds->compactor != NULL) 
329         {
330           dict_compactor_destroy (ds->compactor);
331           dict_compact_values (ds->dict);
332           ds->compactor = NULL;
333         }
334     
335       /* Old data sink becomes new data source. */
336       if (ds->sink != NULL) 
337         ds->source = casewriter_make_reader (ds->sink);
338     }
339   else 
340     {
341       ds->source = NULL;
342       ds->discard_output = false; 
343     }
344   ds->sink = NULL;
345   if ( ds->replace_source) ds->replace_source (ds->source);
346
347   caseinit_clear (ds->caseinit);
348   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
349
350   dict_clear_vectors (ds->dict);
351   ds->permanent_dict = NULL;
352   return proc_cancel_all_transformations (ds) && ds->ok;
353 }
354
355 static struct casereader_class proc_casereader_class = 
356   {
357     proc_casereader_read,
358     proc_casereader_destroy,
359     NULL,
360     NULL,
361   };
362
363 /* Updates last_proc_invocation. */
364 static void
365 update_last_proc_invocation (struct dataset *ds)
366 {
367   ds->last_proc_invocation = time (NULL);
368 }
369 \f
370 /* Returns a pointer to the lagged case from N_BEFORE cases before the
371    current one, or NULL if there haven't been that many cases yet. */
372 struct ccase *
373 lagged_case (const struct dataset *ds, int n_before)
374 {
375   assert (n_before >= 1);
376   assert (n_before <= ds->n_lag);
377
378   if (n_before <= deque_count (&ds->lag))
379     return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
380   else
381     return NULL;
382 }
383 \f
384 /* Returns the current set of permanent transformations,
385    and clears the permanent transformations.
386    For use by INPUT PROGRAM. */
387 struct trns_chain *
388 proc_capture_transformations (struct dataset *ds)
389 {
390   struct trns_chain *chain;
391
392   assert (ds->temporary_trns_chain == NULL);
393   chain = ds->permanent_trns_chain;
394   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
395   return chain;
396 }
397
398 /* Adds a transformation that processes a case with PROC and
399    frees itself with FREE to the current set of transformations.
400    The functions are passed AUX as auxiliary data. */
401 void
402 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
403 {
404   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
405 }
406
407 /* Adds a transformation that processes a case with PROC and
408    frees itself with FREE to the current set of transformations.
409    When parsing of the block of transformations is complete,
410    FINALIZE will be called.
411    The functions are passed AUX as auxiliary data. */
412 void
413 add_transformation_with_finalizer (struct dataset *ds,
414                                    trns_finalize_func *finalize,
415                                    trns_proc_func *proc,
416                                    trns_free_func *free, void *aux)
417 {
418   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
419 }
420
421 /* Returns the index of the next transformation.
422    This value can be returned by a transformation procedure
423    function to indicate a "jump" to that transformation. */
424 size_t
425 next_transformation (const struct dataset *ds)
426 {
427   return trns_chain_next (ds->cur_trns_chain);
428 }
429
430 /* Returns true if the next call to add_transformation() will add
431    a temporary transformation, false if it will add a permanent
432    transformation. */
433 bool
434 proc_in_temporary_transformations (const struct dataset *ds)
435 {
436   return ds->temporary_trns_chain != NULL;
437 }
438
439 /* Marks the start of temporary transformations.
440    Further calls to add_transformation() will add temporary
441    transformations. */
442 void
443 proc_start_temporary_transformations (struct dataset *ds)
444 {
445   if (!proc_in_temporary_transformations (ds))
446     {
447       add_case_limit_trns (ds);
448
449       ds->permanent_dict = dict_clone (ds->dict);
450
451       trns_chain_finalize (ds->permanent_trns_chain);
452       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
453     }
454 }
455
456 /* Converts all the temporary transformations, if any, to
457    permanent transformations.  Further transformations will be
458    permanent.
459    Returns true if anything changed, false otherwise. */
460 bool
461 proc_make_temporary_transformations_permanent (struct dataset *ds)
462 {
463   if (proc_in_temporary_transformations (ds))
464     {
465       trns_chain_finalize (ds->temporary_trns_chain);
466       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
467       ds->temporary_trns_chain = NULL;
468
469       dict_destroy (ds->permanent_dict);
470       ds->permanent_dict = NULL;
471
472       return true;
473     }
474   else
475     return false;
476 }
477
478 /* Cancels all temporary transformations, if any.  Further
479    transformations will be permanent.
480    Returns true if anything changed, false otherwise. */
481 bool
482 proc_cancel_temporary_transformations (struct dataset *ds)
483 {
484   if (proc_in_temporary_transformations (ds))
485     {
486       dict_destroy (ds->dict);
487       ds->dict = ds->permanent_dict;
488       ds->permanent_dict = NULL;
489       if (ds->replace_dict) ds->replace_dict (ds->dict);
490
491       trns_chain_destroy (ds->temporary_trns_chain);
492       ds->temporary_trns_chain = NULL;
493
494       return true;
495     }
496   else
497     return false;
498 }
499
500 /* Cancels all transformations, if any.
501    Returns true if successful, false on I/O error. */
502 bool
503 proc_cancel_all_transformations (struct dataset *ds)
504 {
505   bool ok;
506   assert (ds->proc_state == PROC_COMMITTED);
507   ok = trns_chain_destroy (ds->permanent_trns_chain);
508   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
509   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
510   ds->temporary_trns_chain = NULL;
511   return ok;
512 }
513 \f
514 /* Initializes procedure handling. */
515 struct dataset *
516 create_dataset (replace_source_callback *rps,
517                 replace_dictionary_callback *rds)
518 {
519   struct dataset *ds = xzalloc (sizeof(*ds));
520   ds->dict = dict_create ();
521   ds->caseinit = caseinit_create ();
522   ds->replace_source = rps;
523   ds->replace_dict = rds;
524   proc_cancel_all_transformations (ds);
525   return ds;
526 }
527
528 /* Finishes up procedure handling. */
529 void
530 destroy_dataset (struct dataset *ds)
531 {
532   proc_discard_active_file (ds);
533   dict_destroy (ds->dict);
534   caseinit_destroy (ds->caseinit);
535   trns_chain_destroy (ds->permanent_trns_chain);
536   free (ds);
537 }
538
539 /* Causes output from the next procedure to be discarded, instead
540    of being preserved for use as input for the next procedure. */
541 void
542 proc_discard_output (struct dataset *ds) 
543 {
544   ds->discard_output = true;
545 }
546
547 /* Discards the active file dictionary, data, and
548    transformations. */
549 void
550 proc_discard_active_file (struct dataset *ds)
551 {
552   assert (ds->proc_state == PROC_COMMITTED);
553
554   dict_clear (ds->dict);
555   fh_set_default_handle (NULL);
556
557   ds->n_lag = 0;
558   
559   casereader_destroy (ds->source);
560   ds->source = NULL;
561   if ( ds->replace_source) ds->replace_source (NULL);
562
563   proc_cancel_all_transformations (ds);
564 }
565
566 /* Sets SOURCE as the source for procedure input for the next
567    procedure. */
568 void
569 proc_set_active_file (struct dataset *ds,
570                       struct casereader *source,
571                       struct dictionary *dict) 
572 {
573   assert (ds->proc_state == PROC_COMMITTED);
574   assert (ds->dict != dict);
575
576   proc_discard_active_file (ds);
577
578   dict_destroy (ds->dict);
579   ds->dict = dict;
580   if ( ds->replace_dict) ds->replace_dict (dict);
581
582   proc_set_active_file_data (ds, source);
583 }
584
585 /* Replaces the active file's data by READER without replacing
586    the associated dictionary. */
587 bool
588 proc_set_active_file_data (struct dataset *ds, struct casereader *reader) 
589 {
590   casereader_destroy (ds->source);
591   ds->source = reader;
592   if (ds->replace_source) ds->replace_source (reader);
593
594   caseinit_clear (ds->caseinit);
595   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
596
597   return reader == NULL || !casereader_error (reader);
598 }
599
600 /* Returns true if an active file data source is available, false
601    otherwise. */
602 bool
603 proc_has_active_file (const struct dataset *ds) 
604 {
605   return ds->source != NULL;
606 }
607
608 /* Checks whether DS has a corrupted active file.  If so,
609    discards it and returns false.  If not, returns true without
610    doing anything. */
611 bool
612 dataset_end_of_command (struct dataset *ds) 
613 {
614   if (ds->source != NULL) 
615     {
616       if (casereader_error (ds->source)) 
617         {
618           proc_discard_active_file (ds);
619           return false;
620         }
621       else 
622         {
623           const struct taint *taint = casereader_get_taint (ds->source);
624           taint_reset_successor_taint ((struct taint *) taint);
625           assert (!taint_has_tainted_successor (taint));
626         }
627     }
628   return true; 
629 }
630 \f
631 static trns_proc_func case_limit_trns_proc;
632 static trns_free_func case_limit_trns_free;
633
634 /* Adds a transformation that limits the number of cases that may
635    pass through, if DS->DICT has a case limit. */
636 static void
637 add_case_limit_trns (struct dataset *ds)
638 {
639   size_t case_limit = dict_get_case_limit (ds->dict);
640   if (case_limit != 0)
641     {
642       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
643       *cases_remaining = case_limit;
644       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
645                           cases_remaining);
646       dict_set_case_limit (ds->dict, 0);
647     }
648 }
649
650 /* Limits the maximum number of cases processed to
651    *CASES_REMAINING. */
652 static int
653 case_limit_trns_proc (void *cases_remaining_,
654                       struct ccase *c UNUSED, casenumber case_nr UNUSED)
655 {
656   size_t *cases_remaining = cases_remaining_;
657   if (*cases_remaining > 0)
658     {
659       (*cases_remaining)--;
660       return TRNS_CONTINUE;
661     }
662   else
663     return TRNS_DROP_CASE;
664 }
665
666 /* Frees the data associated with a case limit transformation. */
667 static bool
668 case_limit_trns_free (void *cases_remaining_)
669 {
670   size_t *cases_remaining = cases_remaining_;
671   free (cases_remaining);
672   return true;
673 }
674 \f
675 static trns_proc_func filter_trns_proc;
676
677 /* Adds a temporary transformation to filter data according to
678    the variable specified on FILTER, if any. */
679 static void
680 add_filter_trns (struct dataset *ds)
681 {
682   struct variable *filter_var = dict_get_filter (ds->dict);
683   if (filter_var != NULL)
684     {
685       proc_start_temporary_transformations (ds);
686       add_transformation (ds, filter_trns_proc, NULL, filter_var);
687     }
688 }
689
690 /* FILTER transformation. */
691 static int
692 filter_trns_proc (void *filter_var_,
693                   struct ccase *c UNUSED, casenumber case_nr UNUSED)
694
695 {
696   struct variable *filter_var = filter_var_;
697   double f = case_num (c, filter_var);
698   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
699           ? TRNS_CONTINUE : TRNS_DROP_CASE);
700 }
701
702
703 struct dictionary *
704 dataset_dict (const struct dataset *ds)
705 {
706   return ds->dict;
707 }
708
709 void 
710 dataset_need_lag (struct dataset *ds, int n_before)
711 {
712   ds->n_lag = MAX (ds->n_lag, n_before);
713 }