* psppire-dict.c (psppire_dict_dump): Don't use
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23
24 #include <data/case.h>
25 #include <data/caseinit.h>
26 #include <data/casereader.h>
27 #include <data/casereader-provider.h>
28 #include <data/casewriter.h>
29 #include <data/dictionary.h>
30 #include <data/file-handle-def.h>
31 #include <data/procedure.h>
32 #include <data/transformations.h>
33 #include <data/variable.h>
34 #include <libpspp/alloc.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
39
40
41 struct dataset {
42   /* Cases are read from source,
43      their transformation variables are initialized,
44      pass through permanent_trns_chain (which transforms them into
45      the format described by permanent_dict),
46      are written to sink,
47      pass through temporary_trns_chain (which transforms them into
48      the format described by dict),
49      and are finally passed to the procedure. */
50   struct casereader *source;
51   struct caseinit *caseinit;
52   struct trns_chain *permanent_trns_chain;
53   struct dictionary *permanent_dict;
54   struct casewriter *sink;
55   struct trns_chain *temporary_trns_chain;
56   struct dictionary *dict;
57
58   /* Callback which occurs when a procedure provides a new source for
59      the dataset */
60   replace_source_callback *replace_source ;
61
62   /* Callback which occurs whenever the DICT is replaced by a new one */
63   replace_dictionary_callback *replace_dict;
64
65   /* Callback which occurs whenever the transformation chain(s) have
66      been modified */
67   transformation_change_callback_func *xform_callback;
68   void *xform_callback_aux;
69
70   /* If true, cases are discarded instead of being written to
71      sink. */
72   bool discard_output;
73
74   /* The transformation chain that the next transformation will be
75      added to. */
76   struct trns_chain *cur_trns_chain;
77
78   /* The compactor used to compact a case, if necessary;
79      otherwise a null pointer. */
80   struct dict_compactor *compactor;
81
82   /* Time at which proc was last invoked. */
83   time_t last_proc_invocation;
84
85   /* Cases just before ("lagging") the current one. */
86   int n_lag;                    /* Number of cases to lag. */
87   struct deque lag;             /* Deque of lagged cases. */
88   struct ccase *lag_cases;      /* Lagged cases managed by deque. */
89
90   /* Procedure data. */
91   enum
92     {
93       PROC_COMMITTED,           /* No procedure in progress. */
94       PROC_OPEN,                /* proc_open called, casereader still open. */
95       PROC_CLOSED               /* casereader from proc_open destroyed,
96                                    but proc_commit not yet called. */
97     }
98   proc_state;
99   casenumber cases_written;       /* Cases output so far. */
100   bool ok;                    /* Error status. */
101 }; /* struct dataset */
102
103
104 static void add_case_limit_trns (struct dataset *ds);
105 static void add_filter_trns (struct dataset *ds);
106
107 static void update_last_proc_invocation (struct dataset *ds);
108 \f
109 /* Public functions. */
110
111 /* Returns the last time the data was read. */
112 time_t
113 time_of_last_procedure (struct dataset *ds)
114 {
115   if (ds->last_proc_invocation == 0)
116     update_last_proc_invocation (ds);
117   return ds->last_proc_invocation;
118 }
119 \f
120 /* Regular procedure. */
121
122 /* Executes any pending transformations, if necessary.
123    This is not identical to the EXECUTE command in that it won't
124    always read the source data.  This can be important when the
125    source data is given inline within BEGIN DATA...END FILE. */
126 bool
127 proc_execute (struct dataset *ds)
128 {
129   bool ok;
130
131   if ((ds->temporary_trns_chain == NULL
132        || trns_chain_is_empty (ds->temporary_trns_chain))
133       && trns_chain_is_empty (ds->permanent_trns_chain))
134     {
135       ds->n_lag = 0;
136       ds->discard_output = false;
137       dict_set_case_limit (ds->dict, 0);
138       dict_clear_vectors (ds->dict);
139       return true;
140     }
141
142   ok = casereader_destroy (proc_open (ds));
143   return proc_commit (ds) && ok;
144 }
145
146 static struct casereader_class proc_casereader_class;
147
148 /* Opens dataset DS for reading cases with proc_read.
149    proc_commit must be called when done. */
150 struct casereader *
151 proc_open (struct dataset *ds)
152 {
153   assert (ds->source != NULL);
154   assert (ds->proc_state == PROC_COMMITTED);
155
156   update_last_proc_invocation (ds);
157
158   caseinit_mark_for_init (ds->caseinit, ds->dict);
159
160   /* Finish up the collection of transformations. */
161   add_case_limit_trns (ds);
162   add_filter_trns (ds);
163   trns_chain_finalize (ds->cur_trns_chain);
164
165   /* Make permanent_dict refer to the dictionary right before
166      data reaches the sink. */
167   if (ds->permanent_dict == NULL)
168     ds->permanent_dict = ds->dict;
169
170   /* Prepare sink. */
171   if (!ds->discard_output)
172     {
173       struct dictionary *pd = ds->permanent_dict;
174       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
175       bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
176       ds->compactor = (should_compact
177                        ? dict_make_compactor (pd, 1u << DC_SCRATCH)
178                        : NULL);
179       ds->sink = autopaging_writer_create (compacted_value_cnt);
180     }
181   else
182     {
183       ds->compactor = NULL;
184       ds->sink = NULL;
185     }
186
187   /* Allocate memory for lagged cases. */
188   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
189
190   ds->proc_state = PROC_OPEN;
191   ds->cases_written = 0;
192   ds->ok = true;
193
194   /* FIXME: use taint in dataset in place of `ok'? */
195   /* FIXME: for trivial cases we can just return a clone of
196      ds->source? */
197   return casereader_create_sequential (NULL,
198                                        dict_get_next_value_idx (ds->dict),
199                                        CASENUMBER_MAX,
200                                        &proc_casereader_class, ds);
201 }
202
203 /* Returns true if a procedure is in progress, that is, if
204    proc_open has been called but proc_commit has not. */
205 bool
206 proc_is_open (const struct dataset *ds)
207 {
208   return ds->proc_state != PROC_COMMITTED;
209 }
210
211 /* "read" function for procedure casereader. */
212 static bool
213 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
214                       struct ccase *c)
215 {
216   struct dataset *ds = ds_;
217   enum trns_result retval = TRNS_DROP_CASE;
218
219   assert (ds->proc_state == PROC_OPEN);
220   for (;;)
221     {
222       casenumber case_nr;
223
224       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
225       if (retval == TRNS_ERROR)
226         ds->ok = false;
227       if (!ds->ok)
228         return false;
229
230       /* Read a case from source. */
231       if (!casereader_read (ds->source, c))
232         return false;
233       case_resize (c, dict_get_next_value_idx (ds->dict));
234       caseinit_init_vars (ds->caseinit, c);
235
236       /* Execute permanent transformations.  */
237       case_nr = ds->cases_written + 1;
238       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
239                                    c, case_nr);
240       caseinit_update_left_vars (ds->caseinit, c);
241       if (retval != TRNS_CONTINUE)
242         {
243           case_destroy (c);
244           continue;
245         }
246
247       /* Write case to collection of lagged cases. */
248       if (ds->n_lag > 0)
249         {
250           while (deque_count (&ds->lag) >= ds->n_lag)
251             case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
252           case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
253         }
254
255       /* Write case to replacement active file. */
256       ds->cases_written++;
257       if (ds->sink != NULL)
258         {
259           struct ccase tmp;
260           if (ds->compactor != NULL)
261             {
262               case_create (&tmp, casewriter_get_value_cnt (ds->sink));
263               dict_compactor_compact (ds->compactor, &tmp, c);
264             }
265           else
266             case_clone (&tmp, c);
267           casewriter_write (ds->sink, &tmp);
268         }
269
270       /* Execute temporary transformations. */
271       if (ds->temporary_trns_chain != NULL)
272         {
273           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
274                                        c, ds->cases_written);
275           if (retval != TRNS_CONTINUE)
276             {
277               case_destroy (c);
278               continue;
279             }
280         }
281
282       return true;
283     }
284 }
285
286 /* "destroy" function for procedure casereader. */
287 static void
288 proc_casereader_destroy (struct casereader *reader, void *ds_)
289 {
290   struct dataset *ds = ds_;
291   struct ccase c;
292
293   /* Make sure transformations happen for every input case, in
294      case they have side effects, and ensure that the replacement
295      active file gets all the cases it should. */
296   while (casereader_read (reader, &c))
297     case_destroy (&c);
298
299   ds->proc_state = PROC_CLOSED;
300   ds->ok = casereader_destroy (ds->source) && ds->ok;
301   ds->source = NULL;
302   proc_set_active_file_data (ds, NULL);
303 }
304
305 /* Must return false if the source casereader, a transformation,
306    or the sink casewriter signaled an error.  (If a temporary
307    transformation signals an error, then the return value is
308    false, but the replacement active file may still be
309    untainted.) */
310 bool
311 proc_commit (struct dataset *ds)
312 {
313   assert (ds->proc_state == PROC_CLOSED);
314   ds->proc_state = PROC_COMMITTED;
315
316   /* Free memory for lagged cases. */
317   while (!deque_is_empty (&ds->lag))
318     case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
319   free (ds->lag_cases);
320
321   /* Dictionary from before TEMPORARY becomes permanent. */
322   proc_cancel_temporary_transformations (ds);
323
324   if (!ds->discard_output)
325     {
326       /* Finish compacting. */
327       if (ds->compactor != NULL)
328         {
329           dict_compactor_destroy (ds->compactor);
330           ds->compactor = NULL;
331
332           dict_delete_scratch_vars (ds->dict);
333           dict_compact_values (ds->dict);
334         }
335
336       /* Old data sink becomes new data source. */
337       if (ds->sink != NULL)
338         ds->source = casewriter_make_reader (ds->sink);
339     }
340   else
341     {
342       ds->source = NULL;
343       ds->discard_output = false;
344     }
345   ds->sink = NULL;
346   if ( ds->replace_source) ds->replace_source (ds->source);
347
348   caseinit_clear (ds->caseinit);
349   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
350
351   dict_clear_vectors (ds->dict);
352   ds->permanent_dict = NULL;
353   return proc_cancel_all_transformations (ds) && ds->ok;
354 }
355
356 /* Casereader class for procedure execution. */
357 static struct casereader_class proc_casereader_class =
358   {
359     proc_casereader_read,
360     proc_casereader_destroy,
361     NULL,
362     NULL,
363   };
364
365 /* Updates last_proc_invocation. */
366 static void
367 update_last_proc_invocation (struct dataset *ds)
368 {
369   ds->last_proc_invocation = time (NULL);
370 }
371 \f
372 /* Returns a pointer to the lagged case from N_BEFORE cases before the
373    current one, or NULL if there haven't been that many cases yet. */
374 struct ccase *
375 lagged_case (const struct dataset *ds, int n_before)
376 {
377   assert (n_before >= 1);
378   assert (n_before <= ds->n_lag);
379
380   if (n_before <= deque_count (&ds->lag))
381     return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
382   else
383     return NULL;
384 }
385 \f
386 /* Returns the current set of permanent transformations,
387    and clears the permanent transformations.
388    For use by INPUT PROGRAM. */
389 struct trns_chain *
390 proc_capture_transformations (struct dataset *ds)
391 {
392   struct trns_chain *chain;
393
394   assert (ds->temporary_trns_chain == NULL);
395   chain = ds->permanent_trns_chain;
396   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
397
398   if ( ds->xform_callback)
399     ds->xform_callback (false, ds->xform_callback_aux);
400
401   return chain;
402 }
403
404 /* Adds a transformation that processes a case with PROC and
405    frees itself with FREE to the current set of transformations.
406    The functions are passed AUX as auxiliary data. */
407 void
408 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
409 {
410   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
411   if ( ds->xform_callback)
412     ds->xform_callback (true, ds->xform_callback_aux);
413 }
414
415 /* Adds a transformation that processes a case with PROC and
416    frees itself with FREE to the current set of transformations.
417    When parsing of the block of transformations is complete,
418    FINALIZE will be called.
419    The functions are passed AUX as auxiliary data. */
420 void
421 add_transformation_with_finalizer (struct dataset *ds,
422                                    trns_finalize_func *finalize,
423                                    trns_proc_func *proc,
424                                    trns_free_func *free, void *aux)
425 {
426   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
427
428   if ( ds->xform_callback)
429     ds->xform_callback (true, ds->xform_callback_aux);
430 }
431
432 /* Returns the index of the next transformation.
433    This value can be returned by a transformation procedure
434    function to indicate a "jump" to that transformation. */
435 size_t
436 next_transformation (const struct dataset *ds)
437 {
438   return trns_chain_next (ds->cur_trns_chain);
439 }
440
441 /* Returns true if the next call to add_transformation() will add
442    a temporary transformation, false if it will add a permanent
443    transformation. */
444 bool
445 proc_in_temporary_transformations (const struct dataset *ds)
446 {
447   return ds->temporary_trns_chain != NULL;
448 }
449
450 /* Marks the start of temporary transformations.
451    Further calls to add_transformation() will add temporary
452    transformations. */
453 void
454 proc_start_temporary_transformations (struct dataset *ds)
455 {
456   if (!proc_in_temporary_transformations (ds))
457     {
458       add_case_limit_trns (ds);
459
460       ds->permanent_dict = dict_clone (ds->dict);
461
462       trns_chain_finalize (ds->permanent_trns_chain);
463       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
464
465       if ( ds->xform_callback)
466         ds->xform_callback (true, ds->xform_callback_aux);
467     }
468 }
469
470 /* Converts all the temporary transformations, if any, to
471    permanent transformations.  Further transformations will be
472    permanent.
473    Returns true if anything changed, false otherwise. */
474 bool
475 proc_make_temporary_transformations_permanent (struct dataset *ds)
476 {
477   if (proc_in_temporary_transformations (ds))
478     {
479       trns_chain_finalize (ds->temporary_trns_chain);
480       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
481       ds->temporary_trns_chain = NULL;
482
483       dict_destroy (ds->permanent_dict);
484       ds->permanent_dict = NULL;
485
486       return true;
487     }
488   else
489     return false;
490 }
491
492 /* Cancels all temporary transformations, if any.  Further
493    transformations will be permanent.
494    Returns true if anything changed, false otherwise. */
495 bool
496 proc_cancel_temporary_transformations (struct dataset *ds)
497 {
498   if (proc_in_temporary_transformations (ds))
499     {
500       dict_destroy (ds->dict);
501       ds->dict = ds->permanent_dict;
502       ds->permanent_dict = NULL;
503       if (ds->replace_dict) ds->replace_dict (ds->dict);
504
505       trns_chain_destroy (ds->temporary_trns_chain);
506       ds->temporary_trns_chain = NULL;
507
508       if ( ds->xform_callback)
509         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
510                             ds->xform_callback_aux);
511
512       return true;
513     }
514   else
515     return false;
516 }
517
518 /* Cancels all transformations, if any.
519    Returns true if successful, false on I/O error. */
520 bool
521 proc_cancel_all_transformations (struct dataset *ds)
522 {
523   bool ok;
524   assert (ds->proc_state == PROC_COMMITTED);
525   ok = trns_chain_destroy (ds->permanent_trns_chain);
526   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
527   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
528   ds->temporary_trns_chain = NULL;
529   if ( ds->xform_callback)
530     ds->xform_callback (false, ds->xform_callback_aux);
531
532   return ok;
533 }
534 \f
535 /* Initializes procedure handling. */
536 struct dataset *
537 create_dataset (transformation_change_callback_func *cb, void *aux)
538 {
539   struct dataset *ds = xzalloc (sizeof(*ds));
540   ds->dict = dict_create ();
541   ds->caseinit = caseinit_create ();
542   ds->xform_callback = cb;
543   ds->xform_callback_aux = aux;
544   proc_cancel_all_transformations (ds);
545   return ds;
546 }
547
548
549 void
550 dataset_add_transform_change_callback (struct dataset *ds,
551                                        transformation_change_callback_func *cb,
552                                        void *aux)
553 {
554   ds->xform_callback = cb;
555   ds->xform_callback_aux = aux;
556 }
557
558 /* Finishes up procedure handling. */
559 void
560 destroy_dataset (struct dataset *ds)
561 {
562   proc_discard_active_file (ds);
563   dict_destroy (ds->dict);
564   caseinit_destroy (ds->caseinit);
565   trns_chain_destroy (ds->permanent_trns_chain);
566
567   if ( ds->xform_callback)
568     ds->xform_callback (false, ds->xform_callback_aux);
569   free (ds);
570 }
571
572 /* Causes output from the next procedure to be discarded, instead
573    of being preserved for use as input for the next procedure. */
574 void
575 proc_discard_output (struct dataset *ds)
576 {
577   ds->discard_output = true;
578 }
579
580 /* Discards the active file dictionary, data, and
581    transformations. */
582 void
583 proc_discard_active_file (struct dataset *ds)
584 {
585   assert (ds->proc_state == PROC_COMMITTED);
586
587   dict_clear (ds->dict);
588   fh_set_default_handle (NULL);
589
590   ds->n_lag = 0;
591
592   casereader_destroy (ds->source);
593   ds->source = NULL;
594   if ( ds->replace_source) ds->replace_source (NULL);
595
596   proc_cancel_all_transformations (ds);
597 }
598
599 /* Sets SOURCE as the source for procedure input for the next
600    procedure. */
601 void
602 proc_set_active_file (struct dataset *ds,
603                       struct casereader *source,
604                       struct dictionary *dict)
605 {
606   assert (ds->proc_state == PROC_COMMITTED);
607   assert (ds->dict != dict);
608
609   proc_discard_active_file (ds);
610
611   dict_destroy (ds->dict);
612   ds->dict = dict;
613   if ( ds->replace_dict) ds->replace_dict (dict);
614
615   proc_set_active_file_data (ds, source);
616 }
617
618 /* Replaces the active file's data by READER without replacing
619    the associated dictionary. */
620 bool
621 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
622 {
623   casereader_destroy (ds->source);
624   ds->source = reader;
625   if (ds->replace_source) ds->replace_source (reader);
626
627   caseinit_clear (ds->caseinit);
628   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
629
630   return reader == NULL || !casereader_error (reader);
631 }
632
633 /* Returns true if an active file data source is available, false
634    otherwise. */
635 bool
636 proc_has_active_file (const struct dataset *ds)
637 {
638   return ds->source != NULL;
639 }
640
641 /* Checks whether DS has a corrupted active file.  If so,
642    discards it and returns false.  If not, returns true without
643    doing anything. */
644 bool
645 dataset_end_of_command (struct dataset *ds)
646 {
647   if (ds->source != NULL)
648     {
649       if (casereader_error (ds->source))
650         {
651           proc_discard_active_file (ds);
652           return false;
653         }
654       else
655         {
656           const struct taint *taint = casereader_get_taint (ds->source);
657           taint_reset_successor_taint ((struct taint *) taint);
658           assert (!taint_has_tainted_successor (taint));
659         }
660     }
661   return true;
662 }
663 \f
664 static trns_proc_func case_limit_trns_proc;
665 static trns_free_func case_limit_trns_free;
666
667 /* Adds a transformation that limits the number of cases that may
668    pass through, if DS->DICT has a case limit. */
669 static void
670 add_case_limit_trns (struct dataset *ds)
671 {
672   size_t case_limit = dict_get_case_limit (ds->dict);
673   if (case_limit != 0)
674     {
675       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
676       *cases_remaining = case_limit;
677       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
678                           cases_remaining);
679       dict_set_case_limit (ds->dict, 0);
680     }
681 }
682
683 /* Limits the maximum number of cases processed to
684    *CASES_REMAINING. */
685 static int
686 case_limit_trns_proc (void *cases_remaining_,
687                       struct ccase *c UNUSED, casenumber case_nr UNUSED)
688 {
689   size_t *cases_remaining = cases_remaining_;
690   if (*cases_remaining > 0)
691     {
692       (*cases_remaining)--;
693       return TRNS_CONTINUE;
694     }
695   else
696     return TRNS_DROP_CASE;
697 }
698
699 /* Frees the data associated with a case limit transformation. */
700 static bool
701 case_limit_trns_free (void *cases_remaining_)
702 {
703   size_t *cases_remaining = cases_remaining_;
704   free (cases_remaining);
705   return true;
706 }
707 \f
708 static trns_proc_func filter_trns_proc;
709
710 /* Adds a temporary transformation to filter data according to
711    the variable specified on FILTER, if any. */
712 static void
713 add_filter_trns (struct dataset *ds)
714 {
715   struct variable *filter_var = dict_get_filter (ds->dict);
716   if (filter_var != NULL)
717     {
718       proc_start_temporary_transformations (ds);
719       add_transformation (ds, filter_trns_proc, NULL, filter_var);
720     }
721 }
722
723 /* FILTER transformation. */
724 static int
725 filter_trns_proc (void *filter_var_,
726                   struct ccase *c UNUSED, casenumber case_nr UNUSED)
727
728 {
729   struct variable *filter_var = filter_var_;
730   double f = case_num (c, filter_var);
731   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
732           ? TRNS_CONTINUE : TRNS_DROP_CASE);
733 }
734
735
736 struct dictionary *
737 dataset_dict (const struct dataset *ds)
738 {
739   return ds->dict;
740 }
741
742 const struct casereader *
743 dataset_source (const struct dataset *ds)
744 {
745   return ds->source;
746 }
747
748 void
749 dataset_need_lag (struct dataset *ds, int n_before)
750 {
751   ds->n_lag = MAX (ds->n_lag, n_before);
752 }