32839e6413356c42b61d238a6634c7c1894775ca
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23
24 #include <data/case.h>
25 #include <data/case-map.h>
26 #include <data/caseinit.h>
27 #include <data/casereader.h>
28 #include <data/casereader-provider.h>
29 #include <data/casewriter.h>
30 #include <data/dictionary.h>
31 #include <data/file-handle-def.h>
32 #include <data/procedure.h>
33 #include <data/transformations.h>
34 #include <data/variable.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
39
40 #include "xalloc.h"
41
42 struct dataset {
43   /* Cases are read from source,
44      their transformation variables are initialized,
45      pass through permanent_trns_chain (which transforms them into
46      the format described by permanent_dict),
47      are written to sink,
48      pass through temporary_trns_chain (which transforms them into
49      the format described by dict),
50      and are finally passed to the procedure. */
51   struct casereader *source;
52   struct caseinit *caseinit;
53   struct trns_chain *permanent_trns_chain;
54   struct dictionary *permanent_dict;
55   struct casewriter *sink;
56   struct trns_chain *temporary_trns_chain;
57   struct dictionary *dict;
58
59   /* Callback which occurs whenever the transformation chain(s) have
60      been modified */
61   transformation_change_callback_func *xform_callback;
62   void *xform_callback_aux;
63
64   /* If true, cases are discarded instead of being written to
65      sink. */
66   bool discard_output;
67
68   /* The transformation chain that the next transformation will be
69      added to. */
70   struct trns_chain *cur_trns_chain;
71
72   /* The case map used to compact a case, if necessary;
73      otherwise a null pointer. */
74   struct case_map *compactor;
75
76   /* Time at which proc was last invoked. */
77   time_t last_proc_invocation;
78
79   /* Cases just before ("lagging") the current one. */
80   int n_lag;                    /* Number of cases to lag. */
81   struct deque lag;             /* Deque of lagged cases. */
82   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
83
84   /* Procedure data. */
85   enum
86     {
87       PROC_COMMITTED,           /* No procedure in progress. */
88       PROC_OPEN,                /* proc_open called, casereader still open. */
89       PROC_CLOSED               /* casereader from proc_open destroyed,
90                                    but proc_commit not yet called. */
91     }
92   proc_state;
93   casenumber cases_written;       /* Cases output so far. */
94   bool ok;                    /* Error status. */
95
96   void (*callback) (void *); /* Callback for when the dataset changes */
97   void *cb_data;
98
99 }; /* struct dataset */
100
101
102 static void add_case_limit_trns (struct dataset *ds);
103 static void add_filter_trns (struct dataset *ds);
104
105 static void update_last_proc_invocation (struct dataset *ds);
106
107 static void
108 dataset_set_unsaved (const struct dataset *ds)
109 {
110   if (ds->callback) ds->callback (ds->cb_data);
111 }
112
113 \f
114 /* Public functions. */
115
116 void
117 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
118 {
119   ds->callback = cb;
120   ds->cb_data = cb_data;
121 }
122
123
124 /* Returns the last time the data was read. */
125 time_t
126 time_of_last_procedure (struct dataset *ds)
127 {
128   if (ds->last_proc_invocation == 0)
129     update_last_proc_invocation (ds);
130   return ds->last_proc_invocation;
131 }
132 \f
133 /* Regular procedure. */
134
135 /* Executes any pending transformations, if necessary.
136    This is not identical to the EXECUTE command in that it won't
137    always read the source data.  This can be important when the
138    source data is given inline within BEGIN DATA...END FILE. */
139 bool
140 proc_execute (struct dataset *ds)
141 {
142   bool ok;
143
144   if ((ds->temporary_trns_chain == NULL
145        || trns_chain_is_empty (ds->temporary_trns_chain))
146       && trns_chain_is_empty (ds->permanent_trns_chain))
147     {
148       ds->n_lag = 0;
149       ds->discard_output = false;
150       dict_set_case_limit (ds->dict, 0);
151       dict_clear_vectors (ds->dict);
152       return true;
153     }
154
155   ok = casereader_destroy (proc_open (ds));
156   return proc_commit (ds) && ok;
157 }
158
159 static const struct casereader_class proc_casereader_class;
160
161 /* Opens dataset DS for reading cases with proc_read.
162    proc_commit must be called when done. */
163 struct casereader *
164 proc_open (struct dataset *ds)
165 {
166   assert (ds->source != NULL);
167   assert (ds->proc_state == PROC_COMMITTED);
168
169   update_last_proc_invocation (ds);
170
171   caseinit_mark_for_init (ds->caseinit, ds->dict);
172
173   /* Finish up the collection of transformations. */
174   add_case_limit_trns (ds);
175   add_filter_trns (ds);
176   trns_chain_finalize (ds->cur_trns_chain);
177
178   /* Make permanent_dict refer to the dictionary right before
179      data reaches the sink. */
180   if (ds->permanent_dict == NULL)
181     ds->permanent_dict = ds->dict;
182
183   /* Prepare sink. */
184   if (!ds->discard_output)
185     {
186       struct dictionary *pd = ds->permanent_dict;
187       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
188       bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
189       ds->compactor = (should_compact
190                        ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
191                        : NULL);
192       ds->sink = autopaging_writer_create (compacted_value_cnt);
193     }
194   else
195     {
196       ds->compactor = NULL;
197       ds->sink = NULL;
198     }
199
200   /* Allocate memory for lagged cases. */
201   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
202
203   ds->proc_state = PROC_OPEN;
204   ds->cases_written = 0;
205   ds->ok = true;
206
207   /* FIXME: use taint in dataset in place of `ok'? */
208   /* FIXME: for trivial cases we can just return a clone of
209      ds->source? */
210   return casereader_create_sequential (NULL,
211                                        dict_get_next_value_idx (ds->dict),
212                                        CASENUMBER_MAX,
213                                        &proc_casereader_class, ds);
214 }
215
216 /* Returns true if a procedure is in progress, that is, if
217    proc_open has been called but proc_commit has not. */
218 bool
219 proc_is_open (const struct dataset *ds)
220 {
221   return ds->proc_state != PROC_COMMITTED;
222 }
223
224 /* "read" function for procedure casereader. */
225 static struct ccase *
226 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
227 {
228   struct dataset *ds = ds_;
229   enum trns_result retval = TRNS_DROP_CASE;
230   struct ccase *c;
231
232   assert (ds->proc_state == PROC_OPEN);
233   for (; ; case_unref (c))
234     {
235       casenumber case_nr;
236
237       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
238       if (retval == TRNS_ERROR)
239         ds->ok = false;
240       if (!ds->ok)
241         return NULL;
242
243       /* Read a case from source. */
244       c = casereader_read (ds->source);
245       if (c == NULL)
246         return NULL;
247       c = case_unshare_and_resize (c, dict_get_next_value_idx (ds->dict));
248       caseinit_init_vars (ds->caseinit, c);
249
250       /* Execute permanent transformations.  */
251       case_nr = ds->cases_written + 1;
252       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
253                                    &c, case_nr);
254       caseinit_update_left_vars (ds->caseinit, c);
255       if (retval != TRNS_CONTINUE)
256         continue;
257
258       /* Write case to collection of lagged cases. */
259       if (ds->n_lag > 0)
260         {
261           while (deque_count (&ds->lag) >= ds->n_lag)
262             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
263           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
264         }
265
266       /* Write case to replacement active file. */
267       ds->cases_written++;
268       if (ds->sink != NULL)
269         casewriter_write (ds->sink,
270                           case_map_execute (ds->compactor, case_ref (c)));
271
272       /* Execute temporary transformations. */
273       if (ds->temporary_trns_chain != NULL)
274         {
275           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
276                                        &c, ds->cases_written);
277           if (retval != TRNS_CONTINUE)
278             continue;
279         }
280
281       return c;
282     }
283 }
284
285 /* "destroy" function for procedure casereader. */
286 static void
287 proc_casereader_destroy (struct casereader *reader, void *ds_)
288 {
289   struct dataset *ds = ds_;
290   struct ccase *c;
291
292   /* Make sure transformations happen for every input case, in
293      case they have side effects, and ensure that the replacement
294      active file gets all the cases it should. */
295   while ((c = casereader_read (reader)) != NULL)
296     case_unref (c);
297
298   ds->proc_state = PROC_CLOSED;
299   ds->ok = casereader_destroy (ds->source) && ds->ok;
300   ds->source = NULL;
301   proc_set_active_file_data (ds, NULL);
302 }
303
304 /* Must return false if the source casereader, a transformation,
305    or the sink casewriter signaled an error.  (If a temporary
306    transformation signals an error, then the return value is
307    false, but the replacement active file may still be
308    untainted.) */
309 bool
310 proc_commit (struct dataset *ds)
311 {
312   assert (ds->proc_state == PROC_CLOSED);
313   ds->proc_state = PROC_COMMITTED;
314
315   dataset_set_unsaved (ds);
316
317   /* Free memory for lagged cases. */
318   while (!deque_is_empty (&ds->lag))
319     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
320   free (ds->lag_cases);
321
322   /* Dictionary from before TEMPORARY becomes permanent. */
323   proc_cancel_temporary_transformations (ds);
324
325   if (!ds->discard_output)
326     {
327       /* Finish compacting. */
328       if (ds->compactor != NULL)
329         {
330           case_map_destroy (ds->compactor);
331           ds->compactor = NULL;
332
333           dict_delete_scratch_vars (ds->dict);
334           dict_compact_values (ds->dict);
335         }
336
337       /* Old data sink becomes new data source. */
338       if (ds->sink != NULL)
339         ds->source = casewriter_make_reader (ds->sink);
340     }
341   else
342     {
343       ds->source = NULL;
344       ds->discard_output = false;
345     }
346   ds->sink = NULL;
347
348   caseinit_clear (ds->caseinit);
349   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
350
351   dict_clear_vectors (ds->dict);
352   ds->permanent_dict = NULL;
353   return proc_cancel_all_transformations (ds) && ds->ok;
354 }
355
356 /* Casereader class for procedure execution. */
357 static const struct casereader_class proc_casereader_class =
358   {
359     proc_casereader_read,
360     proc_casereader_destroy,
361     NULL,
362     NULL,
363   };
364
365 /* Updates last_proc_invocation. */
366 static void
367 update_last_proc_invocation (struct dataset *ds)
368 {
369   ds->last_proc_invocation = time (NULL);
370 }
371 \f
372 /* Returns a pointer to the lagged case from N_BEFORE cases before the
373    current one, or NULL if there haven't been that many cases yet. */
374 const struct ccase *
375 lagged_case (const struct dataset *ds, int n_before)
376 {
377   assert (n_before >= 1);
378   assert (n_before <= ds->n_lag);
379
380   if (n_before <= deque_count (&ds->lag))
381     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
382   else
383     return NULL;
384 }
385 \f
386 /* Returns the current set of permanent transformations,
387    and clears the permanent transformations.
388    For use by INPUT PROGRAM. */
389 struct trns_chain *
390 proc_capture_transformations (struct dataset *ds)
391 {
392   struct trns_chain *chain;
393
394   assert (ds->temporary_trns_chain == NULL);
395   chain = ds->permanent_trns_chain;
396   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
397
398   if ( ds->xform_callback)
399     ds->xform_callback (false, ds->xform_callback_aux);
400
401   return chain;
402 }
403
404 /* Adds a transformation that processes a case with PROC and
405    frees itself with FREE to the current set of transformations.
406    The functions are passed AUX as auxiliary data. */
407 void
408 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
409 {
410   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
411   if ( ds->xform_callback)
412     ds->xform_callback (true, ds->xform_callback_aux);
413 }
414
415 /* Adds a transformation that processes a case with PROC and
416    frees itself with FREE to the current set of transformations.
417    When parsing of the block of transformations is complete,
418    FINALIZE will be called.
419    The functions are passed AUX as auxiliary data. */
420 void
421 add_transformation_with_finalizer (struct dataset *ds,
422                                    trns_finalize_func *finalize,
423                                    trns_proc_func *proc,
424                                    trns_free_func *free, void *aux)
425 {
426   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
427
428   if ( ds->xform_callback)
429     ds->xform_callback (true, ds->xform_callback_aux);
430 }
431
432 /* Returns the index of the next transformation.
433    This value can be returned by a transformation procedure
434    function to indicate a "jump" to that transformation. */
435 size_t
436 next_transformation (const struct dataset *ds)
437 {
438   return trns_chain_next (ds->cur_trns_chain);
439 }
440
441 /* Returns true if the next call to add_transformation() will add
442    a temporary transformation, false if it will add a permanent
443    transformation. */
444 bool
445 proc_in_temporary_transformations (const struct dataset *ds)
446 {
447   return ds->temporary_trns_chain != NULL;
448 }
449
450 /* Marks the start of temporary transformations.
451    Further calls to add_transformation() will add temporary
452    transformations. */
453 void
454 proc_start_temporary_transformations (struct dataset *ds)
455 {
456   if (!proc_in_temporary_transformations (ds))
457     {
458       add_case_limit_trns (ds);
459
460       ds->permanent_dict = dict_clone (ds->dict);
461
462       trns_chain_finalize (ds->permanent_trns_chain);
463       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
464
465       if ( ds->xform_callback)
466         ds->xform_callback (true, ds->xform_callback_aux);
467     }
468 }
469
470 /* Converts all the temporary transformations, if any, to
471    permanent transformations.  Further transformations will be
472    permanent.
473    Returns true if anything changed, false otherwise. */
474 bool
475 proc_make_temporary_transformations_permanent (struct dataset *ds)
476 {
477   if (proc_in_temporary_transformations (ds))
478     {
479       trns_chain_finalize (ds->temporary_trns_chain);
480       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
481       ds->temporary_trns_chain = NULL;
482
483       dict_destroy (ds->permanent_dict);
484       ds->permanent_dict = NULL;
485
486       return true;
487     }
488   else
489     return false;
490 }
491
492 /* Cancels all temporary transformations, if any.  Further
493    transformations will be permanent.
494    Returns true if anything changed, false otherwise. */
495 bool
496 proc_cancel_temporary_transformations (struct dataset *ds)
497 {
498   if (proc_in_temporary_transformations (ds))
499     {
500       dict_destroy (ds->dict);
501       ds->dict = ds->permanent_dict;
502       ds->permanent_dict = NULL;
503
504       trns_chain_destroy (ds->temporary_trns_chain);
505       ds->temporary_trns_chain = NULL;
506
507       if ( ds->xform_callback)
508         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
509                             ds->xform_callback_aux);
510
511       return true;
512     }
513   else
514     return false;
515 }
516
517 /* Cancels all transformations, if any.
518    Returns true if successful, false on I/O error. */
519 bool
520 proc_cancel_all_transformations (struct dataset *ds)
521 {
522   bool ok;
523   assert (ds->proc_state == PROC_COMMITTED);
524   ok = trns_chain_destroy (ds->permanent_trns_chain);
525   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
526   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
527   ds->temporary_trns_chain = NULL;
528   if ( ds->xform_callback)
529     ds->xform_callback (false, ds->xform_callback_aux);
530
531   return ok;
532 }
533 \f
534
535 static void
536 dict_callback (struct dictionary *d UNUSED, void *ds_)
537 {
538   struct dataset *ds = ds_;
539   dataset_set_unsaved (ds);
540 }
541
542 /* Initializes procedure handling. */
543 struct dataset *
544 create_dataset (void)
545 {
546   struct dataset *ds = xzalloc (sizeof(*ds));
547   ds->dict = dict_create ();
548
549   dict_set_change_callback (ds->dict, dict_callback, ds);
550
551   ds->caseinit = caseinit_create ();
552   proc_cancel_all_transformations (ds);
553   return ds;
554 }
555
556
557 void
558 dataset_add_transform_change_callback (struct dataset *ds,
559                                        transformation_change_callback_func *cb,
560                                        void *aux)
561 {
562   ds->xform_callback = cb;
563   ds->xform_callback_aux = aux;
564 }
565
566 /* Finishes up procedure handling. */
567 void
568 destroy_dataset (struct dataset *ds)
569 {
570   proc_discard_active_file (ds);
571   dict_destroy (ds->dict);
572   caseinit_destroy (ds->caseinit);
573   trns_chain_destroy (ds->permanent_trns_chain);
574
575   if ( ds->xform_callback)
576     ds->xform_callback (false, ds->xform_callback_aux);
577   free (ds);
578 }
579
580 /* Causes output from the next procedure to be discarded, instead
581    of being preserved for use as input for the next procedure. */
582 void
583 proc_discard_output (struct dataset *ds)
584 {
585   ds->discard_output = true;
586 }
587
588 /* Discards the active file dictionary, data, and
589    transformations. */
590 void
591 proc_discard_active_file (struct dataset *ds)
592 {
593   assert (ds->proc_state == PROC_COMMITTED);
594
595   dict_clear (ds->dict);
596   fh_set_default_handle (NULL);
597
598   ds->n_lag = 0;
599
600   casereader_destroy (ds->source);
601   ds->source = NULL;
602
603   proc_cancel_all_transformations (ds);
604 }
605
606 /* Sets SOURCE as the source for procedure input for the next
607    procedure. */
608 void
609 proc_set_active_file (struct dataset *ds,
610                       struct casereader *source,
611                       struct dictionary *dict)
612 {
613   assert (ds->proc_state == PROC_COMMITTED);
614   assert (ds->dict != dict);
615
616   proc_discard_active_file (ds);
617
618   dict_destroy (ds->dict);
619   ds->dict = dict;
620   dict_set_change_callback (ds->dict, dict_callback, ds);
621
622   proc_set_active_file_data (ds, source);
623 }
624
625 /* Replaces the active file's data by READER without replacing
626    the associated dictionary. */
627 bool
628 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
629 {
630   casereader_destroy (ds->source);
631   ds->source = reader;
632
633   caseinit_clear (ds->caseinit);
634   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
635
636   return reader == NULL || !casereader_error (reader);
637 }
638
639 /* Returns true if an active file data source is available, false
640    otherwise. */
641 bool
642 proc_has_active_file (const struct dataset *ds)
643 {
644   return ds->source != NULL;
645 }
646
647 /* Returns the active file data source from DS, or a null pointer
648    if DS has no data source, and removes it from DS. */
649 struct casereader *
650 proc_extract_active_file_data (struct dataset *ds)
651 {
652   struct casereader *reader = ds->source;
653   ds->source = NULL;
654
655   return reader;
656 }
657
658 /* Checks whether DS has a corrupted active file.  If so,
659    discards it and returns false.  If not, returns true without
660    doing anything. */
661 bool
662 dataset_end_of_command (struct dataset *ds)
663 {
664   if (ds->source != NULL)
665     {
666       if (casereader_error (ds->source))
667         {
668           proc_discard_active_file (ds);
669           return false;
670         }
671       else
672         {
673           const struct taint *taint = casereader_get_taint (ds->source);
674           taint_reset_successor_taint ((struct taint *) taint);
675           assert (!taint_has_tainted_successor (taint));
676         }
677     }
678   return true;
679 }
680 \f
681 static trns_proc_func case_limit_trns_proc;
682 static trns_free_func case_limit_trns_free;
683
684 /* Adds a transformation that limits the number of cases that may
685    pass through, if DS->DICT has a case limit. */
686 static void
687 add_case_limit_trns (struct dataset *ds)
688 {
689   casenumber case_limit = dict_get_case_limit (ds->dict);
690   if (case_limit != 0)
691     {
692       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
693       *cases_remaining = case_limit;
694       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
695                           cases_remaining);
696       dict_set_case_limit (ds->dict, 0);
697     }
698 }
699
700 /* Limits the maximum number of cases processed to
701    *CASES_REMAINING. */
702 static int
703 case_limit_trns_proc (void *cases_remaining_,
704                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
705 {
706   size_t *cases_remaining = cases_remaining_;
707   if (*cases_remaining > 0)
708     {
709       (*cases_remaining)--;
710       return TRNS_CONTINUE;
711     }
712   else
713     return TRNS_DROP_CASE;
714 }
715
716 /* Frees the data associated with a case limit transformation. */
717 static bool
718 case_limit_trns_free (void *cases_remaining_)
719 {
720   size_t *cases_remaining = cases_remaining_;
721   free (cases_remaining);
722   return true;
723 }
724 \f
725 static trns_proc_func filter_trns_proc;
726
727 /* Adds a temporary transformation to filter data according to
728    the variable specified on FILTER, if any. */
729 static void
730 add_filter_trns (struct dataset *ds)
731 {
732   struct variable *filter_var = dict_get_filter (ds->dict);
733   if (filter_var != NULL)
734     {
735       proc_start_temporary_transformations (ds);
736       add_transformation (ds, filter_trns_proc, NULL, filter_var);
737     }
738 }
739
740 /* FILTER transformation. */
741 static int
742 filter_trns_proc (void *filter_var_,
743                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
744
745 {
746   struct variable *filter_var = filter_var_;
747   double f = case_num (*c, filter_var);
748   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
749           ? TRNS_CONTINUE : TRNS_DROP_CASE);
750 }
751
752
753 struct dictionary *
754 dataset_dict (const struct dataset *ds)
755 {
756   return ds->dict;
757 }
758
759 const struct casereader *
760 dataset_source (const struct dataset *ds)
761 {
762   return ds->source;
763 }
764
765 void
766 dataset_need_lag (struct dataset *ds, int n_before)
767 {
768   ds->n_lag = MAX (ds->n_lag, n_before);
769 }