fbb9a7575cd3eba3a2864e898c85573d388a4cfc
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23
24 #include <data/case.h>
25 #include <data/case-map.h>
26 #include <data/caseinit.h>
27 #include <data/casereader.h>
28 #include <data/casereader-provider.h>
29 #include <data/casewriter.h>
30 #include <data/dictionary.h>
31 #include <data/file-handle-def.h>
32 #include <data/procedure.h>
33 #include <data/transformations.h>
34 #include <data/variable.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
39 #include <libpspp/i18n.h>
40
41 #include "xalloc.h"
42
43 struct dataset {
44   /* Cases are read from source,
45      their transformation variables are initialized,
46      pass through permanent_trns_chain (which transforms them into
47      the format described by permanent_dict),
48      are written to sink,
49      pass through temporary_trns_chain (which transforms them into
50      the format described by dict),
51      and are finally passed to the procedure. */
52   struct casereader *source;
53   struct caseinit *caseinit;
54   struct trns_chain *permanent_trns_chain;
55   struct dictionary *permanent_dict;
56   struct casewriter *sink;
57   struct trns_chain *temporary_trns_chain;
58   struct dictionary *dict;
59
60   /* Callback which occurs whenever the transformation chain(s) have
61      been modified */
62   transformation_change_callback_func *xform_callback;
63   void *xform_callback_aux;
64
65   /* If true, cases are discarded instead of being written to
66      sink. */
67   bool discard_output;
68
69   /* The transformation chain that the next transformation will be
70      added to. */
71   struct trns_chain *cur_trns_chain;
72
73   /* The case map used to compact a case, if necessary;
74      otherwise a null pointer. */
75   struct case_map *compactor;
76
77   /* Time at which proc was last invoked. */
78   time_t last_proc_invocation;
79
80   /* Cases just before ("lagging") the current one. */
81   int n_lag;                    /* Number of cases to lag. */
82   struct deque lag;             /* Deque of lagged cases. */
83   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
84
85   /* Procedure data. */
86   enum
87     {
88       PROC_COMMITTED,           /* No procedure in progress. */
89       PROC_OPEN,                /* proc_open called, casereader still open. */
90       PROC_CLOSED               /* casereader from proc_open destroyed,
91                                    but proc_commit not yet called. */
92     }
93   proc_state;
94   casenumber cases_written;       /* Cases output so far. */
95   bool ok;                    /* Error status. */
96
97   void (*callback) (void *); /* Callback for when the dataset changes */
98   void *cb_data;
99
100 }; /* struct dataset */
101
102
103 static void add_case_limit_trns (struct dataset *ds);
104 static void add_filter_trns (struct dataset *ds);
105
106 static void update_last_proc_invocation (struct dataset *ds);
107
108 static void
109 dataset_set_unsaved (const struct dataset *ds)
110 {
111   if (ds->callback) ds->callback (ds->cb_data);
112 }
113
114 \f
115 /* Public functions. */
116
117 void
118 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
119 {
120   ds->callback = cb;
121   ds->cb_data = cb_data;
122 }
123
124
125 /* Returns the last time the data was read. */
126 time_t
127 time_of_last_procedure (struct dataset *ds)
128 {
129   if (ds->last_proc_invocation == 0)
130     update_last_proc_invocation (ds);
131   return ds->last_proc_invocation;
132 }
133 \f
134 /* Regular procedure. */
135
136 /* Executes any pending transformations, if necessary.
137    This is not identical to the EXECUTE command in that it won't
138    always read the source data.  This can be important when the
139    source data is given inline within BEGIN DATA...END FILE. */
140 bool
141 proc_execute (struct dataset *ds)
142 {
143   bool ok;
144
145   if ((ds->temporary_trns_chain == NULL
146        || trns_chain_is_empty (ds->temporary_trns_chain))
147       && trns_chain_is_empty (ds->permanent_trns_chain))
148     {
149       ds->n_lag = 0;
150       ds->discard_output = false;
151       dict_set_case_limit (ds->dict, 0);
152       dict_clear_vectors (ds->dict);
153       return true;
154     }
155
156   ok = casereader_destroy (proc_open (ds));
157   return proc_commit (ds) && ok;
158 }
159
160 static const struct casereader_class proc_casereader_class;
161
162 /* Opens dataset DS for reading cases with proc_read.
163    proc_commit must be called when done. */
164 struct casereader *
165 proc_open (struct dataset *ds)
166 {
167   assert (ds->source != NULL);
168   assert (ds->proc_state == PROC_COMMITTED);
169
170   update_last_proc_invocation (ds);
171
172   caseinit_mark_for_init (ds->caseinit, ds->dict);
173
174   /* Finish up the collection of transformations. */
175   add_case_limit_trns (ds);
176   add_filter_trns (ds);
177   trns_chain_finalize (ds->cur_trns_chain);
178
179   /* Make permanent_dict refer to the dictionary right before
180      data reaches the sink. */
181   if (ds->permanent_dict == NULL)
182     ds->permanent_dict = ds->dict;
183
184   /* Prepare sink. */
185   if (!ds->discard_output)
186     {
187       struct dictionary *pd = ds->permanent_dict;
188       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
189       bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
190       ds->compactor = (should_compact
191                        ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
192                        : NULL);
193       ds->sink = autopaging_writer_create (compacted_value_cnt);
194     }
195   else
196     {
197       ds->compactor = NULL;
198       ds->sink = NULL;
199     }
200
201   /* Allocate memory for lagged cases. */
202   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
203
204   ds->proc_state = PROC_OPEN;
205   ds->cases_written = 0;
206   ds->ok = true;
207
208   /* FIXME: use taint in dataset in place of `ok'? */
209   /* FIXME: for trivial cases we can just return a clone of
210      ds->source? */
211   return casereader_create_sequential (NULL,
212                                        dict_get_next_value_idx (ds->dict),
213                                        CASENUMBER_MAX,
214                                        &proc_casereader_class, ds);
215 }
216
217 /* Returns true if a procedure is in progress, that is, if
218    proc_open has been called but proc_commit has not. */
219 bool
220 proc_is_open (const struct dataset *ds)
221 {
222   return ds->proc_state != PROC_COMMITTED;
223 }
224
225 /* "read" function for procedure casereader. */
226 static struct ccase *
227 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
228 {
229   struct dataset *ds = ds_;
230   enum trns_result retval = TRNS_DROP_CASE;
231   struct ccase *c;
232
233   assert (ds->proc_state == PROC_OPEN);
234   for (; ; case_unref (c))
235     {
236       casenumber case_nr;
237
238       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
239       if (retval == TRNS_ERROR)
240         ds->ok = false;
241       if (!ds->ok)
242         return NULL;
243
244       /* Read a case from source. */
245       c = casereader_read (ds->source);
246       if (c == NULL)
247         return NULL;
248       c = case_unshare_and_resize (c, dict_get_next_value_idx (ds->dict));
249       caseinit_init_vars (ds->caseinit, c);
250
251       /* Execute permanent transformations.  */
252       case_nr = ds->cases_written + 1;
253       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
254                                    &c, case_nr);
255       caseinit_update_left_vars (ds->caseinit, c);
256       if (retval != TRNS_CONTINUE)
257         continue;
258
259       /* Write case to collection of lagged cases. */
260       if (ds->n_lag > 0)
261         {
262           while (deque_count (&ds->lag) >= ds->n_lag)
263             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
264           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
265         }
266
267       /* Write case to replacement active file. */
268       ds->cases_written++;
269       if (ds->sink != NULL)
270         casewriter_write (ds->sink,
271                           case_map_execute (ds->compactor, case_ref (c)));
272
273       /* Execute temporary transformations. */
274       if (ds->temporary_trns_chain != NULL)
275         {
276           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
277                                        &c, ds->cases_written);
278           if (retval != TRNS_CONTINUE)
279             continue;
280         }
281
282       return c;
283     }
284 }
285
286 /* "destroy" function for procedure casereader. */
287 static void
288 proc_casereader_destroy (struct casereader *reader, void *ds_)
289 {
290   struct dataset *ds = ds_;
291   struct ccase *c;
292
293   /* Make sure transformations happen for every input case, in
294      case they have side effects, and ensure that the replacement
295      active file gets all the cases it should. */
296   while ((c = casereader_read (reader)) != NULL)
297     case_unref (c);
298
299   ds->proc_state = PROC_CLOSED;
300   ds->ok = casereader_destroy (ds->source) && ds->ok;
301   ds->source = NULL;
302   proc_set_active_file_data (ds, NULL);
303 }
304
305 /* Must return false if the source casereader, a transformation,
306    or the sink casewriter signaled an error.  (If a temporary
307    transformation signals an error, then the return value is
308    false, but the replacement active file may still be
309    untainted.) */
310 bool
311 proc_commit (struct dataset *ds)
312 {
313   assert (ds->proc_state == PROC_CLOSED);
314   ds->proc_state = PROC_COMMITTED;
315
316   dataset_set_unsaved (ds);
317
318   /* Free memory for lagged cases. */
319   while (!deque_is_empty (&ds->lag))
320     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
321   free (ds->lag_cases);
322
323   /* Dictionary from before TEMPORARY becomes permanent. */
324   proc_cancel_temporary_transformations (ds);
325
326   if (!ds->discard_output)
327     {
328       /* Finish compacting. */
329       if (ds->compactor != NULL)
330         {
331           case_map_destroy (ds->compactor);
332           ds->compactor = NULL;
333
334           dict_delete_scratch_vars (ds->dict);
335           dict_compact_values (ds->dict);
336         }
337
338       /* Old data sink becomes new data source. */
339       if (ds->sink != NULL)
340         ds->source = casewriter_make_reader (ds->sink);
341     }
342   else
343     {
344       ds->source = NULL;
345       ds->discard_output = false;
346     }
347   ds->sink = NULL;
348
349   caseinit_clear (ds->caseinit);
350   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
351
352   dict_clear_vectors (ds->dict);
353   ds->permanent_dict = NULL;
354   return proc_cancel_all_transformations (ds) && ds->ok;
355 }
356
357 /* Casereader class for procedure execution. */
358 static const struct casereader_class proc_casereader_class =
359   {
360     proc_casereader_read,
361     proc_casereader_destroy,
362     NULL,
363     NULL,
364   };
365
366 /* Updates last_proc_invocation. */
367 static void
368 update_last_proc_invocation (struct dataset *ds)
369 {
370   ds->last_proc_invocation = time (NULL);
371 }
372 \f
373 /* Returns a pointer to the lagged case from N_BEFORE cases before the
374    current one, or NULL if there haven't been that many cases yet. */
375 const struct ccase *
376 lagged_case (const struct dataset *ds, int n_before)
377 {
378   assert (n_before >= 1);
379   assert (n_before <= ds->n_lag);
380
381   if (n_before <= deque_count (&ds->lag))
382     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
383   else
384     return NULL;
385 }
386 \f
387 /* Returns the current set of permanent transformations,
388    and clears the permanent transformations.
389    For use by INPUT PROGRAM. */
390 struct trns_chain *
391 proc_capture_transformations (struct dataset *ds)
392 {
393   struct trns_chain *chain;
394
395   assert (ds->temporary_trns_chain == NULL);
396   chain = ds->permanent_trns_chain;
397   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
398
399   if ( ds->xform_callback)
400     ds->xform_callback (false, ds->xform_callback_aux);
401
402   return chain;
403 }
404
405 /* Adds a transformation that processes a case with PROC and
406    frees itself with FREE to the current set of transformations.
407    The functions are passed AUX as auxiliary data. */
408 void
409 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
410 {
411   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
412   if ( ds->xform_callback)
413     ds->xform_callback (true, ds->xform_callback_aux);
414 }
415
416 /* Adds a transformation that processes a case with PROC and
417    frees itself with FREE to the current set of transformations.
418    When parsing of the block of transformations is complete,
419    FINALIZE will be called.
420    The functions are passed AUX as auxiliary data. */
421 void
422 add_transformation_with_finalizer (struct dataset *ds,
423                                    trns_finalize_func *finalize,
424                                    trns_proc_func *proc,
425                                    trns_free_func *free, void *aux)
426 {
427   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
428
429   if ( ds->xform_callback)
430     ds->xform_callback (true, ds->xform_callback_aux);
431 }
432
433 /* Returns the index of the next transformation.
434    This value can be returned by a transformation procedure
435    function to indicate a "jump" to that transformation. */
436 size_t
437 next_transformation (const struct dataset *ds)
438 {
439   return trns_chain_next (ds->cur_trns_chain);
440 }
441
442 /* Returns true if the next call to add_transformation() will add
443    a temporary transformation, false if it will add a permanent
444    transformation. */
445 bool
446 proc_in_temporary_transformations (const struct dataset *ds)
447 {
448   return ds->temporary_trns_chain != NULL;
449 }
450
451 /* Marks the start of temporary transformations.
452    Further calls to add_transformation() will add temporary
453    transformations. */
454 void
455 proc_start_temporary_transformations (struct dataset *ds)
456 {
457   if (!proc_in_temporary_transformations (ds))
458     {
459       add_case_limit_trns (ds);
460
461       ds->permanent_dict = dict_clone (ds->dict);
462
463       trns_chain_finalize (ds->permanent_trns_chain);
464       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
465
466       if ( ds->xform_callback)
467         ds->xform_callback (true, ds->xform_callback_aux);
468     }
469 }
470
471 /* Converts all the temporary transformations, if any, to
472    permanent transformations.  Further transformations will be
473    permanent.
474    Returns true if anything changed, false otherwise. */
475 bool
476 proc_make_temporary_transformations_permanent (struct dataset *ds)
477 {
478   if (proc_in_temporary_transformations (ds))
479     {
480       trns_chain_finalize (ds->temporary_trns_chain);
481       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
482       ds->temporary_trns_chain = NULL;
483
484       dict_destroy (ds->permanent_dict);
485       ds->permanent_dict = NULL;
486
487       return true;
488     }
489   else
490     return false;
491 }
492
493 /* Cancels all temporary transformations, if any.  Further
494    transformations will be permanent.
495    Returns true if anything changed, false otherwise. */
496 bool
497 proc_cancel_temporary_transformations (struct dataset *ds)
498 {
499   if (proc_in_temporary_transformations (ds))
500     {
501       dict_destroy (ds->dict);
502       ds->dict = ds->permanent_dict;
503       ds->permanent_dict = NULL;
504
505       trns_chain_destroy (ds->temporary_trns_chain);
506       ds->temporary_trns_chain = NULL;
507
508       if ( ds->xform_callback)
509         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
510                             ds->xform_callback_aux);
511
512       return true;
513     }
514   else
515     return false;
516 }
517
518 /* Cancels all transformations, if any.
519    Returns true if successful, false on I/O error. */
520 bool
521 proc_cancel_all_transformations (struct dataset *ds)
522 {
523   bool ok;
524   assert (ds->proc_state == PROC_COMMITTED);
525   ok = trns_chain_destroy (ds->permanent_trns_chain);
526   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
527   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
528   ds->temporary_trns_chain = NULL;
529   if ( ds->xform_callback)
530     ds->xform_callback (false, ds->xform_callback_aux);
531
532   return ok;
533 }
534 \f
535
536 static void
537 dict_callback (struct dictionary *d UNUSED, void *ds_)
538 {
539   struct dataset *ds = ds_;
540   dataset_set_unsaved (ds);
541 }
542
543 /* Initializes procedure handling. */
544 struct dataset *
545 create_dataset (void)
546 {
547   struct dataset *ds = xzalloc (sizeof(*ds));
548   ds->dict = dict_create ();
549
550   dict_set_change_callback (ds->dict, dict_callback, ds);
551
552   dict_set_encoding (ds->dict, get_default_encoding ());
553
554   ds->caseinit = caseinit_create ();
555   proc_cancel_all_transformations (ds);
556   return ds;
557 }
558
559
560 void
561 dataset_add_transform_change_callback (struct dataset *ds,
562                                        transformation_change_callback_func *cb,
563                                        void *aux)
564 {
565   ds->xform_callback = cb;
566   ds->xform_callback_aux = aux;
567 }
568
569 /* Finishes up procedure handling. */
570 void
571 destroy_dataset (struct dataset *ds)
572 {
573   proc_discard_active_file (ds);
574   dict_destroy (ds->dict);
575   caseinit_destroy (ds->caseinit);
576   trns_chain_destroy (ds->permanent_trns_chain);
577
578   if ( ds->xform_callback)
579     ds->xform_callback (false, ds->xform_callback_aux);
580   free (ds);
581 }
582
583 /* Causes output from the next procedure to be discarded, instead
584    of being preserved for use as input for the next procedure. */
585 void
586 proc_discard_output (struct dataset *ds)
587 {
588   ds->discard_output = true;
589 }
590
591 /* Discards the active file dictionary, data, and
592    transformations. */
593 void
594 proc_discard_active_file (struct dataset *ds)
595 {
596   assert (ds->proc_state == PROC_COMMITTED);
597
598   dict_clear (ds->dict);
599   fh_set_default_handle (NULL);
600
601   ds->n_lag = 0;
602
603   casereader_destroy (ds->source);
604   ds->source = NULL;
605
606   proc_cancel_all_transformations (ds);
607 }
608
609 /* Sets SOURCE as the source for procedure input for the next
610    procedure. */
611 void
612 proc_set_active_file (struct dataset *ds,
613                       struct casereader *source,
614                       struct dictionary *dict)
615 {
616   assert (ds->proc_state == PROC_COMMITTED);
617   assert (ds->dict != dict);
618
619   proc_discard_active_file (ds);
620
621   dict_destroy (ds->dict);
622   ds->dict = dict;
623   dict_set_change_callback (ds->dict, dict_callback, ds);
624
625   proc_set_active_file_data (ds, source);
626 }
627
628 /* Replaces the active file's data by READER without replacing
629    the associated dictionary. */
630 bool
631 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
632 {
633   casereader_destroy (ds->source);
634   ds->source = reader;
635
636   caseinit_clear (ds->caseinit);
637   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
638
639   return reader == NULL || !casereader_error (reader);
640 }
641
642 /* Returns true if an active file data source is available, false
643    otherwise. */
644 bool
645 proc_has_active_file (const struct dataset *ds)
646 {
647   return ds->source != NULL;
648 }
649
650 /* Returns the active file data source from DS, or a null pointer
651    if DS has no data source, and removes it from DS. */
652 struct casereader *
653 proc_extract_active_file_data (struct dataset *ds)
654 {
655   struct casereader *reader = ds->source;
656   ds->source = NULL;
657
658   return reader;
659 }
660
661 /* Checks whether DS has a corrupted active file.  If so,
662    discards it and returns false.  If not, returns true without
663    doing anything. */
664 bool
665 dataset_end_of_command (struct dataset *ds)
666 {
667   if (ds->source != NULL)
668     {
669       if (casereader_error (ds->source))
670         {
671           proc_discard_active_file (ds);
672           return false;
673         }
674       else
675         {
676           const struct taint *taint = casereader_get_taint (ds->source);
677           taint_reset_successor_taint ((struct taint *) taint);
678           assert (!taint_has_tainted_successor (taint));
679         }
680     }
681   return true;
682 }
683 \f
684 static trns_proc_func case_limit_trns_proc;
685 static trns_free_func case_limit_trns_free;
686
687 /* Adds a transformation that limits the number of cases that may
688    pass through, if DS->DICT has a case limit. */
689 static void
690 add_case_limit_trns (struct dataset *ds)
691 {
692   casenumber case_limit = dict_get_case_limit (ds->dict);
693   if (case_limit != 0)
694     {
695       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
696       *cases_remaining = case_limit;
697       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
698                           cases_remaining);
699       dict_set_case_limit (ds->dict, 0);
700     }
701 }
702
703 /* Limits the maximum number of cases processed to
704    *CASES_REMAINING. */
705 static int
706 case_limit_trns_proc (void *cases_remaining_,
707                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
708 {
709   size_t *cases_remaining = cases_remaining_;
710   if (*cases_remaining > 0)
711     {
712       (*cases_remaining)--;
713       return TRNS_CONTINUE;
714     }
715   else
716     return TRNS_DROP_CASE;
717 }
718
719 /* Frees the data associated with a case limit transformation. */
720 static bool
721 case_limit_trns_free (void *cases_remaining_)
722 {
723   size_t *cases_remaining = cases_remaining_;
724   free (cases_remaining);
725   return true;
726 }
727 \f
728 static trns_proc_func filter_trns_proc;
729
730 /* Adds a temporary transformation to filter data according to
731    the variable specified on FILTER, if any. */
732 static void
733 add_filter_trns (struct dataset *ds)
734 {
735   struct variable *filter_var = dict_get_filter (ds->dict);
736   if (filter_var != NULL)
737     {
738       proc_start_temporary_transformations (ds);
739       add_transformation (ds, filter_trns_proc, NULL, filter_var);
740     }
741 }
742
743 /* FILTER transformation. */
744 static int
745 filter_trns_proc (void *filter_var_,
746                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
747
748 {
749   struct variable *filter_var = filter_var_;
750   double f = case_num (*c, filter_var);
751   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
752           ? TRNS_CONTINUE : TRNS_DROP_CASE);
753 }
754
755
756 struct dictionary *
757 dataset_dict (const struct dataset *ds)
758 {
759   return ds->dict;
760 }
761
762 const struct casereader *
763 dataset_source (const struct dataset *ds)
764 {
765   return ds->source;
766 }
767
768 void
769 dataset_need_lag (struct dataset *ds, int n_before)
770 {
771   ds->n_lag = MAX (ds->n_lag, n_before);
772 }