1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
24 #include <data/case.h>
25 #include <data/case-map.h>
26 #include <data/caseinit.h>
27 #include <data/casereader.h>
28 #include <data/casereader-provider.h>
29 #include <data/casewriter.h>
30 #include <data/dictionary.h>
31 #include <data/file-handle-def.h>
32 #include <data/procedure.h>
33 #include <data/transformations.h>
34 #include <data/variable.h>
35 #include <libpspp/alloc.h>
36 #include <libpspp/deque.h>
37 #include <libpspp/misc.h>
38 #include <libpspp/str.h>
39 #include <libpspp/taint.h>
43 /* Cases are read from source,
44 their transformation variables are initialized,
45 pass through permanent_trns_chain (which transforms them into
46 the format described by permanent_dict),
   are written to sink,
48 pass through temporary_trns_chain (which transforms them into
49 the format described by dict),
50 and are finally passed to the procedure. */
51 struct casereader *source;
52 struct caseinit *caseinit;
53 struct trns_chain *permanent_trns_chain;
54 struct dictionary *permanent_dict;
55 struct casewriter *sink;
56 struct trns_chain *temporary_trns_chain;
57 struct dictionary *dict;
59 /* Callback which occurs when a procedure provides a new source for
   the active file. */
61 replace_source_callback *replace_source ;
63 /* Callback which occurs whenever the DICT is replaced by a new one */
64 replace_dictionary_callback *replace_dict;
66 /* Callback which occurs whenever the transformation chain(s) have
   been altered. */
68 transformation_change_callback_func *xform_callback;
69 void *xform_callback_aux;
71 /* If true, cases are discarded instead of being written to
   the sink. */
75 /* The transformation chain that the next transformation will be
   added to. */
77 struct trns_chain *cur_trns_chain;
79 /* The case map used to compact a case, if necessary;
80 otherwise a null pointer. */
81 struct case_map *compactor;
83 /* Time at which proc was last invoked. */
84 time_t last_proc_invocation;
86 /* Cases just before ("lagging") the current one. */
87 int n_lag; /* Number of cases to lag. */
88 struct deque lag; /* Deque of lagged cases. */
89 struct ccase *lag_cases; /* Lagged cases managed by deque. */
94 PROC_COMMITTED, /* No procedure in progress. */
95 PROC_OPEN, /* proc_open called, casereader still open. */
96 PROC_CLOSED /* casereader from proc_open destroyed,
97 but proc_commit not yet called. */
100 casenumber cases_written; /* Cases output so far. */
101 bool ok; /* Error status. */
102 }; /* struct dataset */
/* Forward declarations for file-local helpers that are called before
   their definitions appear further down in this file. */
105 static void add_case_limit_trns (struct dataset *ds);
106 static void add_filter_trns (struct dataset *ds);
108 static void update_last_proc_invocation (struct dataset *ds);
110 /* Public functions. */
112 /* Returns the time recorded in DS->last_proc_invocation.  If that
   field is still 0, update_last_proc_invocation() is called first, so
   that a timestamp is recorded before it is returned. */
114 time_of_last_procedure (struct dataset *ds)
116 if (ds->last_proc_invocation == 0)
117 update_last_proc_invocation (ds);
118 return ds->last_proc_invocation;
121 /* Regular procedure. */
123 /* Executes any pending transformations, if necessary.
124 This is not identical to the EXECUTE command in that it won't
125 always read the source data. This can be important when the
126 source data is given inline within BEGIN DATA...END FILE.

   The final result combines the status of destroying the procedure
   reader with the result of proc_commit(). */
128 proc_execute (struct dataset *ds)
/* Case with no transformations in either chain: only per-procedure
   state is reset.  NOTE(review): the statement that ends this branch is
   not visible in this listing; presumably it returns without reading
   the data -- confirm. */
132 if ((ds->temporary_trns_chain == NULL
133 || trns_chain_is_empty (ds->temporary_trns_chain))
134 && trns_chain_is_empty (ds->permanent_trns_chain))
137 ds->discard_output = false;
138 dict_set_case_limit (ds->dict, 0);
139 dict_clear_vectors (ds->dict);
/* Open the procedure reader and destroy it right away.  The reader is
   created with proc_casereader_class, whose destroy function reads
   every remaining case, so the transformations are applied to all of
   the data. */
143 ok = casereader_destroy (proc_open (ds));
/* proc_commit() is evaluated first, so it is called even when OK is
   false. */
144 return proc_commit (ds) && ok;
/* Forward declaration: proc_open() below needs the class before its
   definition appears. */
147 static struct casereader_class proc_casereader_class;
149 /* Opens dataset DS for reading cases with proc_read.
150 proc_commit must be called when done.

   DS must have a source and be in state PROC_COMMITTED (both are
   asserted).  The return value is the casereader created by
   casereader_create_sequential() with class proc_casereader_class and
   DS as its auxiliary data. */
152 proc_open (struct dataset *ds)
154 assert (ds->source != NULL);
155 assert (ds->proc_state == PROC_COMMITTED);
157 update_last_proc_invocation (ds);
159 caseinit_mark_for_init (ds->caseinit, ds->dict);
161 /* Finish up the collection of transformations. */
162 add_case_limit_trns (ds);
163 add_filter_trns (ds);
164 trns_chain_finalize (ds->cur_trns_chain);
166 /* Make permanent_dict refer to the dictionary right before
167 data reaches the sink. */
168 if (ds->permanent_dict == NULL)
169 ds->permanent_dict = ds->dict;
/* Unless output is to be discarded, create the sink that collects the
   replacement active file.  A compactor is requested only when
   COMPACTED_VALUE_CNT is smaller than the permanent dictionary's next
   value index (presumably COMPACTED_VALUE_CNT is the value count
   without scratch variables -- confirm against dict_count_values). */
172 if (!ds->discard_output)
174 struct dictionary *pd = ds->permanent_dict;
175 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
176 bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
177 ds->compactor = (should_compact
178 ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
180 ds->sink = autopaging_writer_create (compacted_value_cnt);
/* NOTE(review): presumably the branch taken when output is discarded;
   the surrounding lines are not visible in this listing. */
184 ds->compactor = NULL;
188 /* Allocate memory for lagged cases. */
189 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
/* The procedure is now in progress; restart the written-case count. */
191 ds->proc_state = PROC_OPEN;
192 ds->cases_written = 0;
195 /* FIXME: use taint in dataset in place of `ok'? */
/* NOTE(review): the FIXME that follows is cut off in this listing (its
   closing line is missing); restore it from the complete file. */
196 /* FIXME: for trivial cases we can just return a clone of
198 return casereader_create_sequential (NULL,
199 dict_get_next_value_idx (ds->dict),
201 &proc_casereader_class, ds);
204 /* Returns true if a procedure is in progress, that is, if
205 proc_open has been called but proc_commit has not. */
207 proc_is_open (const struct dataset *ds)
/* Both PROC_OPEN and PROC_CLOSED count as "in progress"; only
   PROC_COMMITTED means that no procedure is active. */
209 return ds->proc_state != PROC_COMMITTED;
212 /* "read" function for procedure casereader.

   DS_ is the dataset.  In the order visible below, a case read from
   DS->source is resized and has its variables initialized, is run
   through the permanent transformations, is stored in the lag queue,
   is written to the sink (if there is one), and is finally run through
   the temporary transformations (if there are any).  The dataset must
   be in state PROC_OPEN (asserted). */
214 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
217 struct dataset *ds = ds_;
218 enum trns_result retval = TRNS_DROP_CASE;
220 assert (ds->proc_state == PROC_OPEN);
/* NOTE(review): the enclosing loop header is not visible in this
   listing.  RETVAL is expected to be TRNS_DROP_CASE or TRNS_ERROR at
   this point (asserted). */
225 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
226 if (retval == TRNS_ERROR)
231 /* Read a case from source. */
232 if (!casereader_read (ds->source, c))
234 case_resize (c, dict_get_next_value_idx (ds->dict));
235 caseinit_init_vars (ds->caseinit, c);
237 /* Execute permanent transformations. */
/* The case number is one more than the number of cases written so
   far. */
238 case_nr = ds->cases_written + 1;
239 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
241 caseinit_update_left_vars (ds->caseinit, c);
242 if (retval != TRNS_CONTINUE)
248 /* Write case to collection of lagged cases. */
/* Pop cases off the back of the queue until there is room, then store
   a clone of C at the front. */
251 while (deque_count (&ds->lag) >= ds->n_lag)
252 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
253 case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
256 /* Write case to replacement active file. */
258 if (ds->sink != NULL)
/* With a compactor TMP is produced by case_map_execute(); the
   case_clone() call is presumably the alternative branch (the `else'
   is not visible in this listing). */
261 if (ds->compactor != NULL)
262 case_map_execute (ds->compactor, c, &tmp);
264 case_clone (&tmp, c);
265 casewriter_write (ds->sink, &tmp);
268 /* Execute temporary transformations. */
269 if (ds->temporary_trns_chain != NULL)
/* The temporary chain is given DS->cases_written as the case number. */
271 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
272 c, ds->cases_written);
273 if (retval != TRNS_CONTINUE)
284 /* "destroy" function for procedure casereader.

   Reads READER until it yields no more cases, moves DS to state
   PROC_CLOSED, destroys DS->source while folding its status into
   DS->ok, and then clears the active file data. */
286 proc_casereader_destroy (struct casereader *reader, void *ds_)
288 struct dataset *ds = ds_;
291 /* Make sure transformations happen for every input case, in
292 case they have side effects, and ensure that the replacement
293 active file gets all the cases it should. */
294 while (casereader_read (reader, &c))
297 ds->proc_state = PROC_CLOSED;
/* casereader_destroy() is evaluated first, so the source is destroyed
   even if DS->ok is already false; an earlier failure is kept. */
298 ds->ok = casereader_destroy (ds->source) && ds->ok;
/* NOTE(review): proc_set_active_file_data() itself calls
   casereader_destroy (ds->source).  A statement between these two
   lines appears to be missing from this listing; confirm that
   DS->source is reset before this call. */
300 proc_set_active_file_data (ds, NULL);
303 /* Must return false if the source casereader, a transformation,
304 or the sink casewriter signaled an error. (If a temporary
305 transformation signals an error, then the return value is
306 false, but the replacement active file may still be
   untainted.) */
309 proc_commit (struct dataset *ds)
/* Only valid after the procedure casereader has been destroyed
   (state PROC_CLOSED); the dataset then goes back to PROC_COMMITTED. */
311 assert (ds->proc_state == PROC_CLOSED);
312 ds->proc_state = PROC_COMMITTED;
314 /* Free memory for lagged cases. */
315 while (!deque_is_empty (&ds->lag))
316 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
317 free (ds->lag_cases);
319 /* Dictionary from before TEMPORARY becomes permanent. */
320 proc_cancel_temporary_transformations (ds);
322 if (!ds->discard_output)
324 /* Finish compacting. */
325 if (ds->compactor != NULL)
327 case_map_destroy (ds->compactor);
328 ds->compactor = NULL;
330 dict_delete_scratch_vars (ds->dict);
331 dict_compact_values (ds->dict);
334 /* Old data sink becomes new data source. */
335 if (ds->sink != NULL)
336 ds->source = casewriter_make_reader (ds->sink);
/* The discard request is cleared here, so it has to be made again for
   any later procedure that needs it. */
341 ds->discard_output = false;
/* Tell the listener, if any, which source the dataset has now. */
344 if ( ds->replace_source) ds->replace_source (ds->source);
346 caseinit_clear (ds->caseinit);
347 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
349 dict_clear_vectors (ds->dict);
350 ds->permanent_dict = NULL;
/* Drop all transformations.  The result is false if that fails or if
   DS->ok is false. */
351 return proc_cancel_all_transformations (ds) && ds->ok;
354 /* Casereader class for procedure execution.  proc_open() passes it
   to casereader_create_sequential().  The members visible here are the
   read and destroy functions defined earlier in this file. */
355 static struct casereader_class proc_casereader_class =
357 proc_casereader_read,
358 proc_casereader_destroy,
363 /* Sets DS->last_proc_invocation to the current time, as returned by
   time (NULL). */
365 update_last_proc_invocation (struct dataset *ds)
367 ds->last_proc_invocation = time (NULL);
370 /* Returns a pointer to the lagged case from N_BEFORE cases before the
371 current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be between 1 and DS->n_lag, inclusive (asserted). */
373 lagged_case (const struct dataset *ds, int n_before)
375 assert (n_before >= 1);
376 assert (n_before <= ds->n_lag);
/* Lagged cases are pushed onto the front of the deque, so offset 0
   from the front is the most recently stored case; hence
   N_BEFORE - 1. */
378 if (n_before <= deque_count (&ds->lag))
379 return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
384 /* Returns the current set of permanent transformations,
385 and clears the permanent transformations.
386 For use by INPUT PROGRAM.
   Temporary transformations must not be in effect (asserted). */
388 proc_capture_transformations (struct dataset *ds)
390 struct trns_chain *chain;
392 assert (ds->temporary_trns_chain == NULL);
393 chain = ds->permanent_trns_chain;
/* A newly created chain takes the place of the captured one and also
   becomes the chain that new transformations are appended to. */
394 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
/* Notify the listener with false.  (Compare
   proc_cancel_temporary_transformations(), which passes whether the
   permanent chain is non-empty.) */
396 if ( ds->xform_callback)
397 ds->xform_callback (false, ds->xform_callback_aux);
402 /* Adds a transformation that processes a case with PROC and
403 frees itself with FREE to the current set of transformations.
404 The functions are passed AUX as auxiliary data.

   The transformation is appended to DS->cur_trns_chain with a null
   finalizer, after which the transformation-change callback, if one
   is set, is invoked with true.  Note that the parameter FREE shadows
   the standard free() function inside this function. */
406 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
408 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
409 if ( ds->xform_callback)
410 ds->xform_callback (true, ds->xform_callback_aux);
413 /* Adds a transformation that processes a case with PROC and
414 frees itself with FREE to the current set of transformations.
415 When parsing of the block of transformations is complete,
416 FINALIZE will be called.
417 The functions are passed AUX as auxiliary data.

   Apart from passing FINALIZE to trns_chain_append(), this does the
   same as add_transformation(): the transformation goes onto
   DS->cur_trns_chain and the transformation-change callback, if set,
   is invoked with true. */
419 add_transformation_with_finalizer (struct dataset *ds,
420 trns_finalize_func *finalize,
421 trns_proc_func *proc,
422 trns_free_func *free, void *aux)
424 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
426 if ( ds->xform_callback)
427 ds->xform_callback (true, ds->xform_callback_aux);
430 /* Returns the index of the next transformation.
431 This value can be returned by a transformation procedure
432 function to indicate a "jump" to that transformation.
   The index is obtained from DS->cur_trns_chain via
   trns_chain_next(). */
434 next_transformation (const struct dataset *ds)
436 return trns_chain_next (ds->cur_trns_chain);
439 /* Returns true if the next call to add_transformation() will add
440 a temporary transformation, false if it will add a permanent
   transformation. */
443 proc_in_temporary_transformations (const struct dataset *ds)
/* The temporary chain is created by
   proc_start_temporary_transformations() and set back to null wherever
   temporary transformations are made permanent or cancelled, so a
   non-null pointer marks the temporary state. */
445 return ds->temporary_trns_chain != NULL;
448 /* Marks the start of temporary transformations.
449 Further calls to add_transformation() will add temporary
   transformations. */
452 proc_start_temporary_transformations (struct dataset *ds)
/* Only act if a temporary chain is not already active. */
454 if (!proc_in_temporary_transformations (ds))
/* The current chain is still the permanent one here, so a pending case
   limit is added to the permanent transformations. */
456 add_case_limit_trns (ds);
/* Keep a copy of the dictionary as it is now;
   proc_cancel_temporary_transformations() restores it later. */
458 ds->permanent_dict = dict_clone (ds->dict);
460 trns_chain_finalize (ds->permanent_trns_chain);
/* From here on new transformations go onto the new temporary chain. */
461 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
463 if ( ds->xform_callback)
464 ds->xform_callback (true, ds->xform_callback_aux);
468 /* Converts all the temporary transformations, if any, to
469 permanent transformations. Further transformations will be
   permanent.
471 Returns true if anything changed, false otherwise. */
473 proc_make_temporary_transformations_permanent (struct dataset *ds)
475 if (proc_in_temporary_transformations (ds))
/* Finalize the temporary chain, then move its contents onto the
   permanent chain. */
477 trns_chain_finalize (ds->temporary_trns_chain);
478 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
/* NOTE(review): DS->cur_trns_chain was set to the temporary chain in
   proc_start_temporary_transformations() and is not visibly reset
   here; confirm that it does not keep referring to the spliced
   chain. */
479 ds->temporary_trns_chain = NULL;
/* The dictionary copy made when temporary transformations started is
   no longer needed, because DS->dict stays in effect. */
481 dict_destroy (ds->permanent_dict);
482 ds->permanent_dict = NULL;
490 /* Cancels all temporary transformations, if any. Further
491 transformations will be permanent.
492 Returns true if anything changed, false otherwise. */
494 proc_cancel_temporary_transformations (struct dataset *ds)
496 if (proc_in_temporary_transformations (ds))
/* Throw away the dictionary as modified since temporary
   transformations started and go back to the copy saved in
   DS->permanent_dict. */
498 dict_destroy (ds->dict);
499 ds->dict = ds->permanent_dict;
500 ds->permanent_dict = NULL;
/* Tell the listener, if any, that DS->dict is a different object
   now. */
501 if (ds->replace_dict) ds->replace_dict (ds->dict);
/* NOTE(review): DS->cur_trns_chain is not visibly reset after the
   temporary chain is destroyed; confirm that it is updated before
   the next transformation is added. */
503 trns_chain_destroy (ds->temporary_trns_chain);
504 ds->temporary_trns_chain = NULL;
/* Report whether the permanent chain still holds transformations. */
506 if ( ds->xform_callback)
507 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
508 ds->xform_callback_aux);
516 /* Cancels all transformations, if any.
517 Returns true if successful, false on I/O error.

   No procedure may be in progress (asserted).  Afterward DS has a
   newly created permanent chain, which is also the current chain, and
   no temporary chain. */
519 proc_cancel_all_transformations (struct dataset *ds)
522 assert (ds->proc_state == PROC_COMMITTED);
/* Both chains are destroyed even if destroying the first one fails.
   Either pointer may be null here (create_dataset() calls this on a
   zero-filled dataset), so trns_chain_destroy() presumably accepts a
   null chain -- confirm. */
523 ok = trns_chain_destroy (ds->permanent_trns_chain);
524 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
525 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
526 ds->temporary_trns_chain = NULL;
527 if ( ds->xform_callback)
528 ds->xform_callback (false, ds->xform_callback_aux);
533 /* Initializes procedure handling.

   Allocates a zero-filled dataset, gives it a new dictionary and
   caseinit object, stores CB and AUX as the transformation-change
   callback and its auxiliary data, and sets up the transformation
   chains by calling proc_cancel_all_transformations(). */
535 create_dataset (transformation_change_callback_func *cb, void *aux)
537 struct dataset *ds = xzalloc (sizeof(*ds));
538 ds->dict = dict_create ();
539 ds->caseinit = caseinit_create ();
540 ds->xform_callback = cb;
541 ds->xform_callback_aux = aux;
/* This call asserts state PROC_COMMITTED, which the zero-filled
   structure must therefore already satisfy (presumably PROC_COMMITTED
   is 0 -- confirm).  Because the callback is already stored, CB is
   invoked with false from inside this call if it is non-null. */
542 proc_cancel_all_transformations (ds);
/* Stores CB as DS's transformation-change callback and AUX as its
   auxiliary data.  Only one callback is kept: the assignments below
   overwrite whatever was stored before. */
548 dataset_add_transform_change_callback (struct dataset *ds,
549 transformation_change_callback_func *cb,
552 ds->xform_callback = cb;
553 ds->xform_callback_aux = aux;
556 /* Finishes up procedure handling.

   Discards the active file, destroys DS's dictionary, caseinit object
   and permanent transformation chain, and finally invokes the
   transformation-change callback, if any, with false.  No procedure
   may be in progress, because proc_discard_active_file() asserts state
   PROC_COMMITTED. */
558 destroy_dataset (struct dataset *ds)
560 proc_discard_active_file (ds);
561 dict_destroy (ds->dict);
562 caseinit_destroy (ds->caseinit);
/* proc_discard_active_file() ends by calling
   proc_cancel_all_transformations(), which creates a new permanent
   chain; that chain is released here. */
563 trns_chain_destroy (ds->permanent_trns_chain);
565 if ( ds->xform_callback)
566 ds->xform_callback (false, ds->xform_callback_aux);
570 /* Causes output from the next procedure to be discarded, instead
571 of being preserved for use as input for the next procedure.
   The flag set here is cleared again by proc_commit(). */
573 proc_discard_output (struct dataset *ds)
575 ds->discard_output = true;
578 /* Discards the active file dictionary, data, and
   transformations. */
581 proc_discard_active_file (struct dataset *ds)
/* Not allowed while a procedure is in progress. */
583 assert (ds->proc_state == PROC_COMMITTED);
585 dict_clear (ds->dict);
586 fh_set_default_handle (NULL);
/* NOTE(review): the listing skips lines around this call; confirm
   that DS->source is reset to null afterward, since
   proc_has_active_file() tests that pointer. */
590 casereader_destroy (ds->source);
/* Tell the listener, if any, that there is no source any more. */
592 if ( ds->replace_source) ds->replace_source (NULL);
594 proc_cancel_all_transformations (ds);
597 /* Sets SOURCE as the source for procedure input for the next
   procedure. */
600 proc_set_active_file (struct dataset *ds,
601 struct casereader *source,
602 struct dictionary *dict)
604 assert (ds->proc_state == PROC_COMMITTED);
/* DICT must be a different object, because the current dictionary is
   destroyed below. */
605 assert (ds->dict != dict);
607 proc_discard_active_file (ds);
/* NOTE(review): the statement that installs DICT in DS is not visible
   in this listing; only the destruction of the old dictionary and the
   callback notification are. */
609 dict_destroy (ds->dict);
611 if ( ds->replace_dict) ds->replace_dict (dict);
613 proc_set_active_file_data (ds, source);
616 /* Replaces the active file's data by READER without replacing
617 the associated dictionary.

   The previous source is destroyed and the source-replacement
   callback, if any, is invoked with READER.  READER may be null.
   Returns true if READER is null or casereader_error() reports no
   error for it, false otherwise. */
619 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
621 casereader_destroy (ds->source);
623 if (ds->replace_source) ds->replace_source (reader);
625 caseinit_clear (ds->caseinit);
626 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
628 return reader == NULL || !casereader_error (reader);
631 /* Returns true if an active file data source is available, false
   otherwise. */
634 proc_has_active_file (const struct dataset *ds)
/* The active file counts as present exactly when DS->source is
   non-null. */
636 return ds->source != NULL;
639 /* Checks whether DS has a corrupted active file. If so,
640 discards it and returns false. If not, returns true without
   discarding it. */
643 dataset_end_of_command (struct dataset *ds)
/* Nothing is checked when there is no source. */
645 if (ds->source != NULL)
/* A source that reports an error causes the whole active file to be
   discarded. */
647 if (casereader_error (ds->source))
649 proc_discard_active_file (ds);
/* Otherwise the successor taint of the source's taint object is
   reset.  The cast drops the const qualifier, presumably because
   taint_reset_successor_taint() takes a non-const pointer. */
654 const struct taint *taint = casereader_get_taint (ds->source);
655 taint_reset_successor_taint ((struct taint *) taint);
656 assert (!taint_has_tainted_successor (taint));
/* Forward declarations of the callbacks used by the case-limit
   transformation. */
662 static trns_proc_func case_limit_trns_proc;
663 static trns_free_func case_limit_trns_free;
665 /* Adds a transformation that limits the number of cases that may
666 pass through, if DS->DICT has a case limit.

   The limit is copied into a heap-allocated counter, and the
   transformation is registered with case_limit_trns_proc and
   case_limit_trns_free (the counter is presumably passed as the
   auxiliary data; that argument line is not visible in this
   listing). */
668 add_case_limit_trns (struct dataset *ds)
670 size_t case_limit = dict_get_case_limit (ds->dict);
673 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
674 *cases_remaining = case_limit;
675 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
/* The dictionary's limit is reset to 0 after the transformation has
   been added.  This function is called from both
   proc_start_temporary_transformations() and proc_open(). */
677 dict_set_case_limit (ds->dict, 0);
681 /* Limits the maximum number of cases processed to
   the count stored in *CASES_REMAINING_. */
684 case_limit_trns_proc (void *cases_remaining_,
685 struct ccase *c UNUSED, casenumber case_nr UNUSED)
687 size_t *cases_remaining = cases_remaining_;
/* While the counter is positive, use up one unit and let the case
   continue. */
688 if (*cases_remaining > 0)
690 (*cases_remaining)--;
691 return TRNS_CONTINUE;
/* Counter exhausted: the case is dropped. */
694 return TRNS_DROP_CASE;
697 /* Frees the data associated with a case limit transformation.
   CASES_REMAINING_ points to the counter that add_case_limit_trns()
   allocated with xmalloc(). */
699 case_limit_trns_free (void *cases_remaining_)
701 size_t *cases_remaining = cases_remaining_;
702 free (cases_remaining);
/* Forward declaration of the callback used by the FILTER
   transformation. */
706 static trns_proc_func filter_trns_proc;
708 /* Adds a temporary transformation to filter data according to
709 the variable specified on FILTER, if any.
   Nothing is added when the dictionary has no filter variable. */
711 add_filter_trns (struct dataset *ds)
713 struct variable *filter_var = dict_get_filter (ds->dict);
714 if (filter_var != NULL)
/* Make sure the temporary chain is the current one, so that the
   filter is added as a temporary transformation.  The call does
   nothing if temporary transformations are already in effect. */
716 proc_start_temporary_transformations (ds);
/* The filter variable itself is the auxiliary data, and no free
   function is registered for it. */
717 add_transformation (ds, filter_trns_proc, NULL, filter_var);
721 /* FILTER transformation.

   FILTER_VAR_ is the filter variable.  The case continues only if its
   value F for that variable is nonzero and var_is_num_missing
   (filter_var, f, MV_ANY) is false; otherwise the case is dropped.

   NOTE(review): parameter C is marked UNUSED although it is read by
   case_num() below. */
723 filter_trns_proc (void *filter_var_,
724 struct ccase *c UNUSED, casenumber case_nr UNUSED)
727 struct variable *filter_var = filter_var_;
728 double f = case_num (c, filter_var);
729 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
730 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Accessor that takes dataset DS.  NOTE(review): only the signature is
   visible in this listing; presumably it returns DS's dictionary --
   confirm against the complete file. */
735 dataset_dict (const struct dataset *ds)
/* Accessor that takes dataset DS and returns a pointer to a const
   casereader.  NOTE(review): the body is not visible in this listing;
   presumably it returns DS's source -- confirm against the complete
   file. */
740 const struct casereader *
741 dataset_source (const struct dataset *ds)
/* Requests that at least N_BEFORE lagged cases be kept for DS.  The
   stored lag count never shrinks: it becomes the larger of its
   current value and N_BEFORE. */
747 dataset_need_lag (struct dataset *ds, int n_before)
749 ds->n_lag = MAX (ds->n_lag, n_before);