1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
24 #include <data/case.h>
25 #include <data/caseinit.h>
26 #include <data/casereader.h>
27 #include <data/casereader-provider.h>
28 #include <data/casewriter.h>
29 #include <data/dictionary.h>
30 #include <data/file-handle-def.h>
31 #include <data/procedure.h>
32 #include <data/transformations.h>
33 #include <data/variable.h>
34 #include <libpspp/alloc.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
42 /* Cases are read from source,
43 their transformation variables are initialized,
44 pass through permanent_trns_chain (which transforms them into
45 the format described by permanent_dict),
46 are written to sink,
47 pass through temporary_trns_chain (which transforms them into
48 the format described by dict),
49 and are finally passed to the procedure. */
50 struct casereader *source;
51 struct caseinit *caseinit;
52 struct trns_chain *permanent_trns_chain;
53 struct dictionary *permanent_dict;
54 struct casewriter *sink;
55 struct trns_chain *temporary_trns_chain;
56 struct dictionary *dict;
58 /* Callback which occurs when a procedure provides a new source for
59 the dataset. */
60 replace_source_callback *replace_source ;
62 /* Callback which occurs whenever the DICT is replaced by a new one */
63 replace_dictionary_callback *replace_dict;
65 /* Callback which occurs whenever the transformation chain(s) have
66 been modified. */
67 transformation_change_callback_func *xform_callback;
68 void *xform_callback_aux;
70 /* If true, cases are discarded instead of being written to
71 sink. */
74 /* The transformation chain that the next transformation will be
75 added to. */
76 struct trns_chain *cur_trns_chain;
78 /* The compactor used to compact a case, if necessary;
79 otherwise a null pointer. */
80 struct dict_compactor *compactor;
82 /* Time at which proc was last invoked. */
83 time_t last_proc_invocation;
85 /* Cases just before ("lagging") the current one. */
86 int n_lag; /* Number of cases to lag. */
87 struct deque lag; /* Deque of lagged cases. */
88 struct ccase *lag_cases; /* Lagged cases managed by deque. */
93 PROC_COMMITTED, /* No procedure in progress. */
94 PROC_OPEN, /* proc_open called, casereader still open. */
95 PROC_CLOSED /* casereader from proc_open destroyed,
96 but proc_commit not yet called. */
99 casenumber cases_written; /* Cases output so far. */
100 bool ok; /* Error status. */
101 }; /* struct dataset */
/* Prototypes for static helpers that are called before their
   definitions appear later in this file. */
104 static void add_case_limit_trns (struct dataset *ds);
105 static void add_filter_trns (struct dataset *ds);
107 static void update_last_proc_invocation (struct dataset *ds);
109 /* Public functions. */
111 /* Returns the last time the data was read. */
/* DS is the dataset to query.  The stored timestamp is returned; if
   it is still 0 it is first filled in by
   update_last_proc_invocation(), which records time (NULL).
   NOTE(review): the return-type line and the braces are missing from
   this listing -- presumably the type is time_t, like the field. */
113 time_of_last_procedure (struct dataset *ds)
115 if (ds->last_proc_invocation == 0)
116 update_last_proc_invocation (ds);
117 return ds->last_proc_invocation;
120 /* Regular procedure. */
122 /* Executes any pending transformations, if necessary.
123 This is not identical to the EXECUTE command in that it won't
124 always read the source data. This can be important when the
125 source data is given inline within BEGIN DATA...END DATA. */
/* DS is the dataset whose pending transformations should be run.
   The visible result is proc_commit()'s return value combined with
   the result of destroying the reader obtained from proc_open(). */
127 proc_execute (struct dataset *ds)
/* Nothing to run when the temporary chain is absent or empty and the
   permanent chain is empty; only per-procedure state is reset.
   NOTE(review): the early return for this case is presumably on a
   line missing from this listing -- confirm. */
131 if ((ds->temporary_trns_chain == NULL
132 || trns_chain_is_empty (ds->temporary_trns_chain))
133 && trns_chain_is_empty (ds->permanent_trns_chain))
136 ds->discard_output = false;
137 dict_set_case_limit (ds->dict, 0);
138 dict_clear_vectors (ds->dict);
/* Otherwise open a procedure reader and destroy it right away:
   proc_casereader_destroy() reads every remaining case, which pushes
   each one through the transformations. */
142 ok = casereader_destroy (proc_open (ds));
143 return proc_commit (ds) && ok;
/* Declared here so that proc_open() can pass its address to
   casereader_create_sequential(); the initializer appears further
   down in this file. */
146 static struct casereader_class proc_casereader_class;
148 /* Opens dataset DS for reading cases with proc_read.
149 proc_commit must be called when done. */
/* DS is the dataset to read from; it must have a source and no
   procedure may be in progress (both are asserted).  The result is
   the reader built by casereader_create_sequential() with
   proc_casereader_class and DS as auxiliary data, so reads are served
   by proc_casereader_read(). */
151 proc_open (struct dataset *ds)
153 assert (ds->source != NULL);
154 assert (ds->proc_state == PROC_COMMITTED);
156 update_last_proc_invocation (ds);
158 caseinit_mark_for_init (ds->caseinit, ds->dict);
160 /* Finish up the collection of transformations. */
161 add_case_limit_trns (ds);
162 add_filter_trns (ds);
163 trns_chain_finalize (ds->cur_trns_chain);
165 /* Make permanent_dict refer to the dictionary right before
166 data reaches the sink. */
167 if (ds->permanent_dict == NULL)
168 ds->permanent_dict = ds->dict;
/* Unless output is to be discarded, create the sink that collects the
   replacement active file, sized for the compacted form of
   permanent_dict, plus a compactor when compacting would shrink the
   cases. */
171 if (!ds->discard_output)
173 ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
174 ? dict_make_compactor (ds->permanent_dict)
176 ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
177 ds->permanent_dict));
/* NOTE(review): the `else' introducing the discard-output case, and
   presumably a statement clearing DS->sink, are on lines missing from
   this listing; only the compactor reset is visible. */
181 ds->compactor = NULL;
185 /* Allocate memory for lagged cases. */
186 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
/* From here on a procedure is in progress; restart the count of
   cases output so far. */
188 ds->proc_state = PROC_OPEN;
189 ds->cases_written = 0;
/* NOTE(review): the second FIXME below is cut off in this listing
   (its closing line is missing), as is one argument line of the
   casereader_create_sequential() call. */
192 /* FIXME: use taint in dataset in place of `ok'? */
193 /* FIXME: for trivial cases we can just return a clone of
195 return casereader_create_sequential (NULL,
196 dict_get_next_value_idx (ds->dict),
198 &proc_casereader_class, ds);
201 /* Returns true if a procedure is in progress, that is, if
202 proc_open has been called but proc_commit has not. */
/* DS is the dataset to test.  Any state other than PROC_COMMITTED,
   that is PROC_OPEN or PROC_CLOSED, counts as "in progress". */
204 proc_is_open (const struct dataset *ds)
206 return ds->proc_state != PROC_COMMITTED;
209 /* "read" function for procedure casereader. */
/* READER is unused.  DS_ is the struct dataset that proc_open()
   supplied as auxiliary data.  The stages below follow the order
   source -> permanent transformations -> lag queue -> sink ->
   temporary transformations.
   NOTE(review): the remaining parameters (the output case C at
   least), the declarations of C, case_nr and tmp, the enclosing loop
   and every return statement are on lines missing from this listing,
   so the exact control flow cannot be confirmed from here. */
211 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
214 struct dataset *ds = ds_;
215 enum trns_result retval = TRNS_DROP_CASE;
217 assert (ds->proc_state == PROC_OPEN);
/* Only a dropped case or an error may lead back here; an error is
   handled specially (the handling itself is not visible). */
222 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
223 if (retval == TRNS_ERROR)
228 /* Read a case from source. */
229 if (!casereader_read (ds->source, c))
/* Give the case the width of the current dictionary and initialize
   its variables through the case initializer. */
231 case_resize (c, dict_get_next_value_idx (ds->dict));
232 caseinit_init_vars (ds->caseinit, c);
234 /* Execute permanent transformations. */
235 case_nr = ds->cases_written + 1;
236 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
238 caseinit_update_left_vars (ds->caseinit, c);
239 if (retval != TRNS_CONTINUE)
245 /* Write case to collection of lagged cases. */
/* Discard the oldest lagged cases until there is room, then store a
   clone of C at the front of the deque. */
248 while (deque_count (&ds->lag) >= ds->n_lag)
249 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
250 case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
253 /* Write case to replacement active file. */
255 if (ds->sink != NULL)
258 if (ds->compactor != NULL)
/* NOTE(review): proc_open() built the compactor and sized the sink
   from DS->permanent_dict, but the scratch case here is sized from
   DS->dict.  While temporary transformations are in effect these are
   different dictionaries -- confirm that the value counts agree. */
260 case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
261 dict_compactor_compact (ds->compactor, &tmp, c);
264 case_clone (&tmp, c);
265 casewriter_write (ds->sink, &tmp);
268 /* Execute temporary transformations. */
269 if (ds->temporary_trns_chain != NULL)
271 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
272 c, ds->cases_written);
273 if (retval != TRNS_CONTINUE)
284 /* "destroy" function for procedure casereader. */
/* READER is the procedure reader being destroyed; DS_ is the struct
   dataset that proc_open() supplied as auxiliary data. */
286 proc_casereader_destroy (struct casereader *reader, void *ds_)
288 struct dataset *ds = ds_;
291 /* Make sure transformations happen for every input case, in
292 case they have side effects, and ensure that the replacement
293 active file gets all the cases it should. */
294 while (casereader_read (reader, &c))
/* The reader is gone, so proc_commit() may now run.  An error from
   destroying the source is folded into the accumulated status. */
297 ds->proc_state = PROC_CLOSED;
298 ds->ok = casereader_destroy (ds->source) && ds->ok;
/* NOTE(review): proc_set_active_file_data() destroys DS->source
   again; presumably DS->source is set to null on a line missing from
   this listing -- confirm, otherwise this is a double destroy. */
300 proc_set_active_file_data (ds, NULL);
303 /* Must return false if the source casereader, a transformation,
304 or the sink casewriter signaled an error. (If a temporary
305 transformation signals an error, then the return value is
306 false, but the replacement active file may still be
307 untainted.) */
/* DS is the dataset whose procedure is being completed.  The reader
   returned by proc_open() must already have been destroyed, which is
   asserted through the PROC_CLOSED state.  The value returned is the
   result of proc_cancel_all_transformations() combined with the
   error status accumulated in DS->ok. */
309 proc_commit (struct dataset *ds)
311 assert (ds->proc_state == PROC_CLOSED);
312 ds->proc_state = PROC_COMMITTED;
314 /* Free memory for lagged cases. */
315 while (!deque_is_empty (&ds->lag))
316 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
317 free (ds->lag_cases);
319 /* Dictionary from before TEMPORARY becomes permanent. */
320 proc_cancel_temporary_transformations (ds);
322 if (!ds->discard_output)
324 /* Finish compacting. */
/* The cases in the sink were already compacted one by one; now the
   dictionary itself is compacted to match them. */
325 if (ds->compactor != NULL)
327 dict_compactor_destroy (ds->compactor);
328 dict_compact_values (ds->dict);
329 ds->compactor = NULL;
332 /* Old data sink becomes new data source. */
333 if (ds->sink != NULL)
334 ds->source = casewriter_make_reader (ds->sink);
/* NOTE(review): the branch structure around the next statement
   (presumably the `else' of the discard_output test) is on lines
   missing from this listing. */
339 ds->discard_output = false;
/* Tell the registered listener, if any, which source is now
   current. */
342 if ( ds->replace_source) ds->replace_source (ds->source);
344 caseinit_clear (ds->caseinit);
345 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
347 dict_clear_vectors (ds->dict);
348 ds->permanent_dict = NULL;
349 return proc_cancel_all_transformations (ds) && ds->ok;
352 /* Casereader class for procedure execution. */
353 static struct casereader_class proc_casereader_class =
/* "read" function: proc_casereader_read(). */
355 proc_casereader_read,
/* "destroy" function: proc_casereader_destroy(). */
356 proc_casereader_destroy,
/* NOTE(review): any further members and the enclosing braces are on
   lines missing from this listing. */
361 /* Updates last_proc_invocation. */
/* DS is the dataset to stamp.  The timestamp stored is the current
   calendar time as returned by time (NULL). */
363 update_last_proc_invocation (struct dataset *ds)
365 ds->last_proc_invocation = time (NULL);
368 /* Returns a pointer to the lagged case from N_BEFORE cases before the
369 current one, or NULL if there haven't been that many cases yet. */
/* DS is the dataset; N_BEFORE must lie in the range 1...DS->n_lag
   (asserted).  DS->n_lag is raised by dataset_need_lag().
   NOTE(review): the return for the "not enough cases yet" case is on
   a line missing from this listing. */
371 lagged_case (const struct dataset *ds, int n_before)
373 assert (n_before >= 1);
374 assert (n_before <= ds->n_lag);
/* Cases are pushed at the front of the deque as they are read, so
   offset N_BEFORE - 1 from the front presumably selects the case
   N_BEFORE steps back -- confirm against deque_front(). */
376 if (n_before <= deque_count (&ds->lag))
377 return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
382 /* Returns the current set of permanent transformations,
383 and clears the permanent transformations.
384 For use by INPUT PROGRAM. */
/* DS must not be inside temporary transformations (asserted).  The
   old permanent chain is saved in CHAIN, a newly created chain
   becomes both the permanent and the current chain, and the change
   callback, if set, is called with false.
   NOTE(review): the statement handing CHAIN back to the caller is
   presumably on a line missing from this listing. */
386 proc_capture_transformations (struct dataset *ds)
388 struct trns_chain *chain;
390 assert (ds->temporary_trns_chain == NULL);
391 chain = ds->permanent_trns_chain;
392 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
394 if ( ds->xform_callback)
395 ds->xform_callback (false, ds->xform_callback_aux);
400 /* Adds a transformation that processes a case with PROC and
401 frees itself with FREE to the current set of transformations.
402 The functions are passed AUX as auxiliary data. */
/* DS is the dataset, PROC the per-case function, FREE the destructor
   and AUX the argument passed to both.  The transformation is
   appended to DS->cur_trns_chain with a null finalizer, and then the
   change callback, if set, is called with true.
   NOTE(review): the parameter named FREE hides the standard free()
   inside this function. */
404 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
406 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
407 if ( ds->xform_callback)
408 ds->xform_callback (true, ds->xform_callback_aux);
411 /* Adds a transformation that processes a case with PROC and
412 frees itself with FREE to the current set of transformations.
413 When parsing of the block of transformations is complete,
414 FINALIZE will be called.
415 The functions are passed AUX as auxiliary data. */
/* Same as add_transformation() except that FINALIZE is passed to
   trns_chain_append() instead of a null finalizer.  DS is the
   dataset, PROC the per-case function, FREE the destructor and AUX
   the argument passed along to trns_chain_append(); afterward the
   change callback, if set, is called with true. */
417 add_transformation_with_finalizer (struct dataset *ds,
418 trns_finalize_func *finalize,
419 trns_proc_func *proc,
420 trns_free_func *free, void *aux)
422 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
424 if ( ds->xform_callback)
425 ds->xform_callback (true, ds->xform_callback_aux);
428 /* Returns the index of the next transformation.
429 This value can be returned by a transformation procedure
430 function to indicate a "jump" to that transformation. */
/* DS is the dataset.  The index is obtained from trns_chain_next()
   applied to DS->cur_trns_chain, the same chain that
   add_transformation() appends to. */
432 next_transformation (const struct dataset *ds)
434 return trns_chain_next (ds->cur_trns_chain);
437 /* Returns true if the next call to add_transformation() will add
438 a temporary transformation, false if it will add a permanent
439 transformation. */
/* DS is the dataset to test.  A temporary chain is created by
   proc_start_temporary_transformations() and the pointer is reset to
   null whenever temporary transformations end, so a non-null pointer
   is the whole test. */
441 proc_in_temporary_transformations (const struct dataset *ds)
443 return ds->temporary_trns_chain != NULL;
446 /* Marks the start of temporary transformations.
447 Further calls to add_transformation() will add temporary
448 transformations. */
/* DS is the dataset.  Calling this while temporary transformations
   are already in effect changes nothing. */
450 proc_start_temporary_transformations (struct dataset *ds)
452 if (!proc_in_temporary_transformations (ds))
/* Turn a pending case limit, if any, into a transformation while the
   permanent chain is still the current chain. */
454 add_case_limit_trns (ds);
/* Keep a copy of the dictionary as it stands now; DS->dict goes on
   changing under the temporary transformations. */
456 ds->permanent_dict = dict_clone (ds->dict);
/* Finalize the permanent chain and direct further transformations to
   a newly created temporary chain. */
458 trns_chain_finalize (ds->permanent_trns_chain);
459 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
461 if ( ds->xform_callback)
462 ds->xform_callback (true, ds->xform_callback_aux);
466 /* Converts all the temporary transformations, if any, to
467 permanent transformations. Further transformations will be
469 Returns true if anything changed, false otherwise. */
471 proc_make_temporary_transformations_permanent (struct dataset *ds)
473 if (proc_in_temporary_transformations (ds))
475 trns_chain_finalize (ds->temporary_trns_chain);
476 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
477 ds->temporary_trns_chain = NULL;
479 dict_destroy (ds->permanent_dict);
480 ds->permanent_dict = NULL;
488 /* Cancels all temporary transformations, if any. Further
489 transformations will be permanent.
490 Returns true if anything changed, false otherwise. */
492 proc_cancel_temporary_transformations (struct dataset *ds)
494 if (proc_in_temporary_transformations (ds))
496 dict_destroy (ds->dict);
497 ds->dict = ds->permanent_dict;
498 ds->permanent_dict = NULL;
499 if (ds->replace_dict) ds->replace_dict (ds->dict);
501 trns_chain_destroy (ds->temporary_trns_chain);
502 ds->temporary_trns_chain = NULL;
504 if ( ds->xform_callback)
505 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
506 ds->xform_callback_aux);
514 /* Cancels all transformations, if any.
515 Returns true if successful, false on I/O error. */
/* DS is the dataset; no procedure may be in progress (asserted).
   Both chains are destroyed, a newly created chain becomes the
   permanent and current chain, and the change callback, if set, is
   called with false.
   NOTE(review): the temporary chain pointer may be null here, and
   create_dataset() calls this while the permanent pointer is still
   null -- presumably trns_chain_destroy() accepts a null chain.  The
   declaration of OK and the final return are on lines missing from
   this listing. */
517 proc_cancel_all_transformations (struct dataset *ds)
520 assert (ds->proc_state == PROC_COMMITTED);
521 ok = trns_chain_destroy (ds->permanent_trns_chain);
522 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
523 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
524 ds->temporary_trns_chain = NULL;
525 if ( ds->xform_callback)
526 ds->xform_callback (false, ds->xform_callback_aux);
531 /* Initializes procedure handling. */
/* CB and AUX are stored as the dataset's transformation-change
   callback and its argument.  The new dataset gets a new dictionary
   and a new case initializer.
   NOTE(review): members not assigned here keep whatever xzalloc()
   leaves in them, presumably zero, which would make proc_state equal
   to PROC_COMMITTED as proc_cancel_all_transformations() asserts.
   The statement returning DS is on a line missing from this
   listing. */
533 create_dataset (transformation_change_callback_func *cb, void *aux)
535 struct dataset *ds = xzalloc (sizeof(*ds));
536 ds->dict = dict_create ();
537 ds->caseinit = caseinit_create ();
538 ds->xform_callback = cb;
539 ds->xform_callback_aux = aux;
/* Creates the initial, empty permanent chain; because the callback
   is already installed, it is called with false here. */
540 proc_cancel_all_transformations (ds);
/* Stores CB and AUX as DS's transformation-change callback and its
   argument.  There is a single slot, so despite the "add" in the
   name a previously stored callback is overwritten.
   NOTE(review): the line declaring the AUX parameter is missing from
   this listing. */
546 dataset_add_transform_change_callback (struct dataset *ds,
547 transformation_change_callback_func *cb,
550 ds->xform_callback = cb;
551 ds->xform_callback_aux = aux;
554 /* Finishes up procedure handling. */
/* DS is the dataset to tear down.  No procedure may be in progress,
   because proc_discard_active_file() asserts PROC_COMMITTED.
   NOTE(review): no statement releasing DS itself is visible;
   presumably it is on a line missing from this listing -- confirm
   that the structure is freed. */
556 destroy_dataset (struct dataset *ds)
558 proc_discard_active_file (ds);
559 dict_destroy (ds->dict);
560 caseinit_destroy (ds->caseinit);
561 trns_chain_destroy (ds->permanent_trns_chain);
/* Final notification: no transformations remain. */
563 if ( ds->xform_callback)
564 ds->xform_callback (false, ds->xform_callback_aux);
568 /* Causes output from the next procedure to be discarded, instead
569 of being preserved for use as input for the next procedure. */
/* DS is the dataset.  Only the flag is set here; proc_open() reads
   it when deciding whether to create a sink, and proc_commit() and
   proc_execute() set it back to false. */
571 proc_discard_output (struct dataset *ds)
573 ds->discard_output = true;
576 /* Discards the active file dictionary, data, and
577 transformations. */
/* DS is the dataset; no procedure may be in progress (asserted). */
579 proc_discard_active_file (struct dataset *ds)
581 assert (ds->proc_state == PROC_COMMITTED);
583 dict_clear (ds->dict);
584 fh_set_default_handle (NULL);
/* NOTE(review): DS->source is destroyed here; the statement that
   resets the pointer is presumably on a line missing from this
   listing -- otherwise later casereader_destroy (ds->source) calls
   would act on a stale pointer. */
588 casereader_destroy (ds->source);
/* Tell the registered listener, if any, that there is no source. */
590 if ( ds->replace_source) ds->replace_source (NULL);
592 proc_cancel_all_transformations (ds);
595 /* Sets SOURCE as the source for procedure input for the next
596 procedure. */
/* DS is the dataset, SOURCE the new case source and DICT the
   dictionary that goes with it.  DICT must not be the dictionary DS
   already holds, and no procedure may be in progress (both are
   asserted). */
598 proc_set_active_file (struct dataset *ds,
599 struct casereader *source,
600 struct dictionary *dict)
602 assert (ds->proc_state == PROC_COMMITTED);
603 assert (ds->dict != dict);
605 proc_discard_active_file (ds);
/* NOTE(review): the old dictionary is destroyed here and
   proc_set_active_file_data() below reads DS->dict, so the
   assignment of DICT to DS->dict is presumably on a line missing
   from this listing -- confirm. */
607 dict_destroy (ds->dict);
609 if ( ds->replace_dict) ds->replace_dict (dict);
611 proc_set_active_file_data (ds, source);
614 /* Replaces the active file's data by READER without replacing
615 the associated dictionary. */
/* DS is the dataset and READER the new source of cases, which may be
   null.  Returns true if READER is null or casereader_error() reports
   no error for it, false otherwise.
   NOTE(review): the statement storing READER in DS->source is
   presumably on a line missing from this listing. */
617 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
619 casereader_destroy (ds->source);
/* Tell the registered listener, if any, about the new source. */
621 if (ds->replace_source) ds->replace_source (reader);
623 caseinit_clear (ds->caseinit);
624 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
626 return reader == NULL || !casereader_error (reader);
629 /* Returns true if an active file data source is available, false
630 otherwise. */
/* DS is the dataset to test; the test is simply whether DS->source
   is non-null. */
632 proc_has_active_file (const struct dataset *ds)
634 return ds->source != NULL;
637 /* Checks whether DS has a corrupted active file. If so,
638 discards it and returns false. If not, returns true without
639 discarding anything. */
/* DS is the dataset to check.  Nothing is examined when DS has no
   source.
   NOTE(review): the return statements and the `else' separating the
   two cases below are on lines missing from this listing. */
641 dataset_end_of_command (struct dataset *ds)
643 if (ds->source != NULL)
/* A source that reports an error takes the whole active file down
   with it. */
645 if (casereader_error (ds->source))
647 proc_discard_active_file (ds);
/* Otherwise reset the successor taint recorded on the source's taint
   object.  The cast drops the const qualifier that
   casereader_get_taint() puts on its result; the assertion checks
   that the reset took effect. */
652 const struct taint *taint = casereader_get_taint (ds->source);
653 taint_reset_successor_taint ((struct taint *) taint);
654 assert (!taint_has_tainted_successor (taint));
/* Prototypes, written with the transformation function typedefs, for
   the case-limit callbacks that add_case_limit_trns() registers. */
660 static trns_proc_func case_limit_trns_proc;
661 static trns_free_func case_limit_trns_free;
663 /* Adds a transformation that limits the number of cases that may
664 pass through, if DS->DICT has a case limit. */
/* DS is the dataset whose dictionary is consulted for a case limit.
   NOTE(review): the test on CASE_LIMIT that guards the statements
   below, and the last argument line of the add_transformation()
   call, are on lines missing from this listing. */
666 add_case_limit_trns (struct dataset *ds)
668 size_t case_limit = dict_get_case_limit (ds->dict);
/* The countdown lives on the heap so that it can serve as the
   transformation's auxiliary data; case_limit_trns_free() releases
   it. */
671 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
672 *cases_remaining = case_limit;
673 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
/* The limit is now carried by the transformation, so clear it in the
   dictionary. */
675 dict_set_case_limit (ds->dict, 0);
679 /* Limits the maximum number of cases processed to
680 *CASES_REMAINING. */
/* CASES_REMAINING_ points to the size_t countdown allocated by
   add_case_limit_trns(); C and CASE_NR are unused.  While the
   countdown is positive it is decremented and TRNS_CONTINUE is
   returned; once it has reached zero, TRNS_DROP_CASE is returned. */
682 case_limit_trns_proc (void *cases_remaining_,
683 struct ccase *c UNUSED, casenumber case_nr UNUSED)
685 size_t *cases_remaining = cases_remaining_;
686 if (*cases_remaining > 0)
688 (*cases_remaining)--;
689 return TRNS_CONTINUE;
692 return TRNS_DROP_CASE;
695 /* Frees the data associated with a case limit transformation. */
/* CASES_REMAINING_ is the countdown that add_case_limit_trns()
   allocated with xmalloc(); it is released with free().
   NOTE(review): the return statement, if any, is on a line missing
   from this listing. */
697 case_limit_trns_free (void *cases_remaining_)
699 size_t *cases_remaining = cases_remaining_;
700 free (cases_remaining);
/* Prototype, written with the transformation function typedef, for
   the callback that add_filter_trns() registers. */
704 static trns_proc_func filter_trns_proc;
706 /* Adds a temporary transformation to filter data according to
707 the variable specified on FILTER, if any. */
/* DS is the dataset whose dictionary is consulted for a filter
   variable.  Nothing happens when there is none. */
709 add_filter_trns (struct dataset *ds)
711 struct variable *filter_var = dict_get_filter (ds->dict);
712 if (filter_var != NULL)
/* Switch to temporary transformations first, so that the filter
   lands on the temporary chain.  No destructor is registered: the
   auxiliary data is the variable itself, which is not owned by the
   transformation. */
714 proc_start_temporary_transformations (ds);
715 add_transformation (ds, filter_trns_proc, NULL, filter_var);
719 /* FILTER transformation. */
/* FILTER_VAR_ is the filter variable registered by
   add_filter_trns(); C is the case being tested; CASE_NR is unused.
   The case continues when its value of the filter variable is
   nonzero and not missing according to var_is_num_missing() with
   MV_ANY; otherwise it is dropped.
   NOTE(review): C is marked UNUSED although case_num() reads it
   below -- the attribute looks like a leftover. */
721 filter_trns_proc (void *filter_var_,
722 struct ccase *c UNUSED, casenumber case_nr UNUSED)
725 struct variable *filter_var = filter_var_;
726 double f = case_num (c, filter_var);
727 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
728 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Accessor taking the dataset DS.
   NOTE(review): the return type and the body are on lines missing
   from this listing; presumably this returns DS->dict -- confirm. */
733 dataset_dict (const struct dataset *ds)
/* Accessor taking the dataset DS and yielding a pointer to a const
   casereader.
   NOTE(review): the body is on lines missing from this listing;
   presumably this returns DS->source -- confirm. */
738 const struct casereader *
739 dataset_source (const struct dataset *ds)
/* Records that cases up to N_BEFORE positions back may be requested
   from DS.  DS->n_lag only ever grows here: it becomes the larger of
   its old value and N_BEFORE.  proc_open() sizes the lag queue from
   DS->n_lag and lagged_case() asserts against it. */
745 dataset_need_lag (struct dataset *ds, int n_before)
747 ds->n_lag = MAX (ds->n_lag, n_before);