/* PSPP - a program for statistical analysis.
   Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
24 #include <data/case.h>
25 #include <data/caseinit.h>
26 #include <data/casereader.h>
27 #include <data/casereader-provider.h>
28 #include <data/casewriter.h>
29 #include <data/dictionary.h>
30 #include <data/file-handle-def.h>
31 #include <data/procedure.h>
32 #include <data/transformations.h>
33 #include <data/variable.h>
34 #include <libpspp/alloc.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
41 /* Cases are read from source,
42 their transformation variables are initialized,
43 pass through permanent_trns_chain (which transforms them into
44 the format described by permanent_dict),
46 pass through temporary_trns_chain (which transforms them into
47 the format described by dict),
48 and are finally passed to the procedure. */
49 struct casereader *source;
50 struct caseinit *caseinit;
51 struct trns_chain *permanent_trns_chain;
52 struct dictionary *permanent_dict;
53 struct casewriter *sink;
54 struct trns_chain *temporary_trns_chain;
55 struct dictionary *dict;
57 /* Callback which occurs when a procedure provides a new source for
59 replace_source_callback *replace_source ;
61 /* Callback which occurs whenever the DICT is replaced by a new one */
62 replace_dictionary_callback *replace_dict;
64 /* If true, cases are discarded instead of being written to
68 /* The transformation chain that the next transformation will be
70 struct trns_chain *cur_trns_chain;
72 /* The compactor used to compact a case, if necessary;
73 otherwise a null pointer. */
74 struct dict_compactor *compactor;
76 /* Time at which proc was last invoked. */
77 time_t last_proc_invocation;
79 /* Cases just before ("lagging") the current one. */
80 int n_lag; /* Number of cases to lag. */
81 struct deque lag; /* Deque of lagged cases. */
82 struct ccase *lag_cases; /* Lagged cases managed by deque. */
87 PROC_COMMITTED, /* No procedure in progress. */
88 PROC_OPEN, /* proc_open called, casereader still open. */
89 PROC_CLOSED /* casereader from proc_open destroyed,
90 but proc_commit not yet called. */
93 casenumber cases_written; /* Cases output so far. */
94 bool ok; /* Error status. */
95 }; /* struct dataset */
/* File-local helpers that are defined further down. */
static void add_case_limit_trns (struct dataset *ds);
static void add_filter_trns (struct dataset *ds);

static void update_last_proc_invocation (struct dataset *ds);
103 /* Public functions. */
105 /* Returns the last time the data was read. */
107 time_of_last_procedure (struct dataset *ds)
109 if (ds->last_proc_invocation == 0)
110 update_last_proc_invocation (ds);
111 return ds->last_proc_invocation;
114 /* Regular procedure. */
116 /* Executes any pending transformations, if necessary.
117 This is not identical to the EXECUTE command in that it won't
118 always read the source data. This can be important when the
119 source data is given inline within BEGIN DATA...END FILE. */
121 proc_execute (struct dataset *ds)
125 if ((ds->temporary_trns_chain == NULL
126 || trns_chain_is_empty (ds->temporary_trns_chain))
127 && trns_chain_is_empty (ds->permanent_trns_chain))
130 ds->discard_output = false;
131 dict_set_case_limit (ds->dict, 0);
132 dict_clear_vectors (ds->dict);
136 ok = casereader_destroy (proc_open (ds));
137 return proc_commit (ds) && ok;
140 static struct casereader_class proc_casereader_class;
142 /* Opens dataset DS for reading cases with proc_read.
143 proc_commit must be called when done. */
145 proc_open (struct dataset *ds)
147 assert (ds->source != NULL);
148 assert (ds->proc_state == PROC_COMMITTED);
150 update_last_proc_invocation (ds);
152 caseinit_mark_for_init (ds->caseinit, ds->dict);
154 /* Finish up the collection of transformations. */
155 add_case_limit_trns (ds);
156 add_filter_trns (ds);
157 trns_chain_finalize (ds->cur_trns_chain);
159 /* Make permanent_dict refer to the dictionary right before
160 data reaches the sink. */
161 if (ds->permanent_dict == NULL)
162 ds->permanent_dict = ds->dict;
165 if (!ds->discard_output)
167 ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
168 ? dict_make_compactor (ds->permanent_dict)
170 ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
171 ds->permanent_dict));
175 ds->compactor = NULL;
179 /* Allocate memory for lagged cases. */
180 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
182 ds->proc_state = PROC_OPEN;
183 ds->cases_written = 0;
186 /* FIXME: use taint in dataset in place of `ok'? */
187 /* FIXME: for trivial cases we can just return a clone of
189 return casereader_create_sequential (NULL,
190 dict_get_next_value_idx (ds->dict),
192 &proc_casereader_class, ds);
195 /* Returns true if a procedure is in progress, that is, if
196 proc_open has been called but proc_commit has not. */
198 proc_is_open (const struct dataset *ds)
200 return ds->proc_state != PROC_COMMITTED;
203 /* "read" function for procedure casereader. */
205 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
208 struct dataset *ds = ds_;
209 enum trns_result retval = TRNS_DROP_CASE;
211 assert (ds->proc_state == PROC_OPEN);
216 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
217 if (retval == TRNS_ERROR)
222 /* Read a case from source. */
223 if (!casereader_read (ds->source, c))
225 case_resize (c, dict_get_next_value_idx (ds->dict));
226 caseinit_init_vars (ds->caseinit, c);
228 /* Execute permanent transformations. */
229 case_nr = ds->cases_written + 1;
230 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
232 caseinit_update_left_vars (ds->caseinit, c);
233 if (retval != TRNS_CONTINUE)
239 /* Write case to collection of lagged cases. */
242 while (deque_count (&ds->lag) >= ds->n_lag)
243 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
244 case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
247 /* Write case to replacement active file. */
249 if (ds->sink != NULL)
252 if (ds->compactor != NULL)
254 case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
255 dict_compactor_compact (ds->compactor, &tmp, c);
258 case_clone (&tmp, c);
259 casewriter_write (ds->sink, &tmp);
262 /* Execute temporary transformations. */
263 if (ds->temporary_trns_chain != NULL)
265 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
266 c, ds->cases_written);
267 if (retval != TRNS_CONTINUE)
278 /* "destroy" function for procedure casereader. */
280 proc_casereader_destroy (struct casereader *reader, void *ds_)
282 struct dataset *ds = ds_;
285 /* Make sure transformations happen for every input case, in
286 case they have side effects, and ensure that the replacement
287 active file gets all the cases it should. */
288 while (casereader_read (reader, &c))
291 ds->proc_state = PROC_CLOSED;
292 ds->ok = casereader_destroy (ds->source) && ds->ok;
294 proc_set_active_file_data (ds, NULL);
297 /* Must return false if the source casereader, a transformation,
298 or the sink casewriter signaled an error. (If a temporary
299 transformation signals an error, then the return value is
300 false, but the replacement active file may still be
303 proc_commit (struct dataset *ds)
305 assert (ds->proc_state == PROC_CLOSED);
306 ds->proc_state = PROC_COMMITTED;
308 /* Free memory for lagged cases. */
309 while (!deque_is_empty (&ds->lag))
310 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
311 free (ds->lag_cases);
313 /* Dictionary from before TEMPORARY becomes permanent. */
314 proc_cancel_temporary_transformations (ds);
316 if (!ds->discard_output)
318 /* Finish compacting. */
319 if (ds->compactor != NULL)
321 dict_compactor_destroy (ds->compactor);
322 dict_compact_values (ds->dict);
323 ds->compactor = NULL;
326 /* Old data sink becomes new data source. */
327 if (ds->sink != NULL)
328 ds->source = casewriter_make_reader (ds->sink);
333 ds->discard_output = false;
336 if ( ds->replace_source) ds->replace_source (ds->source);
338 caseinit_clear (ds->caseinit);
339 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
341 dict_clear_vectors (ds->dict);
342 ds->permanent_dict = NULL;
343 return proc_cancel_all_transformations (ds) && ds->ok;
346 /* Casereader class for procedure execution. */
347 static struct casereader_class proc_casereader_class =
349 proc_casereader_read,
350 proc_casereader_destroy,
355 /* Updates last_proc_invocation. */
357 update_last_proc_invocation (struct dataset *ds)
359 ds->last_proc_invocation = time (NULL);
362 /* Returns a pointer to the lagged case from N_BEFORE cases before the
363 current one, or NULL if there haven't been that many cases yet. */
365 lagged_case (const struct dataset *ds, int n_before)
367 assert (n_before >= 1);
368 assert (n_before <= ds->n_lag);
370 if (n_before <= deque_count (&ds->lag))
371 return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
376 /* Returns the current set of permanent transformations,
377 and clears the permanent transformations.
378 For use by INPUT PROGRAM. */
380 proc_capture_transformations (struct dataset *ds)
382 struct trns_chain *chain;
384 assert (ds->temporary_trns_chain == NULL);
385 chain = ds->permanent_trns_chain;
386 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
390 /* Adds a transformation that processes a case with PROC and
391 frees itself with FREE to the current set of transformations.
392 The functions are passed AUX as auxiliary data. */
394 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
396 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
399 /* Adds a transformation that processes a case with PROC and
400 frees itself with FREE to the current set of transformations.
401 When parsing of the block of transformations is complete,
402 FINALIZE will be called.
403 The functions are passed AUX as auxiliary data. */
405 add_transformation_with_finalizer (struct dataset *ds,
406 trns_finalize_func *finalize,
407 trns_proc_func *proc,
408 trns_free_func *free, void *aux)
410 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
413 /* Returns the index of the next transformation.
414 This value can be returned by a transformation procedure
415 function to indicate a "jump" to that transformation. */
417 next_transformation (const struct dataset *ds)
419 return trns_chain_next (ds->cur_trns_chain);
422 /* Returns true if the next call to add_transformation() will add
423 a temporary transformation, false if it will add a permanent
426 proc_in_temporary_transformations (const struct dataset *ds)
428 return ds->temporary_trns_chain != NULL;
431 /* Marks the start of temporary transformations.
432 Further calls to add_transformation() will add temporary
435 proc_start_temporary_transformations (struct dataset *ds)
437 if (!proc_in_temporary_transformations (ds))
439 add_case_limit_trns (ds);
441 ds->permanent_dict = dict_clone (ds->dict);
443 trns_chain_finalize (ds->permanent_trns_chain);
444 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
448 /* Converts all the temporary transformations, if any, to
449 permanent transformations. Further transformations will be
451 Returns true if anything changed, false otherwise. */
453 proc_make_temporary_transformations_permanent (struct dataset *ds)
455 if (proc_in_temporary_transformations (ds))
457 trns_chain_finalize (ds->temporary_trns_chain);
458 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
459 ds->temporary_trns_chain = NULL;
461 dict_destroy (ds->permanent_dict);
462 ds->permanent_dict = NULL;
470 /* Cancels all temporary transformations, if any. Further
471 transformations will be permanent.
472 Returns true if anything changed, false otherwise. */
474 proc_cancel_temporary_transformations (struct dataset *ds)
476 if (proc_in_temporary_transformations (ds))
478 dict_destroy (ds->dict);
479 ds->dict = ds->permanent_dict;
480 ds->permanent_dict = NULL;
481 if (ds->replace_dict) ds->replace_dict (ds->dict);
483 trns_chain_destroy (ds->temporary_trns_chain);
484 ds->temporary_trns_chain = NULL;
492 /* Cancels all transformations, if any.
493 Returns true if successful, false on I/O error. */
495 proc_cancel_all_transformations (struct dataset *ds)
498 assert (ds->proc_state == PROC_COMMITTED);
499 ok = trns_chain_destroy (ds->permanent_trns_chain);
500 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
501 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
502 ds->temporary_trns_chain = NULL;
506 /* Initializes procedure handling. */
508 create_dataset (replace_source_callback *rps,
509 replace_dictionary_callback *rds)
511 struct dataset *ds = xzalloc (sizeof(*ds));
512 ds->dict = dict_create ();
513 ds->caseinit = caseinit_create ();
514 ds->replace_source = rps;
515 ds->replace_dict = rds;
516 proc_cancel_all_transformations (ds);
520 /* Finishes up procedure handling. */
522 destroy_dataset (struct dataset *ds)
524 proc_discard_active_file (ds);
525 dict_destroy (ds->dict);
526 caseinit_destroy (ds->caseinit);
527 trns_chain_destroy (ds->permanent_trns_chain);
531 /* Causes output from the next procedure to be discarded, instead
532 of being preserved for use as input for the next procedure. */
534 proc_discard_output (struct dataset *ds)
536 ds->discard_output = true;
539 /* Discards the active file dictionary, data, and
542 proc_discard_active_file (struct dataset *ds)
544 assert (ds->proc_state == PROC_COMMITTED);
546 dict_clear (ds->dict);
547 fh_set_default_handle (NULL);
551 casereader_destroy (ds->source);
553 if ( ds->replace_source) ds->replace_source (NULL);
555 proc_cancel_all_transformations (ds);
558 /* Sets SOURCE as the source for procedure input for the next
561 proc_set_active_file (struct dataset *ds,
562 struct casereader *source,
563 struct dictionary *dict)
565 assert (ds->proc_state == PROC_COMMITTED);
566 assert (ds->dict != dict);
568 proc_discard_active_file (ds);
570 dict_destroy (ds->dict);
572 if ( ds->replace_dict) ds->replace_dict (dict);
574 proc_set_active_file_data (ds, source);
577 /* Replaces the active file's data by READER without replacing
578 the associated dictionary. */
580 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
582 casereader_destroy (ds->source);
584 if (ds->replace_source) ds->replace_source (reader);
586 caseinit_clear (ds->caseinit);
587 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
589 return reader == NULL || !casereader_error (reader);
592 /* Returns true if an active file data source is available, false
595 proc_has_active_file (const struct dataset *ds)
597 return ds->source != NULL;
600 /* Checks whether DS has a corrupted active file. If so,
601 discards it and returns false. If not, returns true without
604 dataset_end_of_command (struct dataset *ds)
606 if (ds->source != NULL)
608 if (casereader_error (ds->source))
610 proc_discard_active_file (ds);
615 const struct taint *taint = casereader_get_taint (ds->source);
616 taint_reset_successor_taint ((struct taint *) taint);
617 assert (!taint_has_tainted_successor (taint));
623 static trns_proc_func case_limit_trns_proc;
624 static trns_free_func case_limit_trns_free;
626 /* Adds a transformation that limits the number of cases that may
627 pass through, if DS->DICT has a case limit. */
629 add_case_limit_trns (struct dataset *ds)
631 size_t case_limit = dict_get_case_limit (ds->dict);
634 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
635 *cases_remaining = case_limit;
636 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
638 dict_set_case_limit (ds->dict, 0);
642 /* Limits the maximum number of cases processed to
645 case_limit_trns_proc (void *cases_remaining_,
646 struct ccase *c UNUSED, casenumber case_nr UNUSED)
648 size_t *cases_remaining = cases_remaining_;
649 if (*cases_remaining > 0)
651 (*cases_remaining)--;
652 return TRNS_CONTINUE;
655 return TRNS_DROP_CASE;
/* Frees the data associated with a case limit transformation.
   Always returns true. */
static bool
case_limit_trns_free (void *cases_remaining_)
{
  size_t *cases_remaining = cases_remaining_;
  free (cases_remaining);
  return true;
}
667 static trns_proc_func filter_trns_proc;
669 /* Adds a temporary transformation to filter data according to
670 the variable specified on FILTER, if any. */
672 add_filter_trns (struct dataset *ds)
674 struct variable *filter_var = dict_get_filter (ds->dict);
675 if (filter_var != NULL)
677 proc_start_temporary_transformations (ds);
678 add_transformation (ds, filter_trns_proc, NULL, filter_var);
682 /* FILTER transformation. */
684 filter_trns_proc (void *filter_var_,
685 struct ccase *c UNUSED, casenumber case_nr UNUSED)
688 struct variable *filter_var = filter_var_;
689 double f = case_num (c, filter_var);
690 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
691 ? TRNS_CONTINUE : TRNS_DROP_CASE);
696 dataset_dict (const struct dataset *ds)
701 const struct casereader *
702 dataset_source (const struct dataset *ds)
708 dataset_need_lag (struct dataset *ds, int n_before)
710 ds->n_lag = MAX (ds->n_lag, n_before);