1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
24 #include <data/case.h>
25 #include <data/caseinit.h>
26 #include <data/casereader.h>
27 #include <data/casereader-provider.h>
28 #include <data/casewriter.h>
29 #include <data/dictionary.h>
30 #include <data/file-handle-def.h>
31 #include <data/procedure.h>
32 #include <data/transformations.h>
33 #include <data/variable.h>
34 #include <libpspp/alloc.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
/* A dataset: the active file's data source and dictionaries, the
   transformation chains applied when a procedure reads it, the
   callbacks notified of changes, the lag buffer and the bookkeeping of
   the procedure in progress.

   NOTE(review): this definition looks incomplete in this copy and
   should be checked against upstream before building:
   - no opening "struct dataset {" line is visible, only the closing
     line at the end;
   - the PROC_* constants have no visible enum header or member name,
     although other functions use ds->proc_state;
   - ds->discard_output is used elsewhere but no such member is
     declared here;
   - several field comments below are missing their closing
     delimiter, so the declarations after them (replace_source,
     xform_callback, xform_callback_aux, cur_trns_chain) are swallowed
     as comment text. */
42 /* Cases are read from source,
43 their transformation variables are initialized,
44 pass through permanent_trns_chain (which transforms them into
45 the format described by permanent_dict),
47 pass through temporary_trns_chain (which transforms them into
48 the format described by dict),
49 and are finally passed to the procedure. */
50 struct casereader *source;
51 struct caseinit *caseinit;
52 struct trns_chain *permanent_trns_chain;
53 struct dictionary *permanent_dict;
54 struct casewriter *sink;
55 struct trns_chain *temporary_trns_chain;
56 struct dictionary *dict;
58 /* Callback which occurs when a procedure provides a new source for
60 replace_source_callback *replace_source ;
62 /* Callback which occurs whenever the DICT is replaced by a new one */
63 replace_dictionary_callback *replace_dict;
65 /* Callback which occurs whenever the transformation chain(s) have
67 transformation_change_callback_func *xform_callback;
68 void *xform_callback_aux;
70 /* If true, cases are discarded instead of being written to
74 /* The transformation chain that the next transformation will be
76 struct trns_chain *cur_trns_chain;
78 /* The compactor used to compact a case, if necessary;
79 otherwise a null pointer. */
80 struct dict_compactor *compactor;
82 /* Time at which proc was last invoked. */
83 time_t last_proc_invocation;
85 /* Cases just before ("lagging") the current one. */
86 int n_lag; /* Number of cases to lag. */
87 struct deque lag; /* Deque of lagged cases. */
88 struct ccase *lag_cases; /* Lagged cases managed by deque. */
93 PROC_COMMITTED, /* No procedure in progress. */
94 PROC_OPEN, /* proc_open called, casereader still open. */
95 PROC_CLOSED /* casereader from proc_open destroyed,
96 but proc_commit not yet called. */
99 casenumber cases_written; /* Cases output so far. */
100 bool ok; /* Error status. */
101 }; /* struct dataset */
/* Forward declarations of file-local helpers that are defined later
   in this file. */
104 static void add_case_limit_trns (struct dataset *ds);
105 static void add_filter_trns (struct dataset *ds);
107 static void update_last_proc_invocation (struct dataset *ds);
109 /* Public functions. */
111 /* Returns the last time the data was read, that is, the time at
   which a procedure was last opened.  If no procedure has been opened
   yet (last_proc_invocation is still 0), the current time is recorded
   first and that value is returned.
   NOTE(review): the return-type line and braces of this definition
   are not visible in this copy; the value returned is
   DS->last_proc_invocation, which is declared as a time_t. */
113 time_of_last_procedure (struct dataset *ds)
115 if (ds->last_proc_invocation == 0)
116 update_last_proc_invocation (ds);
117 return ds->last_proc_invocation;
120 /* Regular procedure. */
122 /* Executes any pending transformations, if necessary.
123 This is not identical to the EXECUTE command in that it won't
124 always read the source data. This can be important when the
125 source data is given inline within BEGIN DATA...END DATA.

   When neither a temporary nor a permanent transformation is
   pending, the source is not read; the visible statements of that
   path only clear discard_output, the dictionary's case limit and its
   vectors.  Otherwise a complete procedure is run: proc_open, then
   destroying the returned reader (which drains it through all
   transformations), then proc_commit.  In that case the result is
   false if either destroying the reader or proc_commit failed.
   NOTE(review): the return type, braces, the declaration of ok and
   the end of the no-transformation path are not visible in this
   copy. */
127 proc_execute (struct dataset *ds)
131 if ((ds->temporary_trns_chain == NULL
132 || trns_chain_is_empty (ds->temporary_trns_chain))
133 && trns_chain_is_empty (ds->permanent_trns_chain))
136 ds->discard_output = false;
137 dict_set_case_limit (ds->dict, 0);
138 dict_clear_vectors (ds->dict);
142 ok = casereader_destroy (proc_open (ds));
143 return proc_commit (ds) && ok;
/* Forward declaration: proc_open needs this class, which is defined
   (with its read and destroy callbacks) after proc_commit. */
146 static struct casereader_class proc_casereader_class;
148 /* Opens dataset DS for a procedure and returns a casereader from
   which the procedure reads its cases.  Cases come from DS->source and
   pass through the permanent transformations and then through any
   temporary transformations.  DS must have a source and no procedure
   may be in progress (both asserted).  When done, destroy the returned
   reader and then call proc_commit.
   NOTE(review): the return-type line and braces of this function, and
   the else branch of the discard_output test, are only partly visible
   in this copy. */
151 proc_open (struct dataset *ds)
153 assert (ds->source != NULL);
154 assert (ds->proc_state == PROC_COMMITTED);
156 update_last_proc_invocation (ds);
158 caseinit_mark_for_init (ds->caseinit, ds->dict);
160 /* Finish up the collection of transformations. */
161 add_case_limit_trns (ds);
162 add_filter_trns (ds);
163 trns_chain_finalize (ds->cur_trns_chain);
165 /* Make permanent_dict refer to the dictionary right before
166 data reaches the sink. */
167 if (ds->permanent_dict == NULL)
168 ds->permanent_dict = ds->dict;
/* Unless output is being discarded, create the sink that will hold the
   next active file.  Its width is the count that dict_count_values
   returns for the DC_SCRATCH class mask (presumably the width without
   scratch variables -- confirm).  A compactor is made only when that
   width is smaller than the full case width. */
171 if (!ds->discard_output)
173 struct dictionary *pd = ds->permanent_dict;
174 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
175 bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
176 ds->compactor = (should_compact
177 ? dict_make_compactor (pd, 1u << DC_SCRATCH)
179 ds->sink = autopaging_writer_create (compacted_value_cnt);
/* NOTE(review): only the statement below is visible for the
   discard_output case.  proc_casereader_read tests ds->sink, so
   confirm that ds->sink is also cleared on this path. */
183 ds->compactor = NULL;
187 /* Allocate memory for lagged cases. */
188 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
190 ds->proc_state = PROC_OPEN;
191 ds->cases_written = 0;
/* NOTE(review): the second FIXME comment below is not closed in this
   copy.  As a result the return statement after it, and the doc
   comment of proc_is_open, are swallowed as comment text. */
194 /* FIXME: use taint in dataset in place of `ok'? */
195 /* FIXME: for trivial cases we can just return a clone of
197 return casereader_create_sequential (NULL,
198 dict_get_next_value_idx (ds->dict),
200 &proc_casereader_class, ds);
203 /* Returns true if a procedure is in progress, that is, if
204 proc_open has been called but proc_commit has not, so proc_state
   is either PROC_OPEN or PROC_CLOSED. */
206 proc_is_open (const struct dataset *ds)
208 return ds->proc_state != PROC_COMMITTED;
211 /* "read" function for procedure casereader.  Produces the next case
   for the procedure:
   - reads a case from DS->source;
   - resizes it to the width of DS->dict and initializes its
     transformation variables;
   - runs the permanent chain on it;
   - if the case passes, stores a copy in the lag buffer and writes a
     copy (compacted, if a compactor exists) to DS->sink when there is
     one;
   - finally runs the temporary chain, if any.
   NOTE(review): in this copy the return type, the parameter line that
   declares C, the enclosing loop, the declaration of case_nr, and the
   statements taken after an error or a dropped case are not visible. */
213 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
216 struct dataset *ds = ds_;
217 enum trns_result retval = TRNS_DROP_CASE;
219 assert (ds->proc_state == PROC_OPEN);
224 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
225 if (retval == TRNS_ERROR)
230 /* Read a case from source. */
231 if (!casereader_read (ds->source, c))
233 case_resize (c, dict_get_next_value_idx (ds->dict));
234 caseinit_init_vars (ds->caseinit, c);
236 /* Execute permanent transformations. */
237 case_nr = ds->cases_written + 1;
238 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
240 caseinit_update_left_vars (ds->caseinit, c);
241 if (retval != TRNS_CONTINUE)
247 /* Write case to collection of lagged cases. */
/* NOTE(review): if ds->n_lag is 0, this loop's condition holds on an
   empty deque and it would pop from it.  Presumably the loop is
   guarded by an ds->n_lag > 0 test on a line not visible in this
   copy -- confirm. */
250 while (deque_count (&ds->lag) >= ds->n_lag)
251 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
252 case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
255 /* Write case to replacement active file. */
257 if (ds->sink != NULL)
260 if (ds->compactor != NULL)
262 case_create (&tmp, casewriter_get_value_cnt (ds->sink));
263 dict_compactor_compact (ds->compactor, &tmp, c);
266 case_clone (&tmp, c);
267 casewriter_write (ds->sink, &tmp);
270 /* Execute temporary transformations. */
/* NOTE(review): the permanent chain above is given
   ds->cases_written + 1 as the case number, while the temporary chain
   below is given ds->cases_written.  The two agree only if
   cases_written is incremented in between, and no such statement is
   visible in this copy -- confirm. */
271 if (ds->temporary_trns_chain != NULL)
273 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
274 c, ds->cases_written);
275 if (retval != TRNS_CONTINUE)
286 /* "destroy" function for procedure casereader.  It does four things:
   - reads every remaining case from READER, so that all
     transformations and the sink see the whole input;
   - marks the procedure PROC_CLOSED;
   - destroys DS->source, folding its status into DS->ok;
   - clears the active file data.
   NOTE(review): proc_set_active_file_data destroys DS->source again.
   Unless DS->source is reset to NULL on a line not visible in this
   copy, the source is destroyed twice -- confirm.  The declaration of
   C and the loop body are also not visible. */
288 proc_casereader_destroy (struct casereader *reader, void *ds_)
290 struct dataset *ds = ds_;
293 /* Make sure transformations happen for every input case, in
294 case they have side effects, and ensure that the replacement
295 active file gets all the cases it should. */
296 while (casereader_read (reader, &c))
299 ds->proc_state = PROC_CLOSED;
300 ds->ok = casereader_destroy (ds->source) && ds->ok;
302 proc_set_active_file_data (ds, NULL);
/* Completes the procedure started by proc_open.  The reader proc_open
   returned must already have been destroyed (state PROC_CLOSED,
   asserted).  Steps:
   - frees the lag buffer;
   - cancels temporary transformations, which restores the
     pre-TEMPORARY dictionary;
   - unless output was discarded, finishes compaction and deletes
     scratch variables;
   - makes the sink's reader the new data source when there is a sink;
   - resets per-procedure state and cancels all transformations.
   NOTE(review): the doc comment below is missing its closing delimiter
   in this copy, so the function's signature and first statements are
   swallowed as comment text. */
305 /* Must return false if the source casereader, a transformation,
306 or the sink casewriter signaled an error. (If a temporary
307 transformation signals an error, then the return value is
308 false, but the replacement active file may still be
311 proc_commit (struct dataset *ds)
313 assert (ds->proc_state == PROC_CLOSED);
314 ds->proc_state = PROC_COMMITTED;
316 /* Free memory for lagged cases. */
317 while (!deque_is_empty (&ds->lag))
318 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
319 free (ds->lag_cases);
321 /* Dictionary from before TEMPORARY becomes permanent. */
322 proc_cancel_temporary_transformations (ds);
324 if (!ds->discard_output)
326 /* Finish compacting. */
327 if (ds->compactor != NULL)
329 dict_compactor_destroy (ds->compactor);
330 ds->compactor = NULL;
332 dict_delete_scratch_vars (ds->dict);
333 dict_compact_values (ds->dict);
336 /* Old data sink becomes new data source. */
337 if (ds->sink != NULL)
338 ds->source = casewriter_make_reader (ds->sink);
343 ds->discard_output = false;
346 if ( ds->replace_source) ds->replace_source (ds->source);
348 caseinit_clear (ds->caseinit);
349 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
351 dict_clear_vectors (ds->dict);
352 ds->permanent_dict = NULL;
353 return proc_cancel_all_transformations (ds) && ds->ok;
356 /* Casereader class for procedure execution, used by the reader that
   proc_open returns; DS is passed to the callbacks as auxiliary data.
   NOTE(review): only the read and destroy members of the initializer
   are visible in this copy; its braces and any further members are
   not shown. */
357 static struct casereader_class proc_casereader_class =
359 proc_casereader_read,
360 proc_casereader_destroy,
365 /* Updates last_proc_invocation: sets DS->last_proc_invocation to
   the current time, as returned by time (NULL). */
367 update_last_proc_invocation (struct dataset *ds)
369 ds->last_proc_invocation = time (NULL);
372 /* Returns a pointer to the lagged case from N_BEFORE cases before the
373 current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be at least 1 and no greater than the lag depth
   requested through dataset_need_lag (DS->n_lag); both limits are
   asserted.  The most recently stored case is at the front of DS->lag,
   so N_BEFORE == 1 selects front index 0.
   NOTE(review): the NULL return is not visible in this copy. */
375 lagged_case (const struct dataset *ds, int n_before)
377 assert (n_before >= 1);
378 assert (n_before <= ds->n_lag);
380 if (n_before <= deque_count (&ds->lag))
381 return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
386 /* Returns the current set of permanent transformations,
387 and clears the permanent transformations.
388 For use by INPUT PROGRAM.
   A new, empty chain replaces the old one as both the permanent and
   the current chain.  DS keeps no pointer to the returned chain, so
   the caller presumably takes ownership of it.  Must not be called
   while temporary transformations are active (asserted).  The
   transformation-change callback, if any, is told that no
   transformations are pending.
   NOTE(review): the return statement is not visible in this copy. */
390 proc_capture_transformations (struct dataset *ds)
392 struct trns_chain *chain;
394 assert (ds->temporary_trns_chain == NULL);
395 chain = ds->permanent_trns_chain;
396 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
398 if ( ds->xform_callback)
399 ds->xform_callback (false, ds->xform_callback_aux);
404 /* Adds a transformation that processes a case with PROC and
405 frees itself with FREE to the current set of transformations.
406 The functions are passed AUX as auxiliary data.
   The transformation gets no finalizer (NULL is passed for it).
   Afterward the transformation-change callback, if any, is told that
   transformations are pending.
   NOTE(review): the parameter named free shadows the C library free()
   inside this function. */
408 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
410 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
411 if ( ds->xform_callback)
412 ds->xform_callback (true, ds->xform_callback_aux);
415 /* Adds a transformation that processes a case with PROC and
416 frees itself with FREE to the current set of transformations.
417 When parsing of the block of transformations is complete,
418 FINALIZE will be called.
419 The functions are passed AUX as auxiliary data.
   In this file the chains are finalized by trns_chain_finalize in
   proc_open, proc_start_temporary_transformations and
   proc_make_temporary_transformations_permanent.  Afterward the
   transformation-change callback, if any, is told that
   transformations are pending.
   NOTE(review): the parameter named free shadows the C library free()
   inside this function. */
421 add_transformation_with_finalizer (struct dataset *ds,
422 trns_finalize_func *finalize,
423 trns_proc_func *proc,
424 trns_free_func *free, void *aux)
426 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
428 if ( ds->xform_callback)
429 ds->xform_callback (true, ds->xform_callback_aux);
432 /* Returns the index of the next transformation.
433 This value can be returned by a transformation procedure
434 function to indicate a "jump" to that transformation.
   The index refers to DS->cur_trns_chain, the chain to which
   add_transformation appends. */
436 next_transformation (const struct dataset *ds)
438 return trns_chain_next (ds->cur_trns_chain);
/* NOTE(review): in this copy the doc comments of
   proc_in_temporary_transformations and
   proc_start_temporary_transformations are missing their closing
   delimiters.  Both functions, and the start of the doc comment of
   proc_make_temporary_transformations_permanent, are therefore
   swallowed as comment text up to its "Returns true if anything
   changed" line.  Confirm against upstream.

   As far as visible:
   - proc_in_temporary_transformations is true exactly when
     DS->temporary_trns_chain is non-null.
   - proc_start_temporary_transformations does nothing if temporary
     transformations are already active.  Otherwise it adds the
     case-limit transformation to the current (permanent) chain, saves
     a copy of DS->dict in DS->permanent_dict, finalizes the permanent
     chain, starts a new temporary chain that also becomes the current
     chain, and notifies the transformation-change callback. */
441 /* Returns true if the next call to add_transformation() will add
442 a temporary transformation, false if it will add a permanent
445 proc_in_temporary_transformations (const struct dataset *ds)
447 return ds->temporary_trns_chain != NULL;
450 /* Marks the start of temporary transformations.
451 Further calls to add_transformation() will add temporary
454 proc_start_temporary_transformations (struct dataset *ds)
456 if (!proc_in_temporary_transformations (ds))
458 add_case_limit_trns (ds);
460 ds->permanent_dict = dict_clone (ds->dict);
462 trns_chain_finalize (ds->permanent_trns_chain);
463 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
465 if ( ds->xform_callback)
466 ds->xform_callback (true, ds->xform_callback_aux);
470 /* Converts all the temporary transformations, if any, to
471 permanent transformations. Further transformations will be
473 Returns true if anything changed, false otherwise. */
475 proc_make_temporary_transformations_permanent (struct dataset *ds)
477 if (proc_in_temporary_transformations (ds))
479 trns_chain_finalize (ds->temporary_trns_chain);
480 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
481 ds->temporary_trns_chain = NULL;
/* The dictionary copy saved by proc_start_temporary_transformations is
   no longer needed once the temporary transformations are permanent.
   NOTE(review): no reassignment of ds->cur_trns_chain is visible here,
   so it may still point at the temporary chain that was just spliced
   into the permanent one -- confirm. */
483 dict_destroy (ds->permanent_dict);
484 ds->permanent_dict = NULL;
492 /* Cancels all temporary transformations, if any. Further
493 transformations will be permanent.
494 Returns true if anything changed, false otherwise.
   The temporary dictionary DS->dict is destroyed and replaced by the
   copy saved when TEMPORARY started (DS->permanent_dict).  The
   replace_dict callback, if any, is notified, and the
   transformation-change callback is told whether any permanent
   transformations remain.
   NOTE(review): no reassignment of DS->cur_trns_chain is visible after
   the temporary chain is destroyed.  Confirm it is reset to the
   permanent chain; otherwise "Further transformations will be
   permanent" would not hold. */
496 proc_cancel_temporary_transformations (struct dataset *ds)
498 if (proc_in_temporary_transformations (ds))
500 dict_destroy (ds->dict);
501 ds->dict = ds->permanent_dict;
502 ds->permanent_dict = NULL;
503 if (ds->replace_dict) ds->replace_dict (ds->dict);
505 trns_chain_destroy (ds->temporary_trns_chain);
506 ds->temporary_trns_chain = NULL;
508 if ( ds->xform_callback)
509 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
510 ds->xform_callback_aux);
518 /* Cancels all transformations, if any.
519 Returns true if successful, false on I/O error.
   Destroys both chains and starts a new, empty permanent chain that is
   also the current chain.  Only valid when no procedure is in progress
   (asserted).  create_dataset calls this on a freshly allocated
   (presumably zero-filled) dataset, so this relies on
   trns_chain_destroy accepting a null chain.
   NOTE(review): if temporary transformations are active,
   DS->permanent_dict (the copy made by
   proc_start_temporary_transformations) is neither restored nor freed
   here, and DS->dict stays the temporary dictionary.  Confirm that
   callers cancel temporary transformations first. */
521 proc_cancel_all_transformations (struct dataset *ds)
524 assert (ds->proc_state == PROC_COMMITTED);
525 ok = trns_chain_destroy (ds->permanent_trns_chain);
526 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
527 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
528 ds->temporary_trns_chain = NULL;
529 if ( ds->xform_callback)
530 ds->xform_callback (false, ds->xform_callback_aux);
535 /* Initializes procedure handling.
   Creates a dataset with a new, empty dictionary, a new caseinit and
   an empty permanent transformation chain.  CB may be null (every use
   of the callback checks it first).  CB is called with AUX whenever
   the pending transformations change; the first such call happens
   during this function, via proc_cancel_all_transformations.
   NOTE(review): the return statement is not visible in this copy. */
537 create_dataset (transformation_change_callback_func *cb, void *aux)
539 struct dataset *ds = xzalloc (sizeof(*ds));
540 ds->dict = dict_create ();
541 ds->caseinit = caseinit_create ();
542 ds->xform_callback = cb;
543 ds->xform_callback_aux = aux;
544 proc_cancel_all_transformations (ds);
/* Sets CB, with auxiliary data AUX, as DS's transformation-change
   callback.  Despite the name, this replaces any previously registered
   callback rather than adding another one.
   NOTE(review): the return type and the parameter line declaring AUX
   are not visible in this copy. */
550 dataset_add_transform_change_callback (struct dataset *ds,
551 transformation_change_callback_func *cb,
554 ds->xform_callback = cb;
555 ds->xform_callback_aux = aux;
558 /* Finishes up procedure handling.
   Steps, in order:
   - discards the active file, which requires that no procedure be in
     progress;
   - destroys the dictionary, the caseinit and the permanent chain;
   - tells the transformation-change callback that no transformations
     are pending.
   NOTE(review): no statement freeing DS itself is visible in this copy
   -- confirm the dataset object is released. */
560 destroy_dataset (struct dataset *ds)
562 proc_discard_active_file (ds);
563 dict_destroy (ds->dict);
564 caseinit_destroy (ds->caseinit);
565 trns_chain_destroy (ds->permanent_trns_chain);
567 if ( ds->xform_callback)
568 ds->xform_callback (false, ds->xform_callback_aux);
572 /* Causes output from the next procedure to be discarded, instead
573 of being preserved for use as input for the next procedure.
   The flag is reset to false by proc_commit, and by proc_execute when
   it has nothing to execute. */
575 proc_discard_output (struct dataset *ds)
577 ds->discard_output = true;
/* NOTE(review): in this copy the doc comments of
   proc_discard_active_file and proc_set_active_file are missing their
   closing delimiters.  Both functions are therefore swallowed as
   comment text up to the end of the doc comment of
   proc_set_active_file_data.  Confirm against upstream.  As far as
   visible:
   - proc_discard_active_file (no procedure may be in progress) clears
     DS->dict, resets the default file handle, destroys DS->source,
     notifies replace_source with NULL and cancels all
     transformations.  No statement resetting DS->source is visible.
     proc_set_active_file calls this function and then
     proc_set_active_file_data, which destroys DS->source again, so
     confirm DS->source is cleared in between.
   - proc_set_active_file destroys DS->dict and notifies replace_dict
     with DICT.  No assignment of DICT to DS->dict is visible before
     proc_set_active_file_data reads DS->dict; confirm it is
     present. */
580 /* Discards the active file dictionary, data, and
583 proc_discard_active_file (struct dataset *ds)
585 assert (ds->proc_state == PROC_COMMITTED);
587 dict_clear (ds->dict);
588 fh_set_default_handle (NULL);
592 casereader_destroy (ds->source);
594 if ( ds->replace_source) ds->replace_source (NULL);
596 proc_cancel_all_transformations (ds);
599 /* Sets SOURCE as the source for procedure input for the next
602 proc_set_active_file (struct dataset *ds,
603 struct casereader *source,
604 struct dictionary *dict)
606 assert (ds->proc_state == PROC_COMMITTED);
607 assert (ds->dict != dict);
609 proc_discard_active_file (ds);
611 dict_destroy (ds->dict);
613 if ( ds->replace_dict) ds->replace_dict (dict);
615 proc_set_active_file_data (ds, source);
618 /* Replaces the active file's data by READER without replacing
619 the associated dictionary.  Destroys the previous source,
   notifies the replace_source callback (if any) with READER, and
   resets case initialization for DS->dict.  Returns false only if
   READER is non-null and already reports an error.
   NOTE(review): no statement storing READER in DS->source is visible
   in this copy -- confirm. */
621 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
623 casereader_destroy (ds->source);
625 if (ds->replace_source) ds->replace_source (reader);
627 caseinit_clear (ds->caseinit);
628 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
630 return reader == NULL || !casereader_error (reader);
/* NOTE(review): in this copy the doc comments of proc_has_active_file
   and dataset_end_of_command are missing their closing delimiters.
   Those two functions and the forward declarations of
   case_limit_trns_proc and case_limit_trns_free are therefore
   swallowed as comment text up to the end of the doc comment of
   add_case_limit_trns.  Confirm against upstream.  As far as visible:
   - proc_has_active_file is true exactly when DS->source is non-null.
   - dataset_end_of_command discards the active file if its source
     reports an error.  On the no-error path it resets the source's
     successor taint, casting away the const of the taint returned by
     casereader_get_taint to do so. */
633 /* Returns true if an active file data source is available, false
636 proc_has_active_file (const struct dataset *ds)
638 return ds->source != NULL;
641 /* Checks whether DS has a corrupted active file. If so,
642 discards it and returns false. If not, returns true without
645 dataset_end_of_command (struct dataset *ds)
647 if (ds->source != NULL)
649 if (casereader_error (ds->source))
651 proc_discard_active_file (ds);
656 const struct taint *taint = casereader_get_taint (ds->source);
657 taint_reset_successor_taint ((struct taint *) taint);
658 assert (!taint_has_tainted_successor (taint));
664 static trns_proc_func case_limit_trns_proc;
665 static trns_free_func case_limit_trns_free;
667 /* Adds a transformation that limits the number of cases that may
668 pass through, if DS->DICT has a case limit. */
/* The limit moves from the dictionary into the transformation.  The
   heap-allocated counter starts at the dictionary's case limit, and
   that limit is then reset to 0, so the limit is applied only once.
   The counter is freed by case_limit_trns_free.  Presumably a test
   that is not visible in this copy skips all of this when the limit
   is already 0. */
670 add_case_limit_trns (struct dataset *ds)
672 size_t case_limit = dict_get_case_limit (ds->dict);
675 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
676 *cases_remaining = case_limit;
677 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
679 dict_set_case_limit (ds->dict, 0);
/* NOTE(review): the doc comment of case_limit_trns_proc is missing its
   closing delimiter in this copy, so the function is swallowed as
   comment text up to the end of the doc comment of
   case_limit_trns_free.  As far as visible, case_limit_trns_proc
   passes a case on (TRNS_CONTINUE) while the counter pointed to by
   CASES_REMAINING_ is positive, decrementing it.  Once the counter
   reaches 0 it drops every later case (TRNS_DROP_CASE). */
683 /* Limits the maximum number of cases processed to
686 case_limit_trns_proc (void *cases_remaining_,
687 struct ccase *c UNUSED, casenumber case_nr UNUSED)
689 size_t *cases_remaining = cases_remaining_;
690 if (*cases_remaining > 0)
692 (*cases_remaining)--;
693 return TRNS_CONTINUE;
696 return TRNS_DROP_CASE;
699 /* Frees the data associated with a case limit transformation, the
   counter that add_case_limit_trns allocated. */
701 case_limit_trns_free (void *cases_remaining_)
703 size_t *cases_remaining = cases_remaining_;
704 free (cases_remaining);
/* Forward declaration.  filter_trns_proc is defined after
   add_filter_trns, which registers it. */
708 static trns_proc_func filter_trns_proc;
710 /* Adds a temporary transformation to filter data according to
711 the variable specified on FILTER, if any.
   Because the filter is added in temporary mode, it applies only to
   the current procedure: proc_commit cancels temporary
   transformations.  No free function is registered, presumably
   because FILTER_VAR is owned by the dictionary. */
713 add_filter_trns (struct dataset *ds)
715 struct variable *filter_var = dict_get_filter (ds->dict);
716 if (filter_var != NULL)
718 proc_start_temporary_transformations (ds);
719 add_transformation (ds, filter_trns_proc, NULL, filter_var);
723 /* FILTER transformation.  Passes a case on (TRNS_CONTINUE) only if
   the filter variable's value in it is nonzero and is not any kind of
   missing value (MV_ANY); otherwise drops the case.
   NOTE(review): C is marked UNUSED but is read by case_num below.  The
   attribute is misleading and should be removed. */
725 filter_trns_proc (void *filter_var_,
726 struct ccase *c UNUSED, casenumber case_nr UNUSED)
729 struct variable *filter_var = filter_var_;
730 double f = case_num (c, filter_var);
731 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
732 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Presumably returns DS's current dictionary (DS->dict).
   NOTE(review): only this signature line is visible in this copy; the
   return type and body are missing -- confirm against upstream. */
737 dataset_dict (const struct dataset *ds)
/* Presumably returns DS's active file data source (DS->source), which
   may be null.
   NOTE(review): the body is not visible in this copy -- confirm
   against upstream. */
742 const struct casereader *
743 dataset_source (const struct dataset *ds)
/* Ensures that at least N_BEFORE previous cases are kept for
   lagged_case during the next procedure.  proc_open sizes the lag
   buffer from DS->n_lag.  DS->n_lag becomes the larger of its current
   value and N_BEFORE, so this never decreases it. */
749 dataset_need_lag (struct dataset *ds, int n_before)
751 ds->n_lag = MAX (ds->n_lag, n_before);