1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
24 #include <data/case.h>
25 #include <data/case-map.h>
26 #include <data/caseinit.h>
27 #include <data/casereader.h>
28 #include <data/casereader-provider.h>
29 #include <data/casewriter.h>
30 #include <data/dictionary.h>
31 #include <data/file-handle-def.h>
32 #include <data/procedure.h>
33 #include <data/transformations.h>
34 #include <data/variable.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
39 #include <libpspp/i18n.h>
44 /* Cases are read from source,
45 their transformation variables are initialized,
46 pass through permanent_trns_chain (which transforms them into
47 the format described by permanent_dict),
49 pass through temporary_trns_chain (which transforms them into
50 the format described by dict),
51 and are finally passed to the procedure. */
52 struct casereader *source;
53 struct caseinit *caseinit;
54 struct trns_chain *permanent_trns_chain;
55 struct dictionary *permanent_dict;
56 struct casewriter *sink;
57 struct trns_chain *temporary_trns_chain;
58 struct dictionary *dict;
60 /* Callback which occurs whenever the transformation chain(s) have
62 transformation_change_callback_func *xform_callback;
63 void *xform_callback_aux;
65 /* If true, cases are discarded instead of being written to
69 /* The transformation chain that the next transformation will be
71 struct trns_chain *cur_trns_chain;
73 /* The case map used to compact a case, if necessary;
74 otherwise a null pointer. */
75 struct case_map *compactor;
77 /* Time at which proc was last invoked. */
78 time_t last_proc_invocation;
80 /* Cases just before ("lagging") the current one. */
81 int n_lag; /* Number of cases to lag. */
82 struct deque lag; /* Deque of lagged cases. */
83 struct ccase **lag_cases; /* Lagged cases managed by deque. */
88 PROC_COMMITTED, /* No procedure in progress. */
89 PROC_OPEN, /* proc_open called, casereader still open. */
90 PROC_CLOSED /* casereader from proc_open destroyed,
91 but proc_commit not yet called. */
94 casenumber cases_written; /* Cases output so far. */
95 bool ok; /* Error status. */
97 void (*callback) (void *); /* Callback for when the dataset changes */
100 }; /* struct dataset */
/* Private helpers that are used before their definitions further
   down in this file. */
static void update_last_proc_invocation (struct dataset *ds);
static void add_filter_trns (struct dataset *ds);
static void add_case_limit_trns (struct dataset *ds);
109 dataset_set_unsaved (const struct dataset *ds)
111 if (ds->callback) ds->callback (ds->cb_data);
115 /* Public functions. */
118 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
121 ds->cb_data = cb_data;
125 /* Returns the last time the data was read. */
127 time_of_last_procedure (struct dataset *ds)
129 if (ds->last_proc_invocation == 0)
130 update_last_proc_invocation (ds);
131 return ds->last_proc_invocation;
134 /* Regular procedure. */
136 /* Executes any pending transformations, if necessary.
137 This is not identical to the EXECUTE command in that it won't
138 always read the source data. This can be important when the
139 source data is given inline within BEGIN DATA...END FILE. */
141 proc_execute (struct dataset *ds)
145 if ((ds->temporary_trns_chain == NULL
146 || trns_chain_is_empty (ds->temporary_trns_chain))
147 && trns_chain_is_empty (ds->permanent_trns_chain))
150 ds->discard_output = false;
151 dict_set_case_limit (ds->dict, 0);
152 dict_clear_vectors (ds->dict);
156 ok = casereader_destroy (proc_open (ds));
157 return proc_commit (ds) && ok;
160 static const struct casereader_class proc_casereader_class;
162 /* Opens dataset DS for reading cases with proc_read.
163 proc_commit must be called when done. */
/* Returns a casereader from which the procedure reads DS's
   transformed cases.  Before returning, this function:
   - finalizes the pending transformations, after adding the
     case-limit and FILTER transformations;
   - prepares the sink that will receive the replacement active
     file, unless output is being discarded;
   - allocates room for ds->n_lag lagged cases;
   - moves DS to PROC_OPEN.
   DS must have a data source and no procedure in progress (see the
   asserts). */
165 proc_open (struct dataset *ds)
167 assert (ds->source != NULL);
168 assert (ds->proc_state == PROC_COMMITTED);
170 update_last_proc_invocation (ds);
172 caseinit_mark_for_init (ds->caseinit, ds->dict);
174 /* Finish up the collection of transformations. */
175 add_case_limit_trns (ds);
176 add_filter_trns (ds);
177 trns_chain_finalize (ds->cur_trns_chain);
179 /* Make permanent_dict refer to the dictionary right before
180 data reaches the sink. */
181 if (ds->permanent_dict == NULL)
182 ds->permanent_dict = ds->dict;
/* Prepare the sink.  The sink is sized by dict_count_values with the
   DC_SCRATCH mask, which presumably leaves scratch variables out.  A
   compacting case map is built only when that count is smaller than
   the dictionary's full value count. */
185 if (!ds->discard_output)
187 struct dictionary *pd = ds->permanent_dict;
188 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
189 bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
/* NOTE(review): the conditional expression below has no false arm in
   this listing (a line is missing after the '?' line).  Given the
   compactor field's "otherwise a null pointer" contract it
   presumably ends with ": NULL" -- confirm. */
190 ds->compactor = (should_compact
191 ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
193 ds->sink = autopaging_writer_create (compacted_value_cnt);
/* Output is being discarded, so no compaction is needed.
   NOTE(review): the `else` that introduces this branch, and any reset
   of ds->sink, are not visible in this listing.  proc_casereader_read
   and proc_commit both test ds->sink != NULL -- confirm it is
   cleared on this path. */
197 ds->compactor = NULL;
201 /* Allocate memory for lagged cases. */
202 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
/* NOTE(review): ds->ok is later combined with other results
   (proc_casereader_destroy, proc_commit) but is not visibly reset
   here -- confirm it is set to true before each procedure. */
204 ds->proc_state = PROC_OPEN;
205 ds->cases_written = 0;
/* NOTE(review): in this listing the second FIXME comment below is not
   closed on its own line, so the return statement after it is inside
   a comment.  A line is also missing from the middle of the
   casereader_create_sequential call; check its argument list against
   that function's declaration. */
208 /* FIXME: use taint in dataset in place of `ok'? */
209 /* FIXME: for trivial cases we can just return a clone of
211 return casereader_create_sequential (NULL,
212 dict_get_next_value_idx (ds->dict),
214 &proc_casereader_class, ds);
217 /* Returns true if a procedure is in progress, that is, if
218 proc_open has been called but proc_commit has not. */
220 proc_is_open (const struct dataset *ds)
222 return ds->proc_state != PROC_COMMITTED;
225 /* "read" function for procedure casereader. */
/* Each iteration of the loop below does the following, in order:
   - reads one case from ds->source;
   - initializes its transformation variables;
   - runs the permanent transformations;
   - records the case for lagged_case();
   - writes it to the sink, if there is one;
   - runs the temporary transformations.
   A case that a transformation does not pass on is released by the
   loop's case_unref, and the next source case is read.
   NOTE(review): the function's exits (end of input, TRNS_ERROR, and
   returning the finished case) are not visible in this listing. */
226 static struct ccase *
227 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
229 struct dataset *ds = ds_;
/* NOTE(review): `c` and `case_nr` are used below, but their
   declarations are not visible in this listing. */
230 enum trns_result retval = TRNS_DROP_CASE;
233 assert (ds->proc_state == PROC_OPEN);
234 for (; ; case_unref (c))
/* At the top of every iteration, retval is either its initial
   TRNS_DROP_CASE or the result that sent the previous case back
   around the loop. */
238 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
239 if (retval == TRNS_ERROR)
/* NOTE(review): the body of this branch is not visible in this
   listing.  Presumably it records the failure in ds->ok and stops
   reading -- confirm. */
244 /* Read a case from source. */
245 c = casereader_read (ds->source);
/* NOTE(review): no end-of-input check on c is visible before c is
   used below -- confirm that a null case ends the read. */
248 c = case_unshare_and_resize (c, dict_get_next_value_idx (ds->dict));
249 caseinit_init_vars (ds->caseinit, c);
251 /* Execute permanent transformations. */
252 case_nr = ds->cases_written + 1;
/* NOTE(review): the remaining arguments of this call (presumably &c
   and case_nr) are not visible in this listing. */
253 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
255 caseinit_update_left_vars (ds->caseinit, c);
256 if (retval != TRNS_CONTINUE)
/* NOTE(review): the body of this branch is not visible in this
   listing.  It is presumably `continue`, so the dropped case is
   released and the next one is read. */
259 /* Write case to collection of lagged cases. */
/* Keeps at most ds->n_lag cases, newest at the front, which is where
   lagged_case looks them up.
   NOTE(review): with ds->n_lag == 0 this loop would pop from an empty
   deque.  A guard such as `if (ds->n_lag > 0)` is presumably among
   the lines missing from this listing -- confirm. */
262 while (deque_count (&ds->lag) >= ds->n_lag)
263 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
264 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
267 /* Write case to replacement active file. */
/* The sink gets its own reference to the case, passed through the
   compactor (null when no compaction is needed).
   NOTE(review): ds->cases_written is not visibly incremented in this
   listing, yet both case numbers passed to the transformation chains
   derive from it -- confirm. */
269 if (ds->sink != NULL)
270 casewriter_write (ds->sink,
271 case_map_execute (ds->compactor, case_ref (c)));
273 /* Execute temporary transformations. */
274 if (ds->temporary_trns_chain != NULL)
276 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
277 &c, ds->cases_written);
278 if (retval != TRNS_CONTINUE)
/* NOTE(review): the rest of this function is not visible in this
   listing: this branch's body and the return of the finished
   case c. */
286 /* "destroy" function for procedure casereader. */
288 proc_casereader_destroy (struct casereader *reader, void *ds_)
290 struct dataset *ds = ds_;
293 /* Make sure transformations happen for every input case, in
294 case they have side effects, and ensure that the replacement
295 active file gets all the cases it should. */
296 while ((c = casereader_read (reader)) != NULL)
299 ds->proc_state = PROC_CLOSED;
300 ds->ok = casereader_destroy (ds->source) && ds->ok;
302 proc_set_active_file_data (ds, NULL);
305 /* Must return false if the source casereader, a transformation,
306 or the sink casewriter signaled an error. (If a temporary
307 transformation signals an error, then the return value is
308 false, but the replacement active file may still be
311 proc_commit (struct dataset *ds)
313 assert (ds->proc_state == PROC_CLOSED);
314 ds->proc_state = PROC_COMMITTED;
316 dataset_set_unsaved (ds);
318 /* Free memory for lagged cases. */
319 while (!deque_is_empty (&ds->lag))
320 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
321 free (ds->lag_cases);
323 /* Dictionary from before TEMPORARY becomes permanent. */
324 proc_cancel_temporary_transformations (ds);
326 if (!ds->discard_output)
328 /* Finish compacting. */
329 if (ds->compactor != NULL)
331 case_map_destroy (ds->compactor);
332 ds->compactor = NULL;
334 dict_delete_scratch_vars (ds->dict);
335 dict_compact_values (ds->dict);
338 /* Old data sink becomes new data source. */
339 if (ds->sink != NULL)
340 ds->source = casewriter_make_reader (ds->sink);
345 ds->discard_output = false;
349 caseinit_clear (ds->caseinit);
350 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
352 dict_clear_vectors (ds->dict);
353 ds->permanent_dict = NULL;
354 return proc_cancel_all_transformations (ds) && ds->ok;
357 /* Casereader class for procedure execution. */
358 static const struct casereader_class proc_casereader_class =
360 proc_casereader_read,
361 proc_casereader_destroy,
366 /* Updates last_proc_invocation. */
368 update_last_proc_invocation (struct dataset *ds)
370 ds->last_proc_invocation = time (NULL);
373 /* Returns a pointer to the lagged case from N_BEFORE cases before the
374 current one, or NULL if there haven't been that many cases yet. */
376 lagged_case (const struct dataset *ds, int n_before)
378 assert (n_before >= 1);
379 assert (n_before <= ds->n_lag);
381 if (n_before <= deque_count (&ds->lag))
382 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
387 /* Returns the current set of permanent transformations,
388 and clears the permanent transformations.
389 For use by INPUT PROGRAM. */
391 proc_capture_transformations (struct dataset *ds)
393 struct trns_chain *chain;
395 assert (ds->temporary_trns_chain == NULL);
396 chain = ds->permanent_trns_chain;
397 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
399 if ( ds->xform_callback)
400 ds->xform_callback (false, ds->xform_callback_aux);
405 /* Adds a transformation that processes a case with PROC and
406 frees itself with FREE to the current set of transformations.
407 The functions are passed AUX as auxiliary data. */
409 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
411 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
412 if ( ds->xform_callback)
413 ds->xform_callback (true, ds->xform_callback_aux);
416 /* Adds a transformation that processes a case with PROC and
417 frees itself with FREE to the current set of transformations.
418 When parsing of the block of transformations is complete,
419 FINALIZE will be called.
420 The functions are passed AUX as auxiliary data. */
422 add_transformation_with_finalizer (struct dataset *ds,
423 trns_finalize_func *finalize,
424 trns_proc_func *proc,
425 trns_free_func *free, void *aux)
427 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
429 if ( ds->xform_callback)
430 ds->xform_callback (true, ds->xform_callback_aux);
433 /* Returns the index of the next transformation.
434 This value can be returned by a transformation procedure
435 function to indicate a "jump" to that transformation. */
/* The index comes from ds->cur_trns_chain, the chain that
   add_transformation() appends to.  That is the temporary chain
   after proc_start_temporary_transformations(), and the permanent
   chain otherwise. */
437 next_transformation (const struct dataset *ds)
439 return trns_chain_next (ds->cur_trns_chain);
442 /* Returns true if the next call to add_transformation() will add
443 a temporary transformation, false if it will add a permanent
446 proc_in_temporary_transformations (const struct dataset *ds)
448 return ds->temporary_trns_chain != NULL;
451 /* Marks the start of temporary transformations.
452 Further calls to add_transformation() will add temporary
455 proc_start_temporary_transformations (struct dataset *ds)
457 if (!proc_in_temporary_transformations (ds))
459 add_case_limit_trns (ds);
461 ds->permanent_dict = dict_clone (ds->dict);
463 trns_chain_finalize (ds->permanent_trns_chain);
464 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
466 if ( ds->xform_callback)
467 ds->xform_callback (true, ds->xform_callback_aux);
471 /* Converts all the temporary transformations, if any, to
472 permanent transformations. Further transformations will be
474 Returns true if anything changed, false otherwise. */
476 proc_make_temporary_transformations_permanent (struct dataset *ds)
478 if (proc_in_temporary_transformations (ds))
480 trns_chain_finalize (ds->temporary_trns_chain);
481 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
482 ds->temporary_trns_chain = NULL;
484 dict_destroy (ds->permanent_dict);
485 ds->permanent_dict = NULL;
493 /* Cancels all temporary transformations, if any. Further
494 transformations will be permanent.
495 Returns true if anything changed, false otherwise. */
497 proc_cancel_temporary_transformations (struct dataset *ds)
499 if (proc_in_temporary_transformations (ds))
501 dict_destroy (ds->dict);
502 ds->dict = ds->permanent_dict;
503 ds->permanent_dict = NULL;
505 trns_chain_destroy (ds->temporary_trns_chain);
506 ds->temporary_trns_chain = NULL;
508 if ( ds->xform_callback)
509 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
510 ds->xform_callback_aux);
518 /* Cancels all transformations, if any.
519 Returns true if successful, false on I/O error. */
521 proc_cancel_all_transformations (struct dataset *ds)
524 assert (ds->proc_state == PROC_COMMITTED);
525 ok = trns_chain_destroy (ds->permanent_trns_chain);
526 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
527 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
528 ds->temporary_trns_chain = NULL;
529 if ( ds->xform_callback)
530 ds->xform_callback (false, ds->xform_callback_aux);
537 dict_callback (struct dictionary *d UNUSED, void *ds_)
539 struct dataset *ds = ds_;
540 dataset_set_unsaved (ds);
543 /* Initializes procedure handling. */
545 create_dataset (void)
547 struct dataset *ds = xzalloc (sizeof(*ds));
548 ds->dict = dict_create ();
550 dict_set_change_callback (ds->dict, dict_callback, ds);
552 dict_set_encoding (ds->dict, get_default_encoding ());
554 ds->caseinit = caseinit_create ();
555 proc_cancel_all_transformations (ds);
561 dataset_add_transform_change_callback (struct dataset *ds,
562 transformation_change_callback_func *cb,
565 ds->xform_callback = cb;
566 ds->xform_callback_aux = aux;
569 /* Finishes up procedure handling. */
571 destroy_dataset (struct dataset *ds)
573 proc_discard_active_file (ds);
574 dict_destroy (ds->dict);
575 caseinit_destroy (ds->caseinit);
576 trns_chain_destroy (ds->permanent_trns_chain);
578 if ( ds->xform_callback)
579 ds->xform_callback (false, ds->xform_callback_aux);
583 /* Causes output from the next procedure to be discarded, instead
584 of being preserved for use as input for the next procedure. */
586 proc_discard_output (struct dataset *ds)
588 ds->discard_output = true;
591 /* Discards the active file dictionary, data, and
594 proc_discard_active_file (struct dataset *ds)
596 assert (ds->proc_state == PROC_COMMITTED);
598 dict_clear (ds->dict);
599 fh_set_default_handle (NULL);
603 casereader_destroy (ds->source);
606 proc_cancel_all_transformations (ds);
609 /* Sets SOURCE as the source for procedure input for the next
612 proc_set_active_file (struct dataset *ds,
613 struct casereader *source,
614 struct dictionary *dict)
616 assert (ds->proc_state == PROC_COMMITTED);
617 assert (ds->dict != dict);
619 proc_discard_active_file (ds);
621 dict_destroy (ds->dict);
623 dict_set_change_callback (ds->dict, dict_callback, ds);
625 proc_set_active_file_data (ds, source);
628 /* Replaces the active file's data by READER without replacing
629 the associated dictionary. */
631 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
633 casereader_destroy (ds->source);
636 caseinit_clear (ds->caseinit);
637 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
639 return reader == NULL || !casereader_error (reader);
642 /* Returns true if an active file data source is available, false
645 proc_has_active_file (const struct dataset *ds)
647 return ds->source != NULL;
650 /* Returns the active file data source from DS, or a null pointer
651 if DS has no data source, and removes it from DS. */
653 proc_extract_active_file_data (struct dataset *ds)
655 struct casereader *reader = ds->source;
661 /* Checks whether DS has a corrupted active file. If so,
662 discards it and returns false. If not, returns true without
665 dataset_end_of_command (struct dataset *ds)
667 if (ds->source != NULL)
669 if (casereader_error (ds->source))
671 proc_discard_active_file (ds);
676 const struct taint *taint = casereader_get_taint (ds->source);
677 taint_reset_successor_taint ((struct taint *) taint);
678 assert (!taint_has_tainted_successor (taint));
684 static trns_proc_func case_limit_trns_proc;
685 static trns_free_func case_limit_trns_free;
687 /* Adds a transformation that limits the number of cases that may
688 pass through, if DS->DICT has a case limit. */
690 add_case_limit_trns (struct dataset *ds)
692 casenumber case_limit = dict_get_case_limit (ds->dict);
695 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
696 *cases_remaining = case_limit;
697 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
699 dict_set_case_limit (ds->dict, 0);
703 /* Limits the maximum number of cases processed to
706 case_limit_trns_proc (void *cases_remaining_,
707 struct ccase **c UNUSED, casenumber case_nr UNUSED)
709 size_t *cases_remaining = cases_remaining_;
710 if (*cases_remaining > 0)
712 (*cases_remaining)--;
713 return TRNS_CONTINUE;
716 return TRNS_DROP_CASE;
719 /* Frees the data associated with a case limit transformation. */
721 case_limit_trns_free (void *cases_remaining_)
723 size_t *cases_remaining = cases_remaining_;
724 free (cases_remaining);
728 static trns_proc_func filter_trns_proc;
730 /* Adds a temporary transformation to filter data according to
731 the variable specified on FILTER, if any. */
733 add_filter_trns (struct dataset *ds)
735 struct variable *filter_var = dict_get_filter (ds->dict);
736 if (filter_var != NULL)
738 proc_start_temporary_transformations (ds);
739 add_transformation (ds, filter_trns_proc, NULL, filter_var);
743 /* FILTER transformation. */
745 filter_trns_proc (void *filter_var_,
746 struct ccase **c UNUSED, casenumber case_nr UNUSED)
749 struct variable *filter_var = filter_var_;
750 double f = case_num (*c, filter_var);
751 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
752 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Accessor for DS's dictionary.
   NOTE(review): only this declarator line survives in this listing;
   the return type and body are missing.  It presumably returns
   ds->dict -- confirm. */
757 dataset_dict (const struct dataset *ds)
/* Accessor for DS's active file data source.  The const return type
   suggests callers may inspect it but must not consume or destroy
   it; proc_extract_active_file_data is the way to take it.
   NOTE(review): the body is missing from this listing.  It
   presumably returns ds->source -- confirm. */
762 const struct casereader *
763 dataset_source (const struct dataset *ds)
/* Ensures that lagged_case can look back at least N_BEFORE cases.
   Raises DS->n_lag to N_BEFORE if it is currently smaller, and never
   lowers it.  The new value takes effect when proc_open next
   allocates the lag buffer. */
769 dataset_need_lag (struct dataset *ds, int n_before)
771 ds->n_lag = MAX (ds->n_lag, n_before);