Change license from GPLv2+ to GPLv3+.
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23
24 #include <data/case.h>
25 #include <data/caseinit.h>
26 #include <data/casereader.h>
27 #include <data/casereader-provider.h>
28 #include <data/casewriter.h>
29 #include <data/dictionary.h>
30 #include <data/file-handle-def.h>
31 #include <data/procedure.h>
32 #include <data/transformations.h>
33 #include <data/variable.h>
34 #include <libpspp/alloc.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
39
40 struct dataset {
41   /* Cases are read from source,
42      their transformation variables are initialized,
43      pass through permanent_trns_chain (which transforms them into
44      the format described by permanent_dict),
45      are written to sink,
46      pass through temporary_trns_chain (which transforms them into
47      the format described by dict),
48      and are finally passed to the procedure. */
49   struct casereader *source;
50   struct caseinit *caseinit;
51   struct trns_chain *permanent_trns_chain;
52   struct dictionary *permanent_dict;
53   struct casewriter *sink;
54   struct trns_chain *temporary_trns_chain;
55   struct dictionary *dict;
56
57   /* Callback which occurs when a procedure provides a new source for
58      the dataset */
59   replace_source_callback *replace_source ;
60
61   /* Callback which occurs whenever the DICT is replaced by a new one */
62   replace_dictionary_callback *replace_dict;
63
64   /* If true, cases are discarded instead of being written to
65      sink. */
66   bool discard_output;
67
68   /* The transformation chain that the next transformation will be
69      added to. */
70   struct trns_chain *cur_trns_chain;
71
72   /* The compactor used to compact a case, if necessary;
73      otherwise a null pointer. */
74   struct dict_compactor *compactor;
75
76   /* Time at which proc was last invoked. */
77   time_t last_proc_invocation;
78
79   /* Cases just before ("lagging") the current one. */
80   int n_lag;                    /* Number of cases to lag. */
81   struct deque lag;             /* Deque of lagged cases. */
82   struct ccase *lag_cases;      /* Lagged cases managed by deque. */
83
84   /* Procedure data. */
85   enum
86     {
87       PROC_COMMITTED,           /* No procedure in progress. */
88       PROC_OPEN,                /* proc_open called, casereader still open. */
89       PROC_CLOSED               /* casereader from proc_open destroyed,
90                                    but proc_commit not yet called. */
91     }
92   proc_state;
93   casenumber cases_written;       /* Cases output so far. */
94   bool ok;                    /* Error status. */
95 }; /* struct dataset */
96
97
98 static void add_case_limit_trns (struct dataset *ds);
99 static void add_filter_trns (struct dataset *ds);
100
101 static void update_last_proc_invocation (struct dataset *ds);
102 \f
103 /* Public functions. */
104
105 /* Returns the last time the data was read. */
106 time_t
107 time_of_last_procedure (struct dataset *ds)
108 {
109   if (ds->last_proc_invocation == 0)
110     update_last_proc_invocation (ds);
111   return ds->last_proc_invocation;
112 }
113 \f
114 /* Regular procedure. */
115
116 /* Executes any pending transformations, if necessary.
117    This is not identical to the EXECUTE command in that it won't
118    always read the source data.  This can be important when the
119    source data is given inline within BEGIN DATA...END FILE. */
120 bool
121 proc_execute (struct dataset *ds)
122 {
123   bool ok;
124
125   if ((ds->temporary_trns_chain == NULL
126        || trns_chain_is_empty (ds->temporary_trns_chain))
127       && trns_chain_is_empty (ds->permanent_trns_chain))
128     {
129       ds->n_lag = 0;
130       ds->discard_output = false;
131       dict_set_case_limit (ds->dict, 0);
132       dict_clear_vectors (ds->dict);
133       return true;
134     }
135
136   ok = casereader_destroy (proc_open (ds));
137   return proc_commit (ds) && ok;
138 }
139
140 static struct casereader_class proc_casereader_class;
141
142 /* Opens dataset DS for reading cases with proc_read.
143    proc_commit must be called when done. */
144 struct casereader *
145 proc_open (struct dataset *ds)
146 {
147   assert (ds->source != NULL);
148   assert (ds->proc_state == PROC_COMMITTED);
149
150   update_last_proc_invocation (ds);
151
152   caseinit_mark_for_init (ds->caseinit, ds->dict);
153
154   /* Finish up the collection of transformations. */
155   add_case_limit_trns (ds);
156   add_filter_trns (ds);
157   trns_chain_finalize (ds->cur_trns_chain);
158
159   /* Make permanent_dict refer to the dictionary right before
160      data reaches the sink. */
161   if (ds->permanent_dict == NULL)
162     ds->permanent_dict = ds->dict;
163
164   /* Prepare sink. */
165   if (!ds->discard_output)
166     {
167       ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
168                        ? dict_make_compactor (ds->permanent_dict)
169                        : NULL);
170       ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
171                                              ds->permanent_dict));
172     }
173   else
174     {
175       ds->compactor = NULL;
176       ds->sink = NULL;
177     }
178
179   /* Allocate memory for lagged cases. */
180   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
181
182   ds->proc_state = PROC_OPEN;
183   ds->cases_written = 0;
184   ds->ok = true;
185
186   /* FIXME: use taint in dataset in place of `ok'? */
187   /* FIXME: for trivial cases we can just return a clone of
188      ds->source? */
189   return casereader_create_sequential (NULL,
190                                        dict_get_next_value_idx (ds->dict),
191                                        CASENUMBER_MAX,
192                                        &proc_casereader_class, ds);
193 }
194
195 /* Returns true if a procedure is in progress, that is, if
196    proc_open has been called but proc_commit has not. */
197 bool
198 proc_is_open (const struct dataset *ds)
199 {
200   return ds->proc_state != PROC_COMMITTED;
201 }
202
203 /* "read" function for procedure casereader. */
204 static bool
205 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
206                       struct ccase *c)
207 {
208   struct dataset *ds = ds_;
209   enum trns_result retval = TRNS_DROP_CASE;
210
211   assert (ds->proc_state == PROC_OPEN);
212   for (;;)
213     {
214       casenumber case_nr;
215
216       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
217       if (retval == TRNS_ERROR)
218         ds->ok = false;
219       if (!ds->ok)
220         return false;
221
222       /* Read a case from source. */
223       if (!casereader_read (ds->source, c))
224         return false;
225       case_resize (c, dict_get_next_value_idx (ds->dict));
226       caseinit_init_vars (ds->caseinit, c);
227
228       /* Execute permanent transformations.  */
229       case_nr = ds->cases_written + 1;
230       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
231                                    c, case_nr);
232       caseinit_update_left_vars (ds->caseinit, c);
233       if (retval != TRNS_CONTINUE)
234         {
235           case_destroy (c);
236           continue;
237         }
238
239       /* Write case to collection of lagged cases. */
240       if (ds->n_lag > 0)
241         {
242           while (deque_count (&ds->lag) >= ds->n_lag)
243             case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
244           case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
245         }
246
247       /* Write case to replacement active file. */
248       ds->cases_written++;
249       if (ds->sink != NULL)
250         {
251           struct ccase tmp;
252           if (ds->compactor != NULL)
253             {
254               case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
255               dict_compactor_compact (ds->compactor, &tmp, c);
256             }
257           else
258             case_clone (&tmp, c);
259           casewriter_write (ds->sink, &tmp);
260         }
261
262       /* Execute temporary transformations. */
263       if (ds->temporary_trns_chain != NULL)
264         {
265           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
266                                        c, ds->cases_written);
267           if (retval != TRNS_CONTINUE)
268             {
269               case_destroy (c);
270               continue;
271             }
272         }
273
274       return true;
275     }
276 }
277
278 /* "destroy" function for procedure casereader. */
279 static void
280 proc_casereader_destroy (struct casereader *reader, void *ds_)
281 {
282   struct dataset *ds = ds_;
283   struct ccase c;
284
285   /* Make sure transformations happen for every input case, in
286      case they have side effects, and ensure that the replacement
287      active file gets all the cases it should. */
288   while (casereader_read (reader, &c))
289     case_destroy (&c);
290
291   ds->proc_state = PROC_CLOSED;
292   ds->ok = casereader_destroy (ds->source) && ds->ok;
293   ds->source = NULL;
294   proc_set_active_file_data (ds, NULL);
295 }
296
297 /* Must return false if the source casereader, a transformation,
298    or the sink casewriter signaled an error.  (If a temporary
299    transformation signals an error, then the return value is
300    false, but the replacement active file may still be
301    untainted.) */
302 bool
303 proc_commit (struct dataset *ds)
304 {
305   assert (ds->proc_state == PROC_CLOSED);
306   ds->proc_state = PROC_COMMITTED;
307
308   /* Free memory for lagged cases. */
309   while (!deque_is_empty (&ds->lag))
310     case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
311   free (ds->lag_cases);
312
313   /* Dictionary from before TEMPORARY becomes permanent. */
314   proc_cancel_temporary_transformations (ds);
315
316   if (!ds->discard_output)
317     {
318       /* Finish compacting. */
319       if (ds->compactor != NULL)
320         {
321           dict_compactor_destroy (ds->compactor);
322           dict_compact_values (ds->dict);
323           ds->compactor = NULL;
324         }
325
326       /* Old data sink becomes new data source. */
327       if (ds->sink != NULL)
328         ds->source = casewriter_make_reader (ds->sink);
329     }
330   else
331     {
332       ds->source = NULL;
333       ds->discard_output = false;
334     }
335   ds->sink = NULL;
336   if ( ds->replace_source) ds->replace_source (ds->source);
337
338   caseinit_clear (ds->caseinit);
339   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
340
341   dict_clear_vectors (ds->dict);
342   ds->permanent_dict = NULL;
343   return proc_cancel_all_transformations (ds) && ds->ok;
344 }
345
346 /* Casereader class for procedure execution. */
347 static struct casereader_class proc_casereader_class =
348   {
349     proc_casereader_read,
350     proc_casereader_destroy,
351     NULL,
352     NULL,
353   };
354
355 /* Updates last_proc_invocation. */
356 static void
357 update_last_proc_invocation (struct dataset *ds)
358 {
359   ds->last_proc_invocation = time (NULL);
360 }
361 \f
362 /* Returns a pointer to the lagged case from N_BEFORE cases before the
363    current one, or NULL if there haven't been that many cases yet. */
364 struct ccase *
365 lagged_case (const struct dataset *ds, int n_before)
366 {
367   assert (n_before >= 1);
368   assert (n_before <= ds->n_lag);
369
370   if (n_before <= deque_count (&ds->lag))
371     return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
372   else
373     return NULL;
374 }
375 \f
376 /* Returns the current set of permanent transformations,
377    and clears the permanent transformations.
378    For use by INPUT PROGRAM. */
379 struct trns_chain *
380 proc_capture_transformations (struct dataset *ds)
381 {
382   struct trns_chain *chain;
383
384   assert (ds->temporary_trns_chain == NULL);
385   chain = ds->permanent_trns_chain;
386   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
387   return chain;
388 }
389
390 /* Adds a transformation that processes a case with PROC and
391    frees itself with FREE to the current set of transformations.
392    The functions are passed AUX as auxiliary data. */
393 void
394 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
395 {
396   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
397 }
398
399 /* Adds a transformation that processes a case with PROC and
400    frees itself with FREE to the current set of transformations.
401    When parsing of the block of transformations is complete,
402    FINALIZE will be called.
403    The functions are passed AUX as auxiliary data. */
404 void
405 add_transformation_with_finalizer (struct dataset *ds,
406                                    trns_finalize_func *finalize,
407                                    trns_proc_func *proc,
408                                    trns_free_func *free, void *aux)
409 {
410   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
411 }
412
413 /* Returns the index of the next transformation.
414    This value can be returned by a transformation procedure
415    function to indicate a "jump" to that transformation. */
416 size_t
417 next_transformation (const struct dataset *ds)
418 {
419   return trns_chain_next (ds->cur_trns_chain);
420 }
421
422 /* Returns true if the next call to add_transformation() will add
423    a temporary transformation, false if it will add a permanent
424    transformation. */
425 bool
426 proc_in_temporary_transformations (const struct dataset *ds)
427 {
428   return ds->temporary_trns_chain != NULL;
429 }
430
431 /* Marks the start of temporary transformations.
432    Further calls to add_transformation() will add temporary
433    transformations. */
434 void
435 proc_start_temporary_transformations (struct dataset *ds)
436 {
437   if (!proc_in_temporary_transformations (ds))
438     {
439       add_case_limit_trns (ds);
440
441       ds->permanent_dict = dict_clone (ds->dict);
442
443       trns_chain_finalize (ds->permanent_trns_chain);
444       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
445     }
446 }
447
448 /* Converts all the temporary transformations, if any, to
449    permanent transformations.  Further transformations will be
450    permanent.
451    Returns true if anything changed, false otherwise. */
452 bool
453 proc_make_temporary_transformations_permanent (struct dataset *ds)
454 {
455   if (proc_in_temporary_transformations (ds))
456     {
457       trns_chain_finalize (ds->temporary_trns_chain);
458       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
459       ds->temporary_trns_chain = NULL;
460
461       dict_destroy (ds->permanent_dict);
462       ds->permanent_dict = NULL;
463
464       return true;
465     }
466   else
467     return false;
468 }
469
470 /* Cancels all temporary transformations, if any.  Further
471    transformations will be permanent.
472    Returns true if anything changed, false otherwise. */
473 bool
474 proc_cancel_temporary_transformations (struct dataset *ds)
475 {
476   if (proc_in_temporary_transformations (ds))
477     {
478       dict_destroy (ds->dict);
479       ds->dict = ds->permanent_dict;
480       ds->permanent_dict = NULL;
481       if (ds->replace_dict) ds->replace_dict (ds->dict);
482
483       trns_chain_destroy (ds->temporary_trns_chain);
484       ds->temporary_trns_chain = NULL;
485
486       return true;
487     }
488   else
489     return false;
490 }
491
492 /* Cancels all transformations, if any.
493    Returns true if successful, false on I/O error. */
494 bool
495 proc_cancel_all_transformations (struct dataset *ds)
496 {
497   bool ok;
498   assert (ds->proc_state == PROC_COMMITTED);
499   ok = trns_chain_destroy (ds->permanent_trns_chain);
500   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
501   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
502   ds->temporary_trns_chain = NULL;
503   return ok;
504 }
505 \f
506 /* Initializes procedure handling. */
507 struct dataset *
508 create_dataset (replace_source_callback *rps,
509                 replace_dictionary_callback *rds)
510 {
511   struct dataset *ds = xzalloc (sizeof(*ds));
512   ds->dict = dict_create ();
513   ds->caseinit = caseinit_create ();
514   ds->replace_source = rps;
515   ds->replace_dict = rds;
516   proc_cancel_all_transformations (ds);
517   return ds;
518 }
519
520 /* Finishes up procedure handling. */
521 void
522 destroy_dataset (struct dataset *ds)
523 {
524   proc_discard_active_file (ds);
525   dict_destroy (ds->dict);
526   caseinit_destroy (ds->caseinit);
527   trns_chain_destroy (ds->permanent_trns_chain);
528   free (ds);
529 }
530
531 /* Causes output from the next procedure to be discarded, instead
532    of being preserved for use as input for the next procedure. */
533 void
534 proc_discard_output (struct dataset *ds)
535 {
536   ds->discard_output = true;
537 }
538
539 /* Discards the active file dictionary, data, and
540    transformations. */
541 void
542 proc_discard_active_file (struct dataset *ds)
543 {
544   assert (ds->proc_state == PROC_COMMITTED);
545
546   dict_clear (ds->dict);
547   fh_set_default_handle (NULL);
548
549   ds->n_lag = 0;
550
551   casereader_destroy (ds->source);
552   ds->source = NULL;
553   if ( ds->replace_source) ds->replace_source (NULL);
554
555   proc_cancel_all_transformations (ds);
556 }
557
558 /* Sets SOURCE as the source for procedure input for the next
559    procedure. */
560 void
561 proc_set_active_file (struct dataset *ds,
562                       struct casereader *source,
563                       struct dictionary *dict)
564 {
565   assert (ds->proc_state == PROC_COMMITTED);
566   assert (ds->dict != dict);
567
568   proc_discard_active_file (ds);
569
570   dict_destroy (ds->dict);
571   ds->dict = dict;
572   if ( ds->replace_dict) ds->replace_dict (dict);
573
574   proc_set_active_file_data (ds, source);
575 }
576
577 /* Replaces the active file's data by READER without replacing
578    the associated dictionary. */
579 bool
580 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
581 {
582   casereader_destroy (ds->source);
583   ds->source = reader;
584   if (ds->replace_source) ds->replace_source (reader);
585
586   caseinit_clear (ds->caseinit);
587   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
588
589   return reader == NULL || !casereader_error (reader);
590 }
591
592 /* Returns true if an active file data source is available, false
593    otherwise. */
594 bool
595 proc_has_active_file (const struct dataset *ds)
596 {
597   return ds->source != NULL;
598 }
599
600 /* Checks whether DS has a corrupted active file.  If so,
601    discards it and returns false.  If not, returns true without
602    doing anything. */
603 bool
604 dataset_end_of_command (struct dataset *ds)
605 {
606   if (ds->source != NULL)
607     {
608       if (casereader_error (ds->source))
609         {
610           proc_discard_active_file (ds);
611           return false;
612         }
613       else
614         {
615           const struct taint *taint = casereader_get_taint (ds->source);
616           taint_reset_successor_taint ((struct taint *) taint);
617           assert (!taint_has_tainted_successor (taint));
618         }
619     }
620   return true;
621 }
622 \f
623 static trns_proc_func case_limit_trns_proc;
624 static trns_free_func case_limit_trns_free;
625
626 /* Adds a transformation that limits the number of cases that may
627    pass through, if DS->DICT has a case limit. */
628 static void
629 add_case_limit_trns (struct dataset *ds)
630 {
631   size_t case_limit = dict_get_case_limit (ds->dict);
632   if (case_limit != 0)
633     {
634       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
635       *cases_remaining = case_limit;
636       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
637                           cases_remaining);
638       dict_set_case_limit (ds->dict, 0);
639     }
640 }
641
642 /* Limits the maximum number of cases processed to
643    *CASES_REMAINING. */
644 static int
645 case_limit_trns_proc (void *cases_remaining_,
646                       struct ccase *c UNUSED, casenumber case_nr UNUSED)
647 {
648   size_t *cases_remaining = cases_remaining_;
649   if (*cases_remaining > 0)
650     {
651       (*cases_remaining)--;
652       return TRNS_CONTINUE;
653     }
654   else
655     return TRNS_DROP_CASE;
656 }
657
658 /* Frees the data associated with a case limit transformation. */
659 static bool
660 case_limit_trns_free (void *cases_remaining_)
661 {
662   size_t *cases_remaining = cases_remaining_;
663   free (cases_remaining);
664   return true;
665 }
666 \f
667 static trns_proc_func filter_trns_proc;
668
669 /* Adds a temporary transformation to filter data according to
670    the variable specified on FILTER, if any. */
671 static void
672 add_filter_trns (struct dataset *ds)
673 {
674   struct variable *filter_var = dict_get_filter (ds->dict);
675   if (filter_var != NULL)
676     {
677       proc_start_temporary_transformations (ds);
678       add_transformation (ds, filter_trns_proc, NULL, filter_var);
679     }
680 }
681
682 /* FILTER transformation. */
683 static int
684 filter_trns_proc (void *filter_var_,
685                   struct ccase *c UNUSED, casenumber case_nr UNUSED)
686
687 {
688   struct variable *filter_var = filter_var_;
689   double f = case_num (c, filter_var);
690   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
691           ? TRNS_CONTINUE : TRNS_DROP_CASE);
692 }
693
694
695 struct dictionary *
696 dataset_dict (const struct dataset *ds)
697 {
698   return ds->dict;
699 }
700
701 const struct casereader *
702 dataset_source (const struct dataset *ds)
703 {
704   return ds->source;
705 }
706
707 void
708 dataset_need_lag (struct dataset *ds, int n_before)
709 {
710   ds->n_lag = MAX (ds->n_lag, n_before);
711 }