1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
24 #include <data/case.h>
25 #include <data/case-map.h>
26 #include <data/caseinit.h>
27 #include <data/casereader.h>
28 #include <data/casereader-provider.h>
29 #include <data/casewriter.h>
30 #include <data/dictionary.h>
31 #include <data/file-handle-def.h>
32 #include <data/procedure.h>
33 #include <data/transformations.h>
34 #include <data/variable.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
43 /* Cases are read from source,
44 their transformation variables are initialized,
45 pass through permanent_trns_chain (which transforms them into
46 the format described by permanent_dict),
48 pass through temporary_trns_chain (which transforms them into
49 the format described by dict),
50 and are finally passed to the procedure. */
51 struct casereader *source;
52 struct caseinit *caseinit;
53 struct trns_chain *permanent_trns_chain;
54 struct dictionary *permanent_dict;
55 struct casewriter *sink;
56 struct trns_chain *temporary_trns_chain;
57 struct dictionary *dict;
59 /* Callback which occurs whenever the transformation chain(s) have
61 transformation_change_callback_func *xform_callback;
62 void *xform_callback_aux;
64 /* If true, cases are discarded instead of being written to
68 /* The transformation chain that the next transformation will be
70 struct trns_chain *cur_trns_chain;
72 /* The case map used to compact a case, if necessary;
73 otherwise a null pointer. */
74 struct case_map *compactor;
76 /* Time at which proc was last invoked. */
77 time_t last_proc_invocation;
79 /* Cases just before ("lagging") the current one. */
80 int n_lag; /* Number of cases to lag. */
81 struct deque lag; /* Deque of lagged cases. */
82 struct ccase *lag_cases; /* Lagged cases managed by deque. */
87 PROC_COMMITTED, /* No procedure in progress. */
88 PROC_OPEN, /* proc_open called, casereader still open. */
89 PROC_CLOSED /* casereader from proc_open destroyed,
90 but proc_commit not yet called. */
93 casenumber cases_written; /* Cases output so far. */
94 bool ok; /* Error status. */
95 }; /* struct dataset */
/* Internal helpers, defined later in this file. */
static void update_last_proc_invocation (struct dataset *ds);
static void add_filter_trns (struct dataset *ds);
static void add_case_limit_trns (struct dataset *ds);
103 /* Public functions. */
105 /* Returns the last time the data was read. */
107 time_of_last_procedure (struct dataset *ds)
109 if (ds->last_proc_invocation == 0)
110 update_last_proc_invocation (ds);
111 return ds->last_proc_invocation;
114 /* Regular procedure. */
116 /* Executes any pending transformations, if necessary.
117 This is not identical to the EXECUTE command in that it won't
118 always read the source data. This can be important when the
119 source data is given inline within BEGIN DATA...END FILE. */
/* On the path that runs a procedure, returns true on success and
   false if destroying the procedure's casereader or committing the
   procedure reported an error.

   NOTE(review): the line numbers in this listing skip lines of this
   function (its return type, braces, local declarations, and
   possibly statements), so these comments describe only the visible
   code. */
121 proc_execute (struct dataset *ds)
/* Fast path: neither chain holds any transformation, so there is
   nothing to run.  Reset the per-procedure settings that a real run
   would reset anyway (proc_commit resets discard_output,
   add_case_limit_trns clears the case limit, proc_commit clears the
   vectors).  Presumably an elided line returns early here. */
125 if ((ds->temporary_trns_chain == NULL
126 || trns_chain_is_empty (ds->temporary_trns_chain))
127 && trns_chain_is_empty (ds->permanent_trns_chain))
130 ds->discard_output = false;
131 dict_set_case_limit (ds->dict, 0);
132 dict_clear_vectors (ds->dict);
/* Otherwise run an empty procedure: destroying the casereader
   without reading from it still pushes every source case through the
   transformations, because proc_casereader_destroy drains it. */
136 ok = casereader_destroy (proc_open (ds));
137 return proc_commit (ds) && ok;
140 static struct casereader_class proc_casereader_class;
/* The forward declaration above lets proc_open() refer to the class,
   whose initializer appears later in the file. */
142 /* Opens dataset DS for reading cases from the returned casereader.
143 After that casereader has been destroyed, proc_commit must be called. */
/* The returned casereader yields the source cases after the
   permanent and temporary transformations have run.  Requires that
   DS has a data source and that no procedure is in progress.

   NOTE(review): this listing elides some lines of the function
   (return type, braces, part of a conditional expression, and
   possibly statements); the comments describe only what is visible. */
145 proc_open (struct dataset *ds)
147 assert (ds->source != NULL);
148 assert (ds->proc_state == PROC_COMMITTED);
150 update_last_proc_invocation (ds);
152 caseinit_mark_for_init (ds->caseinit, ds->dict);
154 /* Finish up the collection of transformations. */
155 add_case_limit_trns (ds);
156 add_filter_trns (ds);
157 trns_chain_finalize (ds->cur_trns_chain);
159 /* Make permanent_dict refer to the dictionary right before
160 data reaches the sink. */
/* Without TEMPORARY there is no separate pre-TEMPORARY dictionary,
   so permanent_dict simply aliases ds->dict for this procedure
   (proc_commit drops the alias without destroying it). */
161 if (ds->permanent_dict == NULL)
162 ds->permanent_dict = ds->dict;
/* Unless output is discarded, create the sink that becomes the next
   active file.  dict_count_values() with DC_SCRATCH presumably counts
   the values left after excluding scratch variables; when that is
   fewer than the full case width, a case map is built to compact
   cases before they are written. */
165 if (!ds->discard_output)
167 struct dictionary *pd = ds->permanent_dict;
168 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
169 bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
170 ds->compactor = (should_compact
171 ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
173 ds->sink = autopaging_writer_create (compacted_value_cnt);
/* Output is discarded, so nothing is compacted.  NOTE(review): any
   statement resetting ds->sink in this branch is elided here;
   proc_casereader_read writes cases only when ds->sink is non-null. */
177 ds->compactor = NULL;
181 /* Allocate memory for lagged cases. */
182 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
184 ds->proc_state = PROC_OPEN;
185 ds->cases_written = 0;
188 /* FIXME: use taint in dataset in place of `ok'? */
189 /* FIXME: for trivial cases we can just return a clone of ds->source. */
/* The casereader pulls cases on demand through proc_casereader_read
   and, when destroyed, drains the rest through
   proc_casereader_destroy.  NOTE(review): one argument of this call
   (between the value count and the class) is elided in this listing. */
191 return casereader_create_sequential (NULL,
192 dict_get_next_value_idx (ds->dict),
194 &proc_casereader_class, ds);
197 /* Returns true if a procedure is in progress, that is, if
198 proc_open has been called but proc_commit has not. */
200 proc_is_open (const struct dataset *ds)
202 return ds->proc_state != PROC_COMMITTED;
205 /* "read" function for procedure casereader. */
/* Produces in C the next case that survives every transformation.
   In the visible code, each attempt reads a case from ds->source,
   resizes it to the width of ds->dict and initializes its
   transformation variables, and runs the permanent chain (using
   cases_written + 1 as the case number).  It then records a copy for
   LAG, writes a copy (compacted if needed) to the sink, and finally
   runs the temporary chain, if any.

   NOTE(review): this listing elides many lines of the function,
   including its return type, its last parameter, the loop that
   retries after a dropped case, and several branch bodies; the
   comments describe only the visible statements. */
207 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
210 struct dataset *ds = ds_;
211 enum trns_result retval = TRNS_DROP_CASE;
213 assert (ds->proc_state == PROC_OPEN);
/* RETVAL starts as TRNS_DROP_CASE so that the first attempt passes
   this check; on later attempts it holds the result that rejected
   the previous case. */
218 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
219 if (retval == TRNS_ERROR)
224 /* Read a case from source. */
225 if (!casereader_read (ds->source, c))
227 case_resize (c, dict_get_next_value_idx (ds->dict));
228 caseinit_init_vars (ds->caseinit, c);
230 /* Execute permanent transformations. */
231 case_nr = ds->cases_written + 1;
232 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
/* Presumably records values that carry over ("left") to the next
   case. */
234 caseinit_update_left_vars (ds->caseinit, c);
235 if (retval != TRNS_CONTINUE)
241 /* Write case to collection of lagged cases. */
/* The deque keeps at most n_lag cases with the newest at the front,
   which is the order lagged_case() indexes.  NOTE(review): with
   n_lag == 0 this loop condition is always true, so the visible
   statements must sit under an elided n_lag > 0 check -- verify. */
244 while (deque_count (&ds->lag) >= ds->n_lag)
245 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
246 case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
249 /* Write case to replacement active file. */
/* The sink receives the case after the permanent transformations but
   before the temporary ones, so temporary transformations (including
   FILTER) do not alter the replacement active file. */
251 if (ds->sink != NULL)
254 if (ds->compactor != NULL)
255 case_map_execute (ds->compactor, c, &tmp);
257 case_clone (&tmp, c);
258 casewriter_write (ds->sink, &tmp);
261 /* Execute temporary transformations. */
/* NOTE(review): this passes cases_written rather than
   cases_written + 1; presumably an elided line increments
   cases_written once the case is written, so both chains see the
   same case number. */
262 if (ds->temporary_trns_chain != NULL)
264 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
265 c, ds->cases_written);
266 if (retval != TRNS_CONTINUE)
277 /* "destroy" function for procedure casereader. */
/* Called when the casereader returned by proc_open() is destroyed.
   Drains any cases the procedure did not read, closes the source,
   and moves DS to PROC_CLOSED so that proc_commit() may run.

   NOTE(review): this listing elides the return type, braces, the
   declaration of C, and the loop body. */
279 proc_casereader_destroy (struct casereader *reader, void *ds_)
281 struct dataset *ds = ds_;
284 /* Make sure transformations happen for every input case, in
285 case they have side effects, and ensure that the replacement
286 active file gets all the cases it should. */
287 while (casereader_read (reader, &c))
/* proc_commit() asserts this state. */
290 ds->proc_state = PROC_CLOSED;
/* The source is always destroyed; ds->ok becomes false if this or any
   earlier stage failed. */
291 ds->ok = casereader_destroy (ds->source) && ds->ok;
/* NOTE(review): proc_set_active_file_data() begins by destroying
   ds->source, which was destroyed just above.  Presumably an elided
   line sets ds->source to NULL in between -- verify, or this is a
   double destroy. */
293 proc_set_active_file_data (ds, NULL);
296 /* Must return false if the source casereader, a transformation,
297 or the sink casewriter signaled an error. (If a temporary
298 transformation signals an error, then the return value is
299 false, but the replacement active file may still be untainted.) */
/* Completes the procedure begun by proc_open(), after its casereader
   has been destroyed.  It frees the lagged cases, drops the temporary
   transformations (restoring the pre-TEMPORARY dictionary), finishes
   compaction, turns the sink into the new data source, and cancels
   all transformations.

   NOTE(review): this listing elides the return type, braces, and an
   `else' branch of the discard_output test (presumably the
   discarded-output case); the comments describe only the visible
   statements. */
302 proc_commit (struct dataset *ds)
304 assert (ds->proc_state == PROC_CLOSED);
305 ds->proc_state = PROC_COMMITTED;
307 /* Free memory for lagged cases. */
308 while (!deque_is_empty (&ds->lag))
309 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
/* ds->lag_cases is left dangling; proc_open() reallocates it before
   it is used again. */
310 free (ds->lag_cases);
312 /* Dictionary from before TEMPORARY becomes permanent. */
313 proc_cancel_temporary_transformations (ds);
315 if (!ds->discard_output)
317 /* Finish compacting. */
318 if (ds->compactor != NULL)
320 case_map_destroy (ds->compactor);
321 ds->compactor = NULL;
/* Presumably still inside the compactor branch: the sink's cases
   were compacted, so scratch variables are removed from the
   dictionary to match. */
323 dict_delete_scratch_vars (ds->dict);
324 dict_compact_values (ds->dict);
327 /* Old data sink becomes new data source. */
328 if (ds->sink != NULL)
329 ds->source = casewriter_make_reader (ds->sink);
334 ds->discard_output = false;
/* Reset case initialization for the new data; presumably marks the
   variables of ds->dict as already initialized. */
338 caseinit_clear (ds->caseinit);
339 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
341 dict_clear_vectors (ds->dict);
/* permanent_dict is either already NULL (cleared by
   proc_cancel_temporary_transformations) or an alias of ds->dict
   installed by proc_open(), so it is dropped, not destroyed. */
342 ds->permanent_dict = NULL;
343 return proc_cancel_all_transformations (ds) && ds->ok;
346 /* Casereader class for procedure execution. */
/* Connects the casereader returned by proc_open() to
   proc_casereader_read and proc_casereader_destroy.
   NOTE(review): the opening brace and any remaining initializer
   members are elided in this listing. */
347 static struct casereader_class proc_casereader_class =
349 proc_casereader_read,
350 proc_casereader_destroy,
355 /* Updates last_proc_invocation. */
357 update_last_proc_invocation (struct dataset *ds)
359 ds->last_proc_invocation = time (NULL);
362 /* Returns a pointer to the lagged case from N_BEFORE cases before the
363 current one, or NULL if there haven't been that many cases yet. */
/* N_BEFORE must be between 1 and ds->n_lag, the lag depth from which
   proc_open() sized the lag deque.

   NOTE(review): the return type, braces, and the branch that returns
   NULL are elided in this listing. */
365 lagged_case (const struct dataset *ds, int n_before)
367 assert (n_before >= 1);
368 assert (n_before <= ds->n_lag);
/* proc_casereader_read pushes each case to the front of the deque,
   so the case N_BEFORE back sits at offset N_BEFORE - 1.  N_BEFORE is
   known to be positive here, so comparing it with the count is safe
   even if the count is unsigned. */
370 if (n_before <= deque_count (&ds->lag))
371 return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
376 /* Returns the current set of permanent transformations,
377 and clears the permanent transformations.
378 For use by INPUT PROGRAM. */
380 proc_capture_transformations (struct dataset *ds)
382 struct trns_chain *chain;
384 assert (ds->temporary_trns_chain == NULL);
385 chain = ds->permanent_trns_chain;
386 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
388 if ( ds->xform_callback)
389 ds->xform_callback (false, ds->xform_callback_aux);
394 /* Adds a transformation that processes a case with PROC and
395 frees itself with FREE to the current set of transformations.
396 The functions are passed AUX as auxiliary data. */
398 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
400 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
401 if ( ds->xform_callback)
402 ds->xform_callback (true, ds->xform_callback_aux);
405 /* Adds a transformation that processes a case with PROC and
406 frees itself with FREE to the current set of transformations.
407 When parsing of the block of transformations is complete,
408 FINALIZE will be called.
409 The functions are passed AUX as auxiliary data. */
411 add_transformation_with_finalizer (struct dataset *ds,
412 trns_finalize_func *finalize,
413 trns_proc_func *proc,
414 trns_free_func *free, void *aux)
416 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
418 if ( ds->xform_callback)
419 ds->xform_callback (true, ds->xform_callback_aux);
422 /* Returns the index of the next transformation.
423 This value can be returned by a transformation procedure
424 function to indicate a "jump" to that transformation. */
/* The index is taken from ds->cur_trns_chain, the chain that
   add_transformation() currently appends to.  That is the temporary
   chain while TEMPORARY is in effect and the permanent chain
   otherwise, so the index is presumably meaningful only within that
   chain.
   NOTE(review): the return type and braces are elided in this
   listing. */
426 next_transformation (const struct dataset *ds)
428 return trns_chain_next (ds->cur_trns_chain);
431 /* Returns true if the next call to add_transformation() will add
432 a temporary transformation, false if it will add a permanent
435 proc_in_temporary_transformations (const struct dataset *ds)
437 return ds->temporary_trns_chain != NULL;
440 /* Marks the start of temporary transformations.
441 Further calls to add_transformation() will add temporary
444 proc_start_temporary_transformations (struct dataset *ds)
446 if (!proc_in_temporary_transformations (ds))
448 add_case_limit_trns (ds);
450 ds->permanent_dict = dict_clone (ds->dict);
452 trns_chain_finalize (ds->permanent_trns_chain);
453 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
455 if ( ds->xform_callback)
456 ds->xform_callback (true, ds->xform_callback_aux);
460 /* Converts all the temporary transformations, if any, to
461 permanent transformations. Further transformations will be
463 Returns true if anything changed, false otherwise. */
465 proc_make_temporary_transformations_permanent (struct dataset *ds)
467 if (proc_in_temporary_transformations (ds))
469 trns_chain_finalize (ds->temporary_trns_chain);
470 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
471 ds->temporary_trns_chain = NULL;
473 dict_destroy (ds->permanent_dict);
474 ds->permanent_dict = NULL;
482 /* Cancels all temporary transformations, if any. Further
483 transformations will be permanent.
484 Returns true if anything changed, false otherwise. */
486 proc_cancel_temporary_transformations (struct dataset *ds)
488 if (proc_in_temporary_transformations (ds))
490 dict_destroy (ds->dict);
491 ds->dict = ds->permanent_dict;
492 ds->permanent_dict = NULL;
494 trns_chain_destroy (ds->temporary_trns_chain);
495 ds->temporary_trns_chain = NULL;
497 if ( ds->xform_callback)
498 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
499 ds->xform_callback_aux);
507 /* Cancels all transformations, if any.
508 Returns true if successful, false on I/O error. */
510 proc_cancel_all_transformations (struct dataset *ds)
513 assert (ds->proc_state == PROC_COMMITTED);
514 ok = trns_chain_destroy (ds->permanent_trns_chain);
515 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
516 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
517 ds->temporary_trns_chain = NULL;
518 if ( ds->xform_callback)
519 ds->xform_callback (false, ds->xform_callback_aux);
524 /* Initializes procedure handling. */
/* Creates a new dataset with an empty dictionary, a case initializer
   and an empty permanent transformation chain.
   NOTE(review): the return type, braces and the return statement are
   elided in this listing. */
526 create_dataset (void)
/* Assuming xzalloc zero-fills (as its name suggests), the pointers
   start out null and proc_state starts as PROC_COMMITTED, the first
   enumerator.  proc_cancel_all_transformations() below asserts that
   state. */
528 struct dataset *ds = xzalloc (sizeof(*ds));
529 ds->dict = dict_create ();
530 ds->caseinit = caseinit_create ();
/* Installs the initial empty permanent chain and makes it current.
   This presumably relies on trns_chain_destroy() accepting the null
   chains present at this point. */
531 proc_cancel_all_transformations (ds);
537 dataset_add_transform_change_callback (struct dataset *ds,
538 transformation_change_callback_func *cb,
541 ds->xform_callback = cb;
542 ds->xform_callback_aux = aux;
545 /* Finishes up procedure handling. */
/* Destroys DS.  It discards the active file and its transformations,
   then the dictionary and case initializer, and tells the
   transformation-change callback that no transformations remain.
   Must not be called while a procedure is in progress, because
   proc_discard_active_file() asserts PROC_COMMITTED.

   NOTE(review): if TEMPORARY is in effect, no visible statement
   releases ds->permanent_dict (the clone made by
   proc_start_temporary_transformations) -- check for a leak.  The
   return type, braces and any statement after the callback (for
   example freeing DS itself) are elided in this listing. */
547 destroy_dataset (struct dataset *ds)
549 proc_discard_active_file (ds);
550 dict_destroy (ds->dict);
551 caseinit_destroy (ds->caseinit);
/* proc_discard_active_file() already destroyed the temporary chain
   and replaced the permanent chain with an empty one; only that
   empty chain remains to be destroyed. */
552 trns_chain_destroy (ds->permanent_trns_chain);
554 if ( ds->xform_callback)
555 ds->xform_callback (false, ds->xform_callback_aux);
559 /* Causes output from the next procedure to be discarded, instead
560 of being preserved for use as input for the next procedure. */
562 proc_discard_output (struct dataset *ds)
564 ds->discard_output = true;
567 /* Discards the active file dictionary, data, and transformations. */
/* Requires that no procedure is in progress.
   NOTE(review): the return type, braces and a few statements are
   elided in this listing.  These presumably include one that clears
   ds->source after it is destroyed, since proc_has_active_file()
   tests ds->source. */
570 proc_discard_active_file (struct dataset *ds)
572 assert (ds->proc_state == PROC_COMMITTED);
574 dict_clear (ds->dict);
/* Presumably resets the default file handle. */
575 fh_set_default_handle (NULL);
579 casereader_destroy (ds->source);
582 proc_cancel_all_transformations (ds);
585 /* Sets SOURCE as the source for procedure input for the next procedure. */
/* First discards DS's current active file (dictionary, data and
   transformations).  DICT must not be DS's current dictionary,
   because that dictionary is destroyed here; presumably an elided
   line then installs DICT as ds->dict.
   NOTE(review): the return type and braces are also elided in this
   listing. */
588 proc_set_active_file (struct dataset *ds,
589 struct casereader *source,
590 struct dictionary *dict)
592 assert (ds->proc_state == PROC_COMMITTED);
593 assert (ds->dict != dict);
595 proc_discard_active_file (ds);
597 dict_destroy (ds->dict);
600 proc_set_active_file_data (ds, source);
603 /* Replaces the active file's data by READER without replacing
604 the associated dictionary. */
/* Destroys the current data source and resets the case initializer
   for the variables of ds->dict.  READER may be null, which leaves DS
   without data.

   Returns true if READER is null or has not recorded an error, false
   otherwise.

   NOTE(review): the return type, braces and the statement that stores
   READER in DS are elided in this listing. */
606 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
608 casereader_destroy (ds->source);
611 caseinit_clear (ds->caseinit);
612 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
614 return reader == NULL || !casereader_error (reader);
617 /* Returns true if an active file data source is available, false
620 proc_has_active_file (const struct dataset *ds)
622 return ds->source != NULL;
625 /* Returns the active file data source from DS, or a null pointer
626 if DS has no data source, and removes it from DS. */
/* Because DS no longer refers to the returned casereader, the caller
   presumably becomes responsible for destroying it.
   NOTE(review): the return type, braces, the statement that clears
   ds->source and the return statement are elided in this listing. */
628 proc_extract_active_file_data (struct dataset *ds)
630 struct casereader *reader = ds->source;
636 /* Checks whether DS has a corrupted active file. If so,
637 discards it and returns false. If not, returns true without discarding it. */
/* NOTE(review): the return type, braces, the `else' keyword and the
   return statements are elided in this listing. */
640 dataset_end_of_command (struct dataset *ds)
642 if (ds->source != NULL)
644 if (casereader_error (ds->source))
646 proc_discard_active_file (ds);
/* The source is intact.  Clear any taint that has propagated to its
   successors, presumably readers derived from it during the command.
   The cast drops const only because casereader_get_taint() returns a
   const pointer while taint_reset_successor_taint() needs a mutable
   one. */
651 const struct taint *taint = casereader_get_taint (ds->source);
652 taint_reset_successor_taint ((struct taint *) taint);
653 assert (!taint_has_tainted_successor (taint));
659 static trns_proc_func case_limit_trns_proc;
660 static trns_free_func case_limit_trns_free;
662 /* Adds a transformation that limits the number of cases that may
663 pass through, if DS->DICT has a case limit. */
665 add_case_limit_trns (struct dataset *ds)
667 casenumber case_limit = dict_get_case_limit (ds->dict);
670 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
671 *cases_remaining = case_limit;
672 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
674 dict_set_case_limit (ds->dict, 0);
678 /* Limits the maximum number of cases processed to
681 case_limit_trns_proc (void *cases_remaining_,
682 struct ccase *c UNUSED, casenumber case_nr UNUSED)
684 size_t *cases_remaining = cases_remaining_;
685 if (*cases_remaining > 0)
687 (*cases_remaining)--;
688 return TRNS_CONTINUE;
691 return TRNS_DROP_CASE;
694 /* Frees the data associated with a case limit transformation. */
696 case_limit_trns_free (void *cases_remaining_)
698 size_t *cases_remaining = cases_remaining_;
699 free (cases_remaining);
703 static trns_proc_func filter_trns_proc;
705 /* Adds a temporary transformation to filter data according to
706 the variable specified on FILTER, if any. */
708 add_filter_trns (struct dataset *ds)
710 struct variable *filter_var = dict_get_filter (ds->dict);
711 if (filter_var != NULL)
713 proc_start_temporary_transformations (ds);
714 add_transformation (ds, filter_trns_proc, NULL, filter_var);
718 /* FILTER transformation. */
720 filter_trns_proc (void *filter_var_,
721 struct ccase *c UNUSED, casenumber case_nr UNUSED)
724 struct variable *filter_var = filter_var_;
725 double f = case_num (c, filter_var);
726 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
727 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Presumably returns DS's current dictionary (ds->dict).
   NOTE(review): the return type and body are elided in this listing. */
732 dataset_dict (const struct dataset *ds)
/* Returns DS's data source for read-only use, presumably ds->source,
   which is null when DS has no active file.
   NOTE(review): the body is elided in this listing. */
737 const struct casereader *
738 dataset_source (const struct dataset *ds)
/* Raises the number of lagged cases kept by the next procedure to at
   least N_BEFORE; it never lowers it.  proc_open() sizes the lag
   deque from this value, and lagged_case() asserts against it.
   NOTE(review): the return type and braces are elided in this
   listing. */
744 dataset_need_lag (struct dataset *ds, int n_before)
746 ds->n_lag = MAX (ds->n_lag, n_before);