1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/procedure.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/transformations.h"
36 #include "data/variable.h"
37 #include "libpspp/deque.h"
38 #include "libpspp/misc.h"
39 #include "libpspp/str.h"
40 #include "libpspp/taint.h"
41 #include "libpspp/i18n.h"
43 #include "gl/minmax.h"
44 #include "gl/xalloc.h"
/* Members of struct dataset: per-dataset state for the procedure
   machinery.
   NOTE(review): this listing omits several lines of the definition,
   including the opening `struct dataset {` line and the declarations of
   members that later code uses (discard_output, proc_state, cb_data);
   confirm against the full file. */
47 /* Cases are read from source,
48 their transformation variables are initialized,
49 pass through permanent_trns_chain (which transforms them into
50 the format described by permanent_dict),
52 pass through temporary_trns_chain (which transforms them into
53 the format described by dict),
54 and are finally passed to the procedure. */
55 struct casereader *source;
56 struct caseinit *caseinit;
57 struct trns_chain *permanent_trns_chain;
58 struct dictionary *permanent_dict;
59 struct casewriter *sink;
60 struct trns_chain *temporary_trns_chain;
61 struct dictionary *dict;
63 /* Callback which occurs whenever the transformation chain(s) have
65 transformation_change_callback_func *xform_callback;
66 void *xform_callback_aux;
68 /* If true, cases are discarded instead of being written to
72 /* The transformation chain that the next transformation will be
74 struct trns_chain *cur_trns_chain;
76 /* The case map used to compact a case, if necessary;
77 otherwise a null pointer. */
/* NOTE(review): the three comments opened above (on the
   transformation-change callback, on discarding output, and on the
   current chain) lack their closing lines in this listing. They therefore
   run together up to "otherwise a null pointer." and swallow the
   xform_callback, xform_callback_aux and cur_trns_chain declarations.
   Confirm that each comment is terminated in the full file. */
78 struct case_map *compactor;
80 /* Time at which proc was last invoked. */
81 time_t last_proc_invocation;
83 /* Cases just before ("lagging") the current one. */
84 int n_lag; /* Number of cases to lag. */
85 struct deque lag; /* Deque of lagged cases. */
86 struct ccase **lag_cases; /* Lagged cases managed by deque. */
/* Procedure states, tracked in proc_state (the enum declaration line is
   not shown in this listing). */
91 PROC_COMMITTED, /* No procedure in progress. */
92 PROC_OPEN, /* proc_open called, casereader still open. */
93 PROC_CLOSED /* casereader from proc_open destroyed,
94 but proc_commit not yet called. */
97 casenumber cases_written; /* Cases output so far. */
98 bool ok; /* Error status. */
99 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
101 void (*callback) (void *); /* Callback for when the dataset changes */
104 }; /* struct dataset */
/* File-local helpers that are defined later in this file. */
107 static void add_case_limit_trns (struct dataset *ds);
108 static void add_filter_trns (struct dataset *ds);
110 static void update_last_proc_invocation (struct dataset *ds);
/* Reports a change to DS by invoking its registered change callback, if
   any, with the auxiliary data registered alongside it. */
113 dataset_set_unsaved (const struct dataset *ds)
115 if (ds->callback) ds->callback (ds->cb_data);
119 /* Public functions. */
/* Registers CB as DS's change callback; dataset_set_unsaved() passes it
   CB_DATA.
   NOTE(review): only the cb_data assignment appears in this listing (the
   line storing CB is absent); confirm that ds->callback is set. */
122 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
125 ds->cb_data = cb_data;
129 /* Returns the last time the data was read. */
131 time_of_last_procedure (struct dataset *ds)
/* A zero timestamp means no time has been recorded yet, so record the
   current time before reporting it. */
133 if (ds->last_proc_invocation == 0)
134 update_last_proc_invocation (ds);
135 return ds->last_proc_invocation;
138 /* Regular procedure. */
140 /* Executes any pending transformations, if necessary.
141 This is not identical to the EXECUTE command in that it won't
142 always read the source data. This can be important when the
143 source data is given inline within BEGIN DATA...END FILE. */
145 proc_execute (struct dataset *ds)
/* With no permanent transformations and no (or an empty) temporary
   chain there is nothing to execute, so only the per-procedure settings
   below are reset and the data is not read.
   NOTE(review): the early return that should follow these resets is not
   present in this listing; confirm it in the full file. */
149 if ((ds->temporary_trns_chain == NULL
150 || trns_chain_is_empty (ds->temporary_trns_chain))
151 && trns_chain_is_empty (ds->permanent_trns_chain))
154 ds->discard_output = false;
155 dict_set_case_limit (ds->dict, 0);
156 dict_clear_vectors (ds->dict);
/* Otherwise open a procedure and immediately destroy its reader.
   Destroying it drives every remaining case through the transformations
   (see proc_casereader_destroy). Errors from the reader and from the
   commit both make the result false. */
160 ok = casereader_destroy (proc_open (ds));
161 return proc_commit (ds) && ok;
/* Forward declaration; the class itself is defined after proc_commit. */
164 static const struct casereader_class proc_casereader_class;
166 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
167 cases filtered out with FILTER BY will not be included in the casereader
168 (which is usually desirable). If FILTER is false, all cases will be
169 included regardless of FILTER BY settings.
171 proc_commit must be called when done. */
173 proc_open_filtering (struct dataset *ds, bool filter)
175 struct casereader *reader;
/* A procedure needs a data source and must not already be in progress. */
177 assert (ds->source != NULL);
178 assert (ds->proc_state == PROC_COMMITTED);
180 update_last_proc_invocation (ds);
182 caseinit_mark_for_init (ds->caseinit, ds->dict);
184 /* Finish up the collection of transformations. */
185 add_case_limit_trns (ds);
/* NOTE(review): per the header comment, the FILTER BY transformation
   should be added only when FILTER is true. No test of FILTER is
   visible in this listing; confirm that one guards this call. */
187 add_filter_trns (ds);
188 trns_chain_finalize (ds->cur_trns_chain);
190 /* Make permanent_dict refer to the dictionary right before
191 data reaches the sink. */
192 if (ds->permanent_dict == NULL)
193 ds->permanent_dict = ds->dict;
/* Unless output is being discarded, create the sink that collects the
   cases leaving the permanent transformations. Excluding DC_SCRATCH
   variables may leave fewer values than dict_get_next_value_idx()
   reports. In that case cases go through a compactor into a sink built
   from the compacted prototype; otherwise they are written unchanged. */
196 if (!ds->discard_output)
198 struct dictionary *pd = ds->permanent_dict;
199 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
200 if (compacted_value_cnt < dict_get_next_value_idx (pd))
202 struct caseproto *compacted_proto;
203 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
204 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
205 ds->sink = autopaging_writer_create (compacted_proto);
206 caseproto_unref (compacted_proto);
210 ds->compactor = NULL;
211 ds->sink = autopaging_writer_create (dict_get_proto (pd));
216 ds->compactor = NULL;
220 /* Allocate memory for lagged cases. */
221 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
223 ds->proc_state = PROC_OPEN;
224 ds->cases_written = 0;
/* NOTE(review): in this listing the second FIXME comment below is not
   terminated before the shim comment. As a result the casereader
   creation, the shim insertion and proc_open's header comment are all
   swallowed into one comment, and no return statement is visible.
   Confirm against the full file. */
227 /* FIXME: use taint in dataset in place of `ok'? */
228 /* FIXME: for trivial cases we can just return a clone of
231 /* Create casereader and insert a shim on top. The shim allows us to
232 arbitrarily extend the casereader's lifetime, by slurping the cases into
233 the shim's buffer in proc_commit(). That is especially useful when output
234 table_items are generated directly from the procedure casereader (e.g. by
235 the LIST procedure) when we are using an output driver that keeps a
236 reference to the output items passed to it (e.g. the GUI output driver in
238 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
240 &proc_casereader_class, ds);
241 ds->shim = casereader_shim_insert (reader);
245 /* Opens dataset DS for reading cases with proc_read.
246 proc_commit must be called when done. */
248 proc_open (struct dataset *ds)
/* Same as proc_open_filtering() with FILTER true, so cases excluded by
   FILTER BY are left out. */
250 return proc_open_filtering (ds, true);
253 /* Returns true if a procedure is in progress, that is, if
254 proc_open has been called but proc_commit has not. */
256 proc_is_open (const struct dataset *ds)
/* Both PROC_OPEN and PROC_CLOSED count as "in progress": the state only
   returns to PROC_COMMITTED in proc_commit(). */
258 return ds->proc_state != PROC_COMMITTED;
261 /* "read" function for procedure casereader. */
/* Reads cases from ds->source until one survives the transformations.
   Each case goes through these steps in order:
   - it is resized to ds->dict's prototype and its transformation
     variables are initialized;
   - it runs through the permanent chain;
   - it is recorded for LAG;
   - it is written to the sink, if there is one;
   - it runs through the temporary chain, if there is one.
   NOTE(review): the return statements and the declarations of c and
   case_nr are not visible in this listing. */
262 static struct ccase *
263 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
265 struct dataset *ds = ds_;
266 enum trns_result retval = TRNS_DROP_CASE;
/* Starting with TRNS_DROP_CASE satisfies the assertion at the top of the
   loop on the first pass. Each pass that continues the loop releases
   the current case through the loop's increment expression. */
269 assert (ds->proc_state == PROC_OPEN);
270 for (; ; case_unref (c))
274 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
275 if (retval == TRNS_ERROR)
280 /* Read a case from source. */
281 c = casereader_read (ds->source);
284 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
285 caseinit_init_vars (ds->caseinit, c);
287 /* Execute permanent transformations. */
288 case_nr = ds->cases_written + 1;
289 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
291 caseinit_update_left_vars (ds->caseinit, c);
292 if (retval != TRNS_CONTINUE)
295 /* Write case to collection of lagged cases. */
/* NOTE(review): with n_lag == 0 the loop below would try to pop from an
   empty deque, because an empty deque still satisfies count >= 0. It
   must therefore be guarded by a test on ds->n_lag, and that guard is
   not visible in this listing. */
298 while (deque_count (&ds->lag) >= ds->n_lag)
299 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
300 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
303 /* Write case to replacement active file. */
/* NOTE(review): the increment of ds->cases_written is not visible in
   this listing. The temporary chain below is given ds->cases_written as
   its case number, which equals case_nr only if that increment happens
   first. */
305 if (ds->sink != NULL)
306 casewriter_write (ds->sink,
307 case_map_execute (ds->compactor, case_ref (c)));
309 /* Execute temporary transformations. */
310 if (ds->temporary_trns_chain != NULL)
312 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
313 &c, ds->cases_written);
314 if (retval != TRNS_CONTINUE)
322 /* "destroy" function for procedure casereader. */
324 proc_casereader_destroy (struct casereader *reader, void *ds_)
326 struct dataset *ds = ds_;
329 /* We are always the subreader for a casereader_buffer, so if we're being
330 destroyed then it's because the casereader_buffer has read all the cases
331 that it ever will. */
334 /* Make sure transformations happen for every input case, in
335 case they have side effects, and ensure that the replacement
336 active file gets all the cases it should. */
337 while ((c = casereader_read (reader)) != NULL)
/* All input has now been consumed: mark the procedure closed and fold
   the source reader's status into ds->ok.
   NOTE(review): this listing shows neither the body of the loop above
   (presumably releasing each case) nor ds->source being set to NULL
   after it is destroyed below. proc_set_active_file_data() destroys
   ds->source again, so confirm that it is cleared first. */
340 ds->proc_state = PROC_CLOSED;
341 ds->ok = casereader_destroy (ds->source) && ds->ok;
343 proc_set_active_file_data (ds, NULL);
/* Completes the procedure begun by proc_open(). In order, it:
   - slurps any cases still pending into the shim;
   - returns DS to PROC_COMMITTED;
   - frees the lagged cases;
   - cancels temporary transformations;
   - finishes compacting and turns the sink (if any) into the new data
     source;
   - finally cancels all transformations.
   NOTE(review): in this listing the header comment below lacks its
   closing line. It therefore runs on through the lag-freeing code to the
   end of the "Dictionary from before TEMPORARY" comment. Confirm that it
   is terminated before the function definition in the full file. */
346 /* Must return false if the source casereader, a transformation,
347 or the sink casewriter signaled an error. (If a temporary
348 transformation signals an error, then the return value is
349 false, but the replacement active file may still be
352 proc_commit (struct dataset *ds)
354 if (ds->shim != NULL)
355 casereader_shim_slurp (ds->shim);
357 assert (ds->proc_state == PROC_CLOSED);
358 ds->proc_state = PROC_COMMITTED;
360 dataset_set_unsaved (ds);
362 /* Free memory for lagged cases. */
363 while (!deque_is_empty (&ds->lag))
364 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
365 free (ds->lag_cases);
367 /* Dictionary from before TEMPORARY becomes permanent. */
368 proc_cancel_temporary_transformations (ds);
/* Unless output was discarded, drop the compactor, then delete scratch
   variables and compact the dictionary's values. This mirrors the
   DC_SCRATCH compaction set up in proc_open_filtering. */
370 if (!ds->discard_output)
372 /* Finish compacting. */
373 if (ds->compactor != NULL)
375 case_map_destroy (ds->compactor);
376 ds->compactor = NULL;
378 dict_delete_scratch_vars (ds->dict);
379 dict_compact_values (ds->dict);
382 /* Old data sink becomes new data source. */
383 if (ds->sink != NULL)
384 ds->source = casewriter_make_reader (ds->sink);
/* A request to discard output applies to a single procedure only. */
389 ds->discard_output = false;
/* Reset per-procedure state for the next procedure: case
   initialization, vectors, the permanent dictionary pointer and all
   transformations. The result also reflects errors recorded in ds->ok. */
393 caseinit_clear (ds->caseinit);
394 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
396 dict_clear_vectors (ds->dict);
397 ds->permanent_dict = NULL;
398 return proc_cancel_all_transformations (ds) && ds->ok;
401 /* Casereader class for procedure execution. */
402 static const struct casereader_class proc_casereader_class =
/* Read and destroy callbacks. Any remaining members and the closing
   brace are not shown in this listing. */
404 proc_casereader_read,
405 proc_casereader_destroy,
410 /* Updates last_proc_invocation. */
412 update_last_proc_invocation (struct dataset *ds)
/* Stores the current calendar time; time_of_last_procedure() reports it. */
414 ds->last_proc_invocation = time (NULL);
417 /* Returns a pointer to the lagged case from N_BEFORE cases before the
418 current one, or NULL if there haven't been that many cases yet. */
420 lagged_case (const struct dataset *ds, int n_before)
/* N_BEFORE must lie between 1 and the depth requested through
   dataset_need_lag(). */
422 assert (n_before >= 1);
423 assert (n_before <= ds->n_lag);
/* proc_casereader_read pushes each new case at the front of the deque,
   so index N_BEFORE - 1 from the front is the case N_BEFORE cases back. */
425 if (n_before <= deque_count (&ds->lag))
426 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
/* NOTE(review): the header comment promises a NULL return when fewer
   than N_BEFORE cases have been seen. That return is not present in
   this listing; confirm it exists in the full file. */
431 /* Returns the current set of permanent transformations,
432 and clears the permanent transformations.
433 For use by INPUT PROGRAM. */
435 proc_capture_transformations (struct dataset *ds)
437 struct trns_chain *chain;
/* Not allowed while TEMPORARY is in effect. */
439 assert (ds->temporary_trns_chain == NULL);
440 chain = ds->permanent_trns_chain;
441 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
/* The replacement chain is empty, so report that nothing is pending. */
443 if ( ds->xform_callback)
444 ds->xform_callback (false, ds->xform_callback_aux);
/* NOTE(review): the statement returning CHAIN is not visible in this
   listing. DS no longer references the chain, so presumably ownership
   passes to the caller. */
449 /* Adds a transformation that processes a case with PROC and
450 frees itself with FREE to the current set of transformations.
451 The functions are passed AUX as auxiliary data. */
453 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
/* Appended to the current chain with no finalizer. */
455 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
/* At least one transformation is now pending. */
456 if ( ds->xform_callback)
457 ds->xform_callback (true, ds->xform_callback_aux);
460 /* Adds a transformation that processes a case with PROC and
461 frees itself with FREE to the current set of transformations.
462 When parsing of the block of transformations is complete,
463 FINALIZE will be called.
464 The functions are passed AUX as auxiliary data. */
466 add_transformation_with_finalizer (struct dataset *ds,
467 trns_finalize_func *finalize,
468 trns_proc_func *proc,
469 trns_free_func *free, void *aux)
/* Appended to whichever chain is current: the temporary chain while
   TEMPORARY is in effect, otherwise the permanent chain. */
471 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
/* At least one transformation is now pending. */
473 if ( ds->xform_callback)
474 ds->xform_callback (true, ds->xform_callback_aux);
477 /* Returns the index of the next transformation.
478 This value can be returned by a transformation procedure
479 function to indicate a "jump" to that transformation. */
481 next_transformation (const struct dataset *ds)
/* The index is relative to the current chain (temporary or permanent). */
483 return trns_chain_next (ds->cur_trns_chain);
/* proc_in_temporary_transformations(): TEMPORARY is in effect exactly
   when a temporary chain exists.
   proc_start_temporary_transformations(): acts on the first call only;
   later calls do nothing. On that first call it:
   - adds any case-limit transformation to the (still current)
     permanent chain;
   - saves a clone of the current dictionary as permanent_dict;
   - finalizes the permanent chain;
   - makes a new temporary chain current;
   - notifies the transformation listener.
   NOTE(review): in this listing the header comments of both functions
   below lack their closing lines. Both definitions are therefore
   commented out up to the next comment terminator, in the header of
   proc_make_temporary_transformations_permanent. Confirm the terminators
   in the full file. */
486 /* Returns true if the next call to add_transformation() will add
487 a temporary transformation, false if it will add a permanent
490 proc_in_temporary_transformations (const struct dataset *ds)
492 return ds->temporary_trns_chain != NULL;
495 /* Marks the start of temporary transformations.
496 Further calls to add_transformation() will add temporary
499 proc_start_temporary_transformations (struct dataset *ds)
501 if (!proc_in_temporary_transformations (ds))
503 add_case_limit_trns (ds);
505 ds->permanent_dict = dict_clone (ds->dict);
507 trns_chain_finalize (ds->permanent_trns_chain);
508 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
510 if ( ds->xform_callback)
511 ds->xform_callback (true, ds->xform_callback_aux);
515 /* Converts all the temporary transformations, if any, to
516 permanent transformations. Further transformations will be
518 Returns true if anything changed, false otherwise. */
520 proc_make_temporary_transformations_permanent (struct dataset *ds)
522 if (proc_in_temporary_transformations (ds))
524 trns_chain_finalize (ds->temporary_trns_chain);
525 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
526 ds->temporary_trns_chain = NULL;
528 dict_destroy (ds->permanent_dict);
529 ds->permanent_dict = NULL;
537 /* Cancels all temporary transformations, if any. Further
538 transformations will be permanent.
539 Returns true if anything changed, false otherwise. */
541 proc_cancel_temporary_transformations (struct dataset *ds)
543 if (proc_in_temporary_transformations (ds))
545 dict_destroy (ds->dict);
546 ds->dict = ds->permanent_dict;
547 ds->permanent_dict = NULL;
549 trns_chain_destroy (ds->temporary_trns_chain);
550 ds->temporary_trns_chain = NULL;
552 if ( ds->xform_callback)
553 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
554 ds->xform_callback_aux);
562 /* Cancels all transformations, if any.
563 Returns true if successful, false on I/O error. */
565 proc_cancel_all_transformations (struct dataset *ds)
568 assert (ds->proc_state == PROC_COMMITTED);
/* Destroy both chains and restart with a single empty permanent chain
   that is also the current chain. The temporary chain may be NULL here,
   so trns_chain_destroy presumably accepts a null chain. The result is
   false if destroying either chain reported failure.
   NOTE(review): the declaration of ok and the return statement are not
   visible in this listing.
   NOTE(review): permanent_dict is neither destroyed nor restored here.
   If TEMPORARY was in effect, permanent_dict stays set (and ds->dict
   stays the temporary dictionary) after this call, and
   proc_open_filtering() would then reuse that stale permanent_dict.
   Confirm that callers cannot reach this point with TEMPORARY active. */
569 ok = trns_chain_destroy (ds->permanent_trns_chain);
570 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
571 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
572 ds->temporary_trns_chain = NULL;
/* No transformations remain. */
573 if ( ds->xform_callback)
574 ds->xform_callback (false, ds->xform_callback_aux);
/* Change callback installed on DS's dictionary (see create_dataset and
   proc_set_active_file). Any dictionary change marks DS as unsaved by
   notifying DS's own change callback. */
581 dict_callback (struct dictionary *d UNUSED, void *ds_)
583 struct dataset *ds = ds_;
584 dataset_set_unsaved (ds);
587 /* Initializes procedure handling. */
589 create_dataset (void)
591 struct dataset *ds = xzalloc (sizeof(*ds));
/* Members not set below start out zero-filled by xzalloc: no data
   source, no callbacks and no lag. */
592 ds->dict = dict_create ();
594 dict_set_change_callback (ds->dict, dict_callback, ds);
596 dict_set_encoding (ds->dict, get_default_encoding ());
598 ds->caseinit = caseinit_create ();
/* Creates the initial, empty permanent (and current) transformation
   chain. Its assertion on proc_state relies on the zero-filled value
   being PROC_COMMITTED. That holds if PROC_COMMITTED is the first
   enumerator with no explicit value; confirm against the enum. */
599 proc_cancel_all_transformations (ds);
/* NOTE(review): the statement returning DS is not visible in this
   listing. */
/* Installs CB, with auxiliary data AUX, as the single listener notified
   whenever DS's transformation chains change; it replaces any previous
   listener. CB's first argument is true when transformations are
   pending. */
605 dataset_add_transform_change_callback (struct dataset *ds,
606 transformation_change_callback_func *cb,
609 ds->xform_callback = cb;
610 ds->xform_callback_aux = aux;
613 /* Finishes up procedure handling. */
615 destroy_dataset (struct dataset *ds)
617 proc_discard_active_file (ds);
618 dict_destroy (ds->dict);
619 caseinit_destroy (ds->caseinit);
620 trns_chain_destroy (ds->permanent_trns_chain);
/* Tell the transformation listener that nothing is pending any more.
   NOTE(review): freeing DS itself is not visible in this listing.
   permanent_dict, which is non-null while TEMPORARY is in effect, is
   also not destroyed here. Confirm both against the full file. */
622 if ( ds->xform_callback)
623 ds->xform_callback (false, ds->xform_callback_aux);
627 /* Causes output from the next procedure to be discarded, instead
628 of being preserved for use as input for the next procedure. */
630 proc_discard_output (struct dataset *ds)
/* Read by proc_open_filtering(); reset by proc_commit() and
   proc_execute(). */
632 ds->discard_output = true;
/* proc_discard_active_file(): only allowed when no procedure is in
   progress. It clears the dictionary, resets the default file handle,
   destroys the data source and cancels all transformations.
   proc_set_active_file(): discards the current active file, replaces
   the dictionary (presumably with DICT, which must differ from the
   current one) and installs SOURCE as its data.
   NOTE(review): in this listing the header comment below is not
   terminated until the end of the comment on proc_set_active_file_data,
   so both function definitions are commented out. Two statements are
   also not visible: clearing ds->source after it is destroyed, and
   assigning DICT to ds->dict before its change callback is installed.
   Confirm against the full file. */
635 /* Discards the active file dictionary, data, and
638 proc_discard_active_file (struct dataset *ds)
640 assert (ds->proc_state == PROC_COMMITTED);
642 dict_clear (ds->dict);
643 fh_set_default_handle (NULL);
647 casereader_destroy (ds->source);
650 proc_cancel_all_transformations (ds);
653 /* Sets SOURCE as the source for procedure input for the next
656 proc_set_active_file (struct dataset *ds,
657 struct casereader *source,
658 struct dictionary *dict)
660 assert (ds->proc_state == PROC_COMMITTED);
661 assert (ds->dict != dict);
663 proc_discard_active_file (ds);
665 dict_destroy (ds->dict);
667 dict_set_change_callback (ds->dict, dict_callback, ds);
669 proc_set_active_file_data (ds, source);
672 /* Replaces the active file's data by READER without replacing
673 the associated dictionary. */
675 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
/* Destroys the current source and (presumably) installs READER as the
   new one; the assignment to ds->source is not visible in this listing.
   Returns false only if READER is non-null and already reports an
   error. */
677 casereader_destroy (ds->source);
/* Case initialization is reset for the new data. */
680 caseinit_clear (ds->caseinit);
681 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
683 return reader == NULL || !casereader_error (reader);
/* True exactly when DS has a data source.
   NOTE(review): in this listing the header comment below lacks its
   closing line, so this definition is commented out up to the end of
   proc_extract_active_file_data's header comment. Confirm against the
   full file. */
686 /* Returns true if an active file data source is available, false
689 proc_has_active_file (const struct dataset *ds)
691 return ds->source != NULL;
694 /* Returns the active file data source from DS, or a null pointer
695 if DS has no data source, and removes it from DS. */
697 proc_extract_active_file_data (struct dataset *ds)
699 struct casereader *reader = ds->source;
/* NOTE(review): the rest of the body is not visible in this listing:
   clearing ds->source and returning READER, whose ownership passes to
   the caller. */
/* A source that reports an error is discarded. Otherwise any successor
   taint recorded against the source is reset, and the assertion checks
   that none remains.
   NOTE(review): in this listing the header comment below lacks its
   closing line. This definition and the two forward declarations after
   it are therefore commented out up to the end of add_case_limit_trns's
   header comment. The return statements are not visible either. */
705 /* Checks whether DS has a corrupted active file. If so,
706 discards it and returns false. If not, returns true without
709 dataset_end_of_command (struct dataset *ds)
711 if (ds->source != NULL)
713 if (casereader_error (ds->source))
715 proc_discard_active_file (ds);
720 const struct taint *taint = casereader_get_taint (ds->source);
721 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
722 assert (!taint_has_tainted_successor (taint));
728 static trns_proc_func case_limit_trns_proc;
729 static trns_free_func case_limit_trns_free;
731 /* Adds a transformation that limits the number of cases that may
732 pass through, if DS->DICT has a case limit. */
734 add_case_limit_trns (struct dataset *ds)
736 casenumber case_limit = dict_get_case_limit (ds->dict);
/* The remaining-case counter is heap-allocated as a casenumber and
   (presumably) handed to the transformation as auxiliary data;
   case_limit_trns_free releases it.
   NOTE(review): the test that CASE_LIMIT is nonzero, which should guard
   this allocation, is not visible in this listing. */
739 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
740 *cases_remaining = case_limit;
741 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
743 dict_set_case_limit (ds->dict, 0);
/* The dictionary's case limit has been consumed. This presumably keeps a
   later call from stacking a second limit, for example from
   proc_open_filtering after proc_start_temporary_transformations has
   already added one. */
747 /* Limits the maximum number of cases processed to
750 case_limit_trns_proc (void *cases_remaining_,
751 struct ccase **c UNUSED, casenumber case_nr UNUSED)
753 size_t *cases_remaining = cases_remaining_;
754 if (*cases_remaining > 0)
756 (*cases_remaining)--;
757 return TRNS_CONTINUE;
760 return TRNS_DROP_CASE;
763 /* Frees the data associated with a case limit transformation. */
765 case_limit_trns_free (void *cases_remaining_)
767 size_t *cases_remaining = cases_remaining_;
768 free (cases_remaining);
772 static trns_proc_func filter_trns_proc;
774 /* Adds a temporary transformation to filter data according to
775 the variable specified on FILTER, if any. */
777 add_filter_trns (struct dataset *ds)
779 struct variable *filter_var = dict_get_filter (ds->dict);
/* Filtering is temporary. If a filter variable is set, start TEMPORARY
   (a no-op if it is already in effect), then add the filter with the
   variable as auxiliary data and no free function. */
780 if (filter_var != NULL)
782 proc_start_temporary_transformations (ds);
783 add_transformation (ds, filter_trns_proc, NULL, filter_var);
787 /* FILTER transformation. */
789 filter_trns_proc (void *filter_var_,
790 struct ccase **c UNUSED, casenumber case_nr UNUSED)
793 struct variable *filter_var = filter_var_;
794 double f = case_num (*c, filter_var);
795 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
796 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Accessor for DS's dictionary.
   NOTE(review): the return type and body are not present in this
   listing; presumably it returns ds->dict. Confirm. */
801 dataset_dict (const struct dataset *ds)
/* Accessor for DS's data source.
   NOTE(review): the body is not present in this listing. It presumably
   returns ds->source without transferring ownership, unlike
   proc_extract_active_file_data. Confirm. */
806 const struct casereader *
807 dataset_source (const struct dataset *ds)
/* Ensures that at least N_BEFORE previous cases are kept for LAG. The
   lag buffer is sized from n_lag when a procedure is opened. This
   function only ever raises the requested depth, never lowers it. */
813 dataset_need_lag (struct dataset *ds, int n_before)
815 ds->n_lag = MAX (ds->n_lag, n_before);