84d53d349a054f35f737c029e3fa6e9410daca5a
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include <data/case.h>
27 #include <data/caseinit.h>
28 #include <data/casereader.h>
29 #include <data/casereader-provider.h>
30 #include <data/casewriter.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/transformations.h>
35 #include <data/variable.h>
36 #include <libpspp/alloc.h>
37 #include <libpspp/deque.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40 #include <libpspp/taint.h>
41
42 struct dataset {
43   /* Cases are read from source,
44      their transformation variables are initialized,
45      pass through permanent_trns_chain (which transforms them into
46      the format described by permanent_dict),
47      are written to sink,
48      pass through temporary_trns_chain (which transforms them into
49      the format described by dict),
50      and are finally passed to the procedure. */
51   struct casereader *source;
52   struct caseinit *caseinit;
53   struct trns_chain *permanent_trns_chain;
54   struct dictionary *permanent_dict;
55   struct casewriter *sink;
56   struct trns_chain *temporary_trns_chain;
57   struct dictionary *dict;
58
59   /* Callback which occurs when a procedure provides a new source for
60      the dataset */
61   replace_source_callback *replace_source ;
62
63   /* Callback which occurs whenever the DICT is replaced by a new one */
64   replace_dictionary_callback *replace_dict;
65
66   /* If true, cases are discarded instead of being written to
67      sink. */
68   bool discard_output;
69
70   /* The transformation chain that the next transformation will be
71      added to. */
72   struct trns_chain *cur_trns_chain;
73
74   /* The compactor used to compact a case, if necessary;
75      otherwise a null pointer. */
76   struct dict_compactor *compactor;
77
78   /* Time at which proc was last invoked. */
79   time_t last_proc_invocation;
80
81   /* Cases just before ("lagging") the current one. */
82   int n_lag;                    /* Number of cases to lag. */
83   struct deque lag;             /* Deque of lagged cases. */
84   struct ccase *lag_cases;      /* Lagged cases managed by deque. */
85
86   /* Procedure data. */
87   enum
88     {
89       PROC_COMMITTED,           /* No procedure in progress. */
90       PROC_OPEN,                /* proc_open called, casereader still open. */
91       PROC_CLOSED               /* casereader from proc_open destroyed,
92                                    but proc_commit not yet called. */
93     }
94   proc_state;
95   casenumber cases_written;       /* Cases output so far. */
96   bool ok;                    /* Error status. */
97 }; /* struct dataset */
98
99
100 static void add_case_limit_trns (struct dataset *ds);
101 static void add_filter_trns (struct dataset *ds);
102
103 static void update_last_proc_invocation (struct dataset *ds);
104 \f
105 /* Public functions. */
106
107 /* Returns the last time the data was read. */
108 time_t
109 time_of_last_procedure (struct dataset *ds)
110 {
111   if (ds->last_proc_invocation == 0)
112     update_last_proc_invocation (ds);
113   return ds->last_proc_invocation;
114 }
115 \f
116 /* Regular procedure. */
117
118 /* Executes any pending transformations, if necessary.
119    This is not identical to the EXECUTE command in that it won't
120    always read the source data.  This can be important when the
121    source data is given inline within BEGIN DATA...END FILE. */
122 bool
123 proc_execute (struct dataset *ds)
124 {
125   bool ok;
126
127   if ((ds->temporary_trns_chain == NULL
128        || trns_chain_is_empty (ds->temporary_trns_chain))
129       && trns_chain_is_empty (ds->permanent_trns_chain))
130     {
131       ds->n_lag = 0;
132       ds->discard_output = false;
133       dict_set_case_limit (ds->dict, 0);
134       dict_clear_vectors (ds->dict);
135       return true;
136     }
137
138   ok = casereader_destroy (proc_open (ds));
139   return proc_commit (ds) && ok;
140 }
141
142 static struct casereader_class proc_casereader_class;
143
144 /* Opens dataset DS for reading cases with proc_read.
145    proc_commit must be called when done. */
146 struct casereader *
147 proc_open (struct dataset *ds)
148 {
149   assert (ds->source != NULL);
150   assert (ds->proc_state == PROC_COMMITTED);
151
152   update_last_proc_invocation (ds);
153
154   caseinit_mark_for_init (ds->caseinit, ds->dict);
155
156   /* Finish up the collection of transformations. */
157   add_case_limit_trns (ds);
158   add_filter_trns (ds);
159   trns_chain_finalize (ds->cur_trns_chain);
160
161   /* Make permanent_dict refer to the dictionary right before
162      data reaches the sink. */
163   if (ds->permanent_dict == NULL)
164     ds->permanent_dict = ds->dict;
165
166   /* Prepare sink. */
167   if (!ds->discard_output)
168     {
169       ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
170                        ? dict_make_compactor (ds->permanent_dict)
171                        : NULL);
172       ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
173                                              ds->permanent_dict));
174     }
175   else
176     {
177       ds->compactor = NULL;
178       ds->sink = NULL;
179     }
180
181   /* Allocate memory for lagged cases. */
182   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
183
184   ds->proc_state = PROC_OPEN;
185   ds->cases_written = 0;
186   ds->ok = true;
187
188   /* FIXME: use taint in dataset in place of `ok'? */
189   /* FIXME: for trivial cases we can just return a clone of
190      ds->source? */
191   return casereader_create_sequential (NULL,
192                                        dict_get_next_value_idx (ds->dict),
193                                        CASENUMBER_MAX,
194                                        &proc_casereader_class, ds);
195 }
196
197 /* Returns true if a procedure is in progress, that is, if
198    proc_open has been called but proc_commit has not. */
199 bool
200 proc_is_open (const struct dataset *ds)
201 {
202   return ds->proc_state != PROC_COMMITTED;
203 }
204
205 /* "read" function for procedure casereader. */
206 static bool
207 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
208                       struct ccase *c)
209 {
210   struct dataset *ds = ds_;
211   enum trns_result retval = TRNS_DROP_CASE;
212
213   assert (ds->proc_state == PROC_OPEN);
214   for (;;)
215     {
216       casenumber case_nr;
217
218       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
219       if (retval == TRNS_ERROR)
220         ds->ok = false;
221       if (!ds->ok)
222         return false;
223
224       /* Read a case from source. */
225       if (!casereader_read (ds->source, c))
226         return false;
227       case_resize (c, dict_get_next_value_idx (ds->dict));
228       caseinit_init_vars (ds->caseinit, c);
229
230       /* Execute permanent transformations.  */
231       case_nr = ds->cases_written + 1;
232       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
233                                    c, case_nr);
234       caseinit_update_left_vars (ds->caseinit, c);
235       if (retval != TRNS_CONTINUE)
236         {
237           case_destroy (c);
238           continue;
239         }
240
241       /* Write case to collection of lagged cases. */
242       if (ds->n_lag > 0)
243         {
244           while (deque_count (&ds->lag) >= ds->n_lag)
245             case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
246           case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
247         }
248
249       /* Write case to replacement active file. */
250       ds->cases_written++;
251       if (ds->sink != NULL)
252         {
253           struct ccase tmp;
254           if (ds->compactor != NULL)
255             {
256               case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
257               dict_compactor_compact (ds->compactor, &tmp, c);
258             }
259           else
260             case_clone (&tmp, c);
261           casewriter_write (ds->sink, &tmp);
262         }
263
264       /* Execute temporary transformations. */
265       if (ds->temporary_trns_chain != NULL)
266         {
267           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
268                                        c, ds->cases_written);
269           if (retval != TRNS_CONTINUE)
270             {
271               case_destroy (c);
272               continue;
273             }
274         }
275
276       return true;
277     }
278 }
279
280 /* "destroy" function for procedure casereader. */
281 static void
282 proc_casereader_destroy (struct casereader *reader, void *ds_)
283 {
284   struct dataset *ds = ds_;
285   struct ccase c;
286
287   /* Make sure transformations happen for every input case, in
288      case they have side effects, and ensure that the replacement
289      active file gets all the cases it should. */
290   while (casereader_read (reader, &c))
291     case_destroy (&c);
292
293   ds->proc_state = PROC_CLOSED;
294   ds->ok = casereader_destroy (ds->source) && ds->ok;
295   ds->source = NULL;
296   proc_set_active_file_data (ds, NULL);
297 }
298
299 /* Must return false if the source casereader, a transformation,
300    or the sink casewriter signaled an error.  (If a temporary
301    transformation signals an error, then the return value is
302    false, but the replacement active file may still be
303    untainted.) */
304 bool
305 proc_commit (struct dataset *ds)
306 {
307   assert (ds->proc_state == PROC_CLOSED);
308   ds->proc_state = PROC_COMMITTED;
309
310   /* Free memory for lagged cases. */
311   while (!deque_is_empty (&ds->lag))
312     case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
313   free (ds->lag_cases);
314
315   /* Dictionary from before TEMPORARY becomes permanent. */
316   proc_cancel_temporary_transformations (ds);
317
318   if (!ds->discard_output)
319     {
320       /* Finish compacting. */
321       if (ds->compactor != NULL)
322         {
323           dict_compactor_destroy (ds->compactor);
324           dict_compact_values (ds->dict);
325           ds->compactor = NULL;
326         }
327
328       /* Old data sink becomes new data source. */
329       if (ds->sink != NULL)
330         ds->source = casewriter_make_reader (ds->sink);
331     }
332   else
333     {
334       ds->source = NULL;
335       ds->discard_output = false;
336     }
337   ds->sink = NULL;
338   if ( ds->replace_source) ds->replace_source (ds->source);
339
340   caseinit_clear (ds->caseinit);
341   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
342
343   dict_clear_vectors (ds->dict);
344   ds->permanent_dict = NULL;
345   return proc_cancel_all_transformations (ds) && ds->ok;
346 }
347
348 /* Casereader class for procedure execution. */
349 static struct casereader_class proc_casereader_class =
350   {
351     proc_casereader_read,
352     proc_casereader_destroy,
353     NULL,
354     NULL,
355   };
356
357 /* Updates last_proc_invocation. */
358 static void
359 update_last_proc_invocation (struct dataset *ds)
360 {
361   ds->last_proc_invocation = time (NULL);
362 }
363 \f
364 /* Returns a pointer to the lagged case from N_BEFORE cases before the
365    current one, or NULL if there haven't been that many cases yet. */
366 struct ccase *
367 lagged_case (const struct dataset *ds, int n_before)
368 {
369   assert (n_before >= 1);
370   assert (n_before <= ds->n_lag);
371
372   if (n_before <= deque_count (&ds->lag))
373     return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
374   else
375     return NULL;
376 }
377 \f
378 /* Returns the current set of permanent transformations,
379    and clears the permanent transformations.
380    For use by INPUT PROGRAM. */
381 struct trns_chain *
382 proc_capture_transformations (struct dataset *ds)
383 {
384   struct trns_chain *chain;
385
386   assert (ds->temporary_trns_chain == NULL);
387   chain = ds->permanent_trns_chain;
388   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
389   return chain;
390 }
391
392 /* Adds a transformation that processes a case with PROC and
393    frees itself with FREE to the current set of transformations.
394    The functions are passed AUX as auxiliary data. */
395 void
396 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
397 {
398   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
399 }
400
401 /* Adds a transformation that processes a case with PROC and
402    frees itself with FREE to the current set of transformations.
403    When parsing of the block of transformations is complete,
404    FINALIZE will be called.
405    The functions are passed AUX as auxiliary data. */
406 void
407 add_transformation_with_finalizer (struct dataset *ds,
408                                    trns_finalize_func *finalize,
409                                    trns_proc_func *proc,
410                                    trns_free_func *free, void *aux)
411 {
412   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
413 }
414
415 /* Returns the index of the next transformation.
416    This value can be returned by a transformation procedure
417    function to indicate a "jump" to that transformation. */
418 size_t
419 next_transformation (const struct dataset *ds)
420 {
421   return trns_chain_next (ds->cur_trns_chain);
422 }
423
424 /* Returns true if the next call to add_transformation() will add
425    a temporary transformation, false if it will add a permanent
426    transformation. */
427 bool
428 proc_in_temporary_transformations (const struct dataset *ds)
429 {
430   return ds->temporary_trns_chain != NULL;
431 }
432
433 /* Marks the start of temporary transformations.
434    Further calls to add_transformation() will add temporary
435    transformations. */
436 void
437 proc_start_temporary_transformations (struct dataset *ds)
438 {
439   if (!proc_in_temporary_transformations (ds))
440     {
441       add_case_limit_trns (ds);
442
443       ds->permanent_dict = dict_clone (ds->dict);
444
445       trns_chain_finalize (ds->permanent_trns_chain);
446       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
447     }
448 }
449
450 /* Converts all the temporary transformations, if any, to
451    permanent transformations.  Further transformations will be
452    permanent.
453    Returns true if anything changed, false otherwise. */
454 bool
455 proc_make_temporary_transformations_permanent (struct dataset *ds)
456 {
457   if (proc_in_temporary_transformations (ds))
458     {
459       trns_chain_finalize (ds->temporary_trns_chain);
460       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
461       ds->temporary_trns_chain = NULL;
462
463       dict_destroy (ds->permanent_dict);
464       ds->permanent_dict = NULL;
465
466       return true;
467     }
468   else
469     return false;
470 }
471
472 /* Cancels all temporary transformations, if any.  Further
473    transformations will be permanent.
474    Returns true if anything changed, false otherwise. */
475 bool
476 proc_cancel_temporary_transformations (struct dataset *ds)
477 {
478   if (proc_in_temporary_transformations (ds))
479     {
480       dict_destroy (ds->dict);
481       ds->dict = ds->permanent_dict;
482       ds->permanent_dict = NULL;
483       if (ds->replace_dict) ds->replace_dict (ds->dict);
484
485       trns_chain_destroy (ds->temporary_trns_chain);
486       ds->temporary_trns_chain = NULL;
487
488       return true;
489     }
490   else
491     return false;
492 }
493
494 /* Cancels all transformations, if any.
495    Returns true if successful, false on I/O error. */
496 bool
497 proc_cancel_all_transformations (struct dataset *ds)
498 {
499   bool ok;
500   assert (ds->proc_state == PROC_COMMITTED);
501   ok = trns_chain_destroy (ds->permanent_trns_chain);
502   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
503   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
504   ds->temporary_trns_chain = NULL;
505   return ok;
506 }
507 \f
508 /* Initializes procedure handling. */
509 struct dataset *
510 create_dataset (replace_source_callback *rps,
511                 replace_dictionary_callback *rds)
512 {
513   struct dataset *ds = xzalloc (sizeof(*ds));
514   ds->dict = dict_create ();
515   ds->caseinit = caseinit_create ();
516   ds->replace_source = rps;
517   ds->replace_dict = rds;
518   proc_cancel_all_transformations (ds);
519   return ds;
520 }
521
522 /* Finishes up procedure handling. */
523 void
524 destroy_dataset (struct dataset *ds)
525 {
526   proc_discard_active_file (ds);
527   dict_destroy (ds->dict);
528   caseinit_destroy (ds->caseinit);
529   trns_chain_destroy (ds->permanent_trns_chain);
530   free (ds);
531 }
532
533 /* Causes output from the next procedure to be discarded, instead
534    of being preserved for use as input for the next procedure. */
535 void
536 proc_discard_output (struct dataset *ds)
537 {
538   ds->discard_output = true;
539 }
540
541 /* Discards the active file dictionary, data, and
542    transformations. */
543 void
544 proc_discard_active_file (struct dataset *ds)
545 {
546   assert (ds->proc_state == PROC_COMMITTED);
547
548   dict_clear (ds->dict);
549   fh_set_default_handle (NULL);
550
551   ds->n_lag = 0;
552
553   casereader_destroy (ds->source);
554   ds->source = NULL;
555   if ( ds->replace_source) ds->replace_source (NULL);
556
557   proc_cancel_all_transformations (ds);
558 }
559
560 /* Sets SOURCE as the source for procedure input for the next
561    procedure. */
562 void
563 proc_set_active_file (struct dataset *ds,
564                       struct casereader *source,
565                       struct dictionary *dict)
566 {
567   assert (ds->proc_state == PROC_COMMITTED);
568   assert (ds->dict != dict);
569
570   proc_discard_active_file (ds);
571
572   dict_destroy (ds->dict);
573   ds->dict = dict;
574   if ( ds->replace_dict) ds->replace_dict (dict);
575
576   proc_set_active_file_data (ds, source);
577 }
578
579 /* Replaces the active file's data by READER without replacing
580    the associated dictionary. */
581 bool
582 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
583 {
584   casereader_destroy (ds->source);
585   ds->source = reader;
586   if (ds->replace_source) ds->replace_source (reader);
587
588   caseinit_clear (ds->caseinit);
589   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
590
591   return reader == NULL || !casereader_error (reader);
592 }
593
594 /* Returns true if an active file data source is available, false
595    otherwise. */
596 bool
597 proc_has_active_file (const struct dataset *ds)
598 {
599   return ds->source != NULL;
600 }
601
602 /* Checks whether DS has a corrupted active file.  If so,
603    discards it and returns false.  If not, returns true without
604    doing anything. */
605 bool
606 dataset_end_of_command (struct dataset *ds)
607 {
608   if (ds->source != NULL)
609     {
610       if (casereader_error (ds->source))
611         {
612           proc_discard_active_file (ds);
613           return false;
614         }
615       else
616         {
617           const struct taint *taint = casereader_get_taint (ds->source);
618           taint_reset_successor_taint ((struct taint *) taint);
619           assert (!taint_has_tainted_successor (taint));
620         }
621     }
622   return true;
623 }
624 \f
625 static trns_proc_func case_limit_trns_proc;
626 static trns_free_func case_limit_trns_free;
627
628 /* Adds a transformation that limits the number of cases that may
629    pass through, if DS->DICT has a case limit. */
630 static void
631 add_case_limit_trns (struct dataset *ds)
632 {
633   size_t case_limit = dict_get_case_limit (ds->dict);
634   if (case_limit != 0)
635     {
636       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
637       *cases_remaining = case_limit;
638       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
639                           cases_remaining);
640       dict_set_case_limit (ds->dict, 0);
641     }
642 }
643
644 /* Limits the maximum number of cases processed to
645    *CASES_REMAINING. */
646 static int
647 case_limit_trns_proc (void *cases_remaining_,
648                       struct ccase *c UNUSED, casenumber case_nr UNUSED)
649 {
650   size_t *cases_remaining = cases_remaining_;
651   if (*cases_remaining > 0)
652     {
653       (*cases_remaining)--;
654       return TRNS_CONTINUE;
655     }
656   else
657     return TRNS_DROP_CASE;
658 }
659
660 /* Frees the data associated with a case limit transformation. */
661 static bool
662 case_limit_trns_free (void *cases_remaining_)
663 {
664   size_t *cases_remaining = cases_remaining_;
665   free (cases_remaining);
666   return true;
667 }
668 \f
669 static trns_proc_func filter_trns_proc;
670
671 /* Adds a temporary transformation to filter data according to
672    the variable specified on FILTER, if any. */
673 static void
674 add_filter_trns (struct dataset *ds)
675 {
676   struct variable *filter_var = dict_get_filter (ds->dict);
677   if (filter_var != NULL)
678     {
679       proc_start_temporary_transformations (ds);
680       add_transformation (ds, filter_trns_proc, NULL, filter_var);
681     }
682 }
683
684 /* FILTER transformation. */
685 static int
686 filter_trns_proc (void *filter_var_,
687                   struct ccase *c UNUSED, casenumber case_nr UNUSED)
688
689 {
690   struct variable *filter_var = filter_var_;
691   double f = case_num (c, filter_var);
692   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
693           ? TRNS_CONTINUE : TRNS_DROP_CASE);
694 }
695
696
697 struct dictionary *
698 dataset_dict (const struct dataset *ds)
699 {
700   return ds->dict;
701 }
702
703 const struct casereader *
704 dataset_source (const struct dataset *ds)
705 {
706   return ds->source;
707 }
708
709 void
710 dataset_need_lag (struct dataset *ds, int n_before)
711 {
712   ds->n_lag = MAX (ds->n_lag, n_before);
713 }