1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/procedure.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/transformations.h"
36 #include "data/variable.h"
37 #include "libpspp/deque.h"
38 #include "libpspp/misc.h"
39 #include "libpspp/str.h"
40 #include "libpspp/taint.h"
41 #include "libpspp/i18n.h"
43 #include "gl/minmax.h"
44 #include "gl/xalloc.h"
47 /* Cases are read from source,
48 their transformation variables are initialized,
49 pass through permanent_trns_chain (which transforms them into
50 the format described by permanent_dict),
52 pass through temporary_trns_chain (which transforms them into
53 the format described by dict),
54 and are finally passed to the procedure. */
55 struct casereader *source;
56 struct caseinit *caseinit;
57 struct trns_chain *permanent_trns_chain;
58 struct dictionary *permanent_dict;
59 struct casewriter *sink;
60 struct trns_chain *temporary_trns_chain;
61 struct dictionary *dict;
63 /* Callback which occurs whenever the transformation chain(s) have
65 transformation_change_callback_func *xform_callback;
66 void *xform_callback_aux;
68 /* If true, cases are discarded instead of being written to
72 /* The transformation chain that the next transformation will be
74 struct trns_chain *cur_trns_chain;
76 /* The case map used to compact a case, if necessary;
77 otherwise a null pointer. */
78 struct case_map *compactor;
80 /* Time at which proc was last invoked. */
81 time_t last_proc_invocation;
83 /* Cases just before ("lagging") the current one. */
84 int n_lag; /* Number of cases to lag. */
85 struct deque lag; /* Deque of lagged cases. */
86 struct ccase **lag_cases; /* Lagged cases managed by deque. */
91 PROC_COMMITTED, /* No procedure in progress. */
92 PROC_OPEN, /* proc_open called, casereader still open. */
93 PROC_CLOSED /* casereader from proc_open destroyed,
94 but proc_commit not yet called. */
97 casenumber cases_written; /* Cases output so far. */
98 bool ok; /* Error status. */
99 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
101 void (*callback) (void *); /* Callback for when the dataset changes */
104 }; /* struct dataset */
/* Helpers defined later in this file. */
static void add_case_limit_trns (struct dataset *ds);
static void add_filter_trns (struct dataset *ds);

static void update_last_proc_invocation (struct dataset *ds);
113 dataset_set_unsaved (const struct dataset *ds)
115 if (ds->callback) ds->callback (ds->cb_data);
/* Public functions. */
122 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
125 ds->cb_data = cb_data;
129 /* Returns the last time the data was read. */
131 time_of_last_procedure (struct dataset *ds)
133 if (ds->last_proc_invocation == 0)
134 update_last_proc_invocation (ds);
135 return ds->last_proc_invocation;
/* Regular procedure. */
140 /* Executes any pending transformations, if necessary.
141 This is not identical to the EXECUTE command in that it won't
142 always read the source data. This can be important when the
143 source data is given inline within BEGIN DATA...END FILE. */
145 proc_execute (struct dataset *ds)
149 if ((ds->temporary_trns_chain == NULL
150 || trns_chain_is_empty (ds->temporary_trns_chain))
151 && trns_chain_is_empty (ds->permanent_trns_chain))
154 ds->discard_output = false;
155 dict_set_case_limit (ds->dict, 0);
156 dict_clear_vectors (ds->dict);
160 ok = casereader_destroy (proc_open (ds));
161 return proc_commit (ds) && ok;
164 static const struct casereader_class proc_casereader_class;
166 /* Opens dataset DS for reading cases with proc_read.
167 proc_commit must be called when done. */
169 proc_open (struct dataset *ds)
171 struct casereader *reader;
173 assert (ds->source != NULL);
174 assert (ds->proc_state == PROC_COMMITTED);
176 update_last_proc_invocation (ds);
178 caseinit_mark_for_init (ds->caseinit, ds->dict);
180 /* Finish up the collection of transformations. */
181 add_case_limit_trns (ds);
182 add_filter_trns (ds);
183 trns_chain_finalize (ds->cur_trns_chain);
185 /* Make permanent_dict refer to the dictionary right before
186 data reaches the sink. */
187 if (ds->permanent_dict == NULL)
188 ds->permanent_dict = ds->dict;
191 if (!ds->discard_output)
193 struct dictionary *pd = ds->permanent_dict;
194 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
195 if (compacted_value_cnt < dict_get_next_value_idx (pd))
197 struct caseproto *compacted_proto;
198 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
199 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
200 ds->sink = autopaging_writer_create (compacted_proto);
201 caseproto_unref (compacted_proto);
205 ds->compactor = NULL;
206 ds->sink = autopaging_writer_create (dict_get_proto (pd));
211 ds->compactor = NULL;
215 /* Allocate memory for lagged cases. */
216 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
218 ds->proc_state = PROC_OPEN;
219 ds->cases_written = 0;
222 /* FIXME: use taint in dataset in place of `ok'? */
223 /* FIXME: for trivial cases we can just return a clone of
226 /* Create casereader and insert a shim on top. The shim allows us to
227 arbitrarily extend the casereader's lifetime, by slurping the cases into
228 the shim's buffer in proc_commit(). That is especially useful when output
229 table_items are generated directly from the procedure casereader (e.g. by
230 the LIST procedure) when we are using an output driver that keeps a
231 reference to the output items passed to it (e.g. the GUI output driver in
233 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
235 &proc_casereader_class, ds);
236 ds->shim = casereader_shim_insert (reader);
240 /* Returns true if a procedure is in progress, that is, if
241 proc_open has been called but proc_commit has not. */
243 proc_is_open (const struct dataset *ds)
245 return ds->proc_state != PROC_COMMITTED;
248 /* "read" function for procedure casereader. */
249 static struct ccase *
250 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
252 struct dataset *ds = ds_;
253 enum trns_result retval = TRNS_DROP_CASE;
256 assert (ds->proc_state == PROC_OPEN);
257 for (; ; case_unref (c))
261 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
262 if (retval == TRNS_ERROR)
267 /* Read a case from source. */
268 c = casereader_read (ds->source);
271 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
272 caseinit_init_vars (ds->caseinit, c);
274 /* Execute permanent transformations. */
275 case_nr = ds->cases_written + 1;
276 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
278 caseinit_update_left_vars (ds->caseinit, c);
279 if (retval != TRNS_CONTINUE)
282 /* Write case to collection of lagged cases. */
285 while (deque_count (&ds->lag) >= ds->n_lag)
286 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
287 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
290 /* Write case to replacement active file. */
292 if (ds->sink != NULL)
293 casewriter_write (ds->sink,
294 case_map_execute (ds->compactor, case_ref (c)));
296 /* Execute temporary transformations. */
297 if (ds->temporary_trns_chain != NULL)
299 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
300 &c, ds->cases_written);
301 if (retval != TRNS_CONTINUE)
309 /* "destroy" function for procedure casereader. */
311 proc_casereader_destroy (struct casereader *reader, void *ds_)
313 struct dataset *ds = ds_;
316 /* We are always the subreader for a casereader_buffer, so if we're being
317 destroyed then it's because the casereader_buffer has read all the cases
318 that it ever will. */
321 /* Make sure transformations happen for every input case, in
322 case they have side effects, and ensure that the replacement
323 active file gets all the cases it should. */
324 while ((c = casereader_read (reader)) != NULL)
327 ds->proc_state = PROC_CLOSED;
328 ds->ok = casereader_destroy (ds->source) && ds->ok;
330 proc_set_active_file_data (ds, NULL);
333 /* Must return false if the source casereader, a transformation,
334 or the sink casewriter signaled an error. (If a temporary
335 transformation signals an error, then the return value is
336 false, but the replacement active file may still be
339 proc_commit (struct dataset *ds)
341 if (ds->shim != NULL)
342 casereader_shim_slurp (ds->shim);
344 assert (ds->proc_state == PROC_CLOSED);
345 ds->proc_state = PROC_COMMITTED;
347 dataset_set_unsaved (ds);
349 /* Free memory for lagged cases. */
350 while (!deque_is_empty (&ds->lag))
351 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
352 free (ds->lag_cases);
354 /* Dictionary from before TEMPORARY becomes permanent. */
355 proc_cancel_temporary_transformations (ds);
357 if (!ds->discard_output)
359 /* Finish compacting. */
360 if (ds->compactor != NULL)
362 case_map_destroy (ds->compactor);
363 ds->compactor = NULL;
365 dict_delete_scratch_vars (ds->dict);
366 dict_compact_values (ds->dict);
369 /* Old data sink becomes new data source. */
370 if (ds->sink != NULL)
371 ds->source = casewriter_make_reader (ds->sink);
376 ds->discard_output = false;
380 caseinit_clear (ds->caseinit);
381 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
383 dict_clear_vectors (ds->dict);
384 ds->permanent_dict = NULL;
385 return proc_cancel_all_transformations (ds) && ds->ok;
388 /* Casereader class for procedure execution. */
389 static const struct casereader_class proc_casereader_class =
391 proc_casereader_read,
392 proc_casereader_destroy,
397 /* Updates last_proc_invocation. */
399 update_last_proc_invocation (struct dataset *ds)
401 ds->last_proc_invocation = time (NULL);
404 /* Returns a pointer to the lagged case from N_BEFORE cases before the
405 current one, or NULL if there haven't been that many cases yet. */
407 lagged_case (const struct dataset *ds, int n_before)
409 assert (n_before >= 1);
410 assert (n_before <= ds->n_lag);
412 if (n_before <= deque_count (&ds->lag))
413 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
418 /* Returns the current set of permanent transformations,
419 and clears the permanent transformations.
420 For use by INPUT PROGRAM. */
422 proc_capture_transformations (struct dataset *ds)
424 struct trns_chain *chain;
426 assert (ds->temporary_trns_chain == NULL);
427 chain = ds->permanent_trns_chain;
428 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
430 if ( ds->xform_callback)
431 ds->xform_callback (false, ds->xform_callback_aux);
436 /* Adds a transformation that processes a case with PROC and
437 frees itself with FREE to the current set of transformations.
438 The functions are passed AUX as auxiliary data. */
440 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
442 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
443 if ( ds->xform_callback)
444 ds->xform_callback (true, ds->xform_callback_aux);
447 /* Adds a transformation that processes a case with PROC and
448 frees itself with FREE to the current set of transformations.
449 When parsing of the block of transformations is complete,
450 FINALIZE will be called.
451 The functions are passed AUX as auxiliary data. */
453 add_transformation_with_finalizer (struct dataset *ds,
454 trns_finalize_func *finalize,
455 trns_proc_func *proc,
456 trns_free_func *free, void *aux)
458 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
460 if ( ds->xform_callback)
461 ds->xform_callback (true, ds->xform_callback_aux);
464 /* Returns the index of the next transformation.
465 This value can be returned by a transformation procedure
466 function to indicate a "jump" to that transformation. */
468 next_transformation (const struct dataset *ds)
470 return trns_chain_next (ds->cur_trns_chain);
473 /* Returns true if the next call to add_transformation() will add
474 a temporary transformation, false if it will add a permanent
477 proc_in_temporary_transformations (const struct dataset *ds)
479 return ds->temporary_trns_chain != NULL;
482 /* Marks the start of temporary transformations.
483 Further calls to add_transformation() will add temporary
486 proc_start_temporary_transformations (struct dataset *ds)
488 if (!proc_in_temporary_transformations (ds))
490 add_case_limit_trns (ds);
492 ds->permanent_dict = dict_clone (ds->dict);
494 trns_chain_finalize (ds->permanent_trns_chain);
495 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
497 if ( ds->xform_callback)
498 ds->xform_callback (true, ds->xform_callback_aux);
502 /* Converts all the temporary transformations, if any, to
503 permanent transformations. Further transformations will be
505 Returns true if anything changed, false otherwise. */
507 proc_make_temporary_transformations_permanent (struct dataset *ds)
509 if (proc_in_temporary_transformations (ds))
511 trns_chain_finalize (ds->temporary_trns_chain);
512 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
513 ds->temporary_trns_chain = NULL;
515 dict_destroy (ds->permanent_dict);
516 ds->permanent_dict = NULL;
524 /* Cancels all temporary transformations, if any. Further
525 transformations will be permanent.
526 Returns true if anything changed, false otherwise. */
528 proc_cancel_temporary_transformations (struct dataset *ds)
530 if (proc_in_temporary_transformations (ds))
532 dict_destroy (ds->dict);
533 ds->dict = ds->permanent_dict;
534 ds->permanent_dict = NULL;
536 trns_chain_destroy (ds->temporary_trns_chain);
537 ds->temporary_trns_chain = NULL;
539 if ( ds->xform_callback)
540 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
541 ds->xform_callback_aux);
549 /* Cancels all transformations, if any.
550 Returns true if successful, false on I/O error. */
552 proc_cancel_all_transformations (struct dataset *ds)
555 assert (ds->proc_state == PROC_COMMITTED);
556 ok = trns_chain_destroy (ds->permanent_trns_chain);
557 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
558 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
559 ds->temporary_trns_chain = NULL;
560 if ( ds->xform_callback)
561 ds->xform_callback (false, ds->xform_callback_aux);
568 dict_callback (struct dictionary *d UNUSED, void *ds_)
570 struct dataset *ds = ds_;
571 dataset_set_unsaved (ds);
574 /* Initializes procedure handling. */
576 create_dataset (void)
578 struct dataset *ds = xzalloc (sizeof(*ds));
579 ds->dict = dict_create ();
581 dict_set_change_callback (ds->dict, dict_callback, ds);
583 dict_set_encoding (ds->dict, get_default_encoding ());
585 ds->caseinit = caseinit_create ();
586 proc_cancel_all_transformations (ds);
592 dataset_add_transform_change_callback (struct dataset *ds,
593 transformation_change_callback_func *cb,
596 ds->xform_callback = cb;
597 ds->xform_callback_aux = aux;
600 /* Finishes up procedure handling. */
602 destroy_dataset (struct dataset *ds)
604 proc_discard_active_file (ds);
605 dict_destroy (ds->dict);
606 caseinit_destroy (ds->caseinit);
607 trns_chain_destroy (ds->permanent_trns_chain);
609 if ( ds->xform_callback)
610 ds->xform_callback (false, ds->xform_callback_aux);
614 /* Causes output from the next procedure to be discarded, instead
615 of being preserved for use as input for the next procedure. */
617 proc_discard_output (struct dataset *ds)
619 ds->discard_output = true;
622 /* Discards the active file dictionary, data, and
625 proc_discard_active_file (struct dataset *ds)
627 assert (ds->proc_state == PROC_COMMITTED);
629 dict_clear (ds->dict);
630 fh_set_default_handle (NULL);
634 casereader_destroy (ds->source);
637 proc_cancel_all_transformations (ds);
640 /* Sets SOURCE as the source for procedure input for the next
643 proc_set_active_file (struct dataset *ds,
644 struct casereader *source,
645 struct dictionary *dict)
647 assert (ds->proc_state == PROC_COMMITTED);
648 assert (ds->dict != dict);
650 proc_discard_active_file (ds);
652 dict_destroy (ds->dict);
654 dict_set_change_callback (ds->dict, dict_callback, ds);
656 proc_set_active_file_data (ds, source);
659 /* Replaces the active file's data by READER without replacing
660 the associated dictionary. */
662 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
664 casereader_destroy (ds->source);
667 caseinit_clear (ds->caseinit);
668 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
670 return reader == NULL || !casereader_error (reader);
673 /* Returns true if an active file data source is available, false
676 proc_has_active_file (const struct dataset *ds)
678 return ds->source != NULL;
681 /* Returns the active file data source from DS, or a null pointer
682 if DS has no data source, and removes it from DS. */
684 proc_extract_active_file_data (struct dataset *ds)
686 struct casereader *reader = ds->source;
692 /* Checks whether DS has a corrupted active file. If so,
693 discards it and returns false. If not, returns true without
696 dataset_end_of_command (struct dataset *ds)
698 if (ds->source != NULL)
700 if (casereader_error (ds->source))
702 proc_discard_active_file (ds);
707 const struct taint *taint = casereader_get_taint (ds->source);
708 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
709 assert (!taint_has_tainted_successor (taint));
715 static trns_proc_func case_limit_trns_proc;
716 static trns_free_func case_limit_trns_free;
718 /* Adds a transformation that limits the number of cases that may
719 pass through, if DS->DICT has a case limit. */
721 add_case_limit_trns (struct dataset *ds)
723 casenumber case_limit = dict_get_case_limit (ds->dict);
726 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
727 *cases_remaining = case_limit;
728 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
730 dict_set_case_limit (ds->dict, 0);
734 /* Limits the maximum number of cases processed to
737 case_limit_trns_proc (void *cases_remaining_,
738 struct ccase **c UNUSED, casenumber case_nr UNUSED)
740 size_t *cases_remaining = cases_remaining_;
741 if (*cases_remaining > 0)
743 (*cases_remaining)--;
744 return TRNS_CONTINUE;
747 return TRNS_DROP_CASE;
750 /* Frees the data associated with a case limit transformation. */
752 case_limit_trns_free (void *cases_remaining_)
754 size_t *cases_remaining = cases_remaining_;
755 free (cases_remaining);
759 static trns_proc_func filter_trns_proc;
761 /* Adds a temporary transformation to filter data according to
762 the variable specified on FILTER, if any. */
764 add_filter_trns (struct dataset *ds)
766 struct variable *filter_var = dict_get_filter (ds->dict);
767 if (filter_var != NULL)
769 proc_start_temporary_transformations (ds);
770 add_transformation (ds, filter_trns_proc, NULL, filter_var);
774 /* FILTER transformation. */
776 filter_trns_proc (void *filter_var_,
777 struct ccase **c UNUSED, casenumber case_nr UNUSED)
780 struct variable *filter_var = filter_var_;
781 double f = case_num (*c, filter_var);
782 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
783 ? TRNS_CONTINUE : TRNS_DROP_CASE);
788 dataset_dict (const struct dataset *ds)
793 const struct casereader *
794 dataset_source (const struct dataset *ds)
800 dataset_need_lag (struct dataset *ds, int n_before)
802 ds->n_lag = MAX (ds->n_lag, n_before);