1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/transformations.h"
36 #include "data/variable.h"
37 #include "libpspp/deque.h"
38 #include "libpspp/misc.h"
39 #include "libpspp/str.h"
40 #include "libpspp/taint.h"
41 #include "libpspp/i18n.h"
43 #include "gl/minmax.h"
44 #include "gl/xalloc.h"
/* Fields of `struct dataset': data source and sink, dictionaries,
   transformation chains, procedure state and lag buffer.  Every function
   below reaches them through a `struct dataset *ds'.
   NOTE(review): this listing omits lines of the definition, including the
   opening "struct dataset {", the header and field name of the enum holding
   PROC_COMMITTED/PROC_OPEN/PROC_CLOSED (used as ds->proc_state), the
   declarations of discard_output and cb_data (both used below), and the
   closing brace -- confirm against the full source. */
47 /* Cases are read from source,
48 their transformation variables are initialized,
49 pass through permanent_trns_chain (which transforms them into
50 the format described by permanent_dict),
52 pass through temporary_trns_chain (which transforms them into
53 the format described by dict),
54 and are finally passed to the procedure. */
55 struct casereader *source;
56 struct caseinit *caseinit;
57 struct trns_chain *permanent_trns_chain;
58 struct dictionary *permanent_dict;
59 struct casewriter *sink;
60 struct trns_chain *temporary_trns_chain;
61 struct dictionary *dict;
63 /* Callback which occurs whenever the transformation chain(s) have
   been modified; it is passed whether transformations are present. */
65 transformation_change_callback_func *xform_callback;
66 void *xform_callback_aux;
68 /* If true, cases are discarded instead of being written to
   a sink (see proc_open_filtering()). */
72 /* The transformation chain that the next transformation will be
   added to (the permanent chain, or the temporary one after TEMPORARY). */
74 struct trns_chain *cur_trns_chain;
76 /* The case map used to compact a case, if necessary;
77 otherwise a null pointer. */
78 struct case_map *compactor;
80 /* Time at which proc was last invoked. */
81 time_t last_proc_invocation;
83 /* Cases just before ("lagging") the current one. */
84 int n_lag; /* Number of cases to lag. */
85 struct deque lag; /* Deque of lagged cases. */
86 struct ccase **lag_cases; /* Lagged cases managed by deque. */
/* Procedure state, stored in ds->proc_state: */
91 PROC_COMMITTED, /* No procedure in progress. */
92 PROC_OPEN, /* proc_open called, casereader still open. */
93 PROC_CLOSED /* casereader from proc_open destroyed,
94 but proc_commit not yet called. */
97 casenumber cases_written; /* Cases output so far. */
98 bool ok; /* Error status. */
99 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
101 void (*callback) (void *); /* Callback for when the dataset changes */
104 /* Default encoding for reading syntax files. */
105 char *syntax_encoding;
/* Forward declarations of file-local helpers defined later in this file. */
108 static void add_case_limit_trns (struct dataset *ds);
109 static void add_filter_trns (struct dataset *ds);
111 static void update_last_proc_invocation (struct dataset *ds);
/* Invokes DS's change callback (see dataset_set_callback()), if one is set,
   passing it the stored cb_data.  Does nothing when no callback is set. */
114 dataset_set_unsaved (const struct dataset *ds)
116 if (ds->callback) ds->callback (ds->cb_data);
/* Change callback installed on DS's dictionary by dataset_create() and
   dataset_set_dict().  Any dictionary change is forwarded to
   dataset_set_unsaved().  D is ignored; DS_ is the dataset that was
   registered along with the callback. */
120 dict_callback (struct dictionary *d UNUSED, void *ds_)
122 struct dataset *ds = ds_;
123 dataset_set_unsaved (ds);
126 /* Creates and returns a new dataset. The dataset initially has an empty
127 dictionary and no data source. */
/* NOTE(review): the declaration of ds and the final return are not visible
   in this listing. */
129 dataset_create (void)
/* xzalloc() zero-fills, so every field not set below starts out null,
   false or 0 (including source, callbacks and the chain pointers). */
133 ds = xzalloc (sizeof *ds);
134 ds->dict = dict_create ();
/* Route dictionary changes to dataset_set_unsaved() via dict_callback(). */
135 dict_set_change_callback (ds->dict, dict_callback, ds);
136 dict_set_encoding (ds->dict, get_default_encoding ());
138 ds->caseinit = caseinit_create ();
/* Creates the initial, empty transformation chains.  The chain pointers
   are still null here, so this presumably relies on trns_chain_destroy()
   accepting NULL -- confirm. */
139 proc_cancel_all_transformations (ds);
/* Owned by DS; replaced by dataset_set_default_syntax_encoding() and freed
   by dataset_destroy(). */
140 ds->syntax_encoding = xstrdup ("Auto");
/* Releases DS's dictionary, case initializer, permanent transformation
   chain and syntax encoding.  It also tells the transformation-change
   listener, if any, that no transformations remain (passes false).
   NOTE(review): several lines of this definition are missing from this
   listing (presumably a null check on DS, other cleanup, and freeing DS
   itself) -- confirm against the full source. */
146 dataset_destroy (struct dataset *ds)
151 dict_destroy (ds->dict);
152 caseinit_destroy (ds->caseinit);
153 trns_chain_destroy (ds->permanent_trns_chain);
155 if ( ds->xform_callback)
156 ds->xform_callback (false, ds->xform_callback_aux);
157 free (ds->syntax_encoding);
162 /* Discards the active file dictionary, data, and transformations. */
164 dataset_clear (struct dataset *ds)
/* Not allowed while a procedure is open or awaiting proc_commit(). */
166 assert (ds->proc_state == PROC_COMMITTED);
168 dict_clear (ds->dict);
/* Also resets the default file handle (to NULL). */
169 fh_set_default_handle (NULL);
/* NOTE(review): the line(s) after this call (presumably resetting
   ds->source to NULL) are missing from this listing -- confirm. */
173 casereader_destroy (ds->source);
176 proc_cancel_all_transformations (ds);
179 /* Returns the dictionary within DS. This is always nonnull, although it
180 might not contain any variables. */
/* DS keeps ownership of the dictionary; dataset_destroy() destroys it.
   NOTE(review): the body is not visible in this listing. */
182 dataset_dict (const struct dataset *ds)
187 /* Replaces DS's dictionary by DICT, discarding any source and
   (rest of this comment is missing from this listing).
   Requires that no procedure be in progress and that DICT differ from the
   current dictionary.  The current dictionary is destroyed, and
   dict_callback() is installed on the dictionary DS then holds.
   NOTE(review): the lines that discard the source and store DICT into
   ds->dict are not visible in this listing -- confirm. */
190 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
192 assert (ds->proc_state == PROC_COMMITTED);
193 assert (ds->dict != dict);
197 dict_destroy (ds->dict);
199 dict_set_change_callback (ds->dict, dict_callback, ds);
202 /* Returns the casereader that will be read when a procedure is executed on
203 DS. This can be NULL if none has been set up yet. */
/* The reader stays owned by DS; use dataset_steal_source() to take it.
   NOTE(review): the body is not visible in this listing. */
204 const struct casereader *
205 dataset_source (const struct dataset *ds)
210 /* Returns true if DS has a data source, false otherwise. */
/* Implemented in terms of dataset_source(). */
212 dataset_has_source (const struct dataset *ds)
214 return dataset_source (ds) != NULL;
217 /* Replaces the active file's data by READER. READER's cases must have an
218 appropriate format for DS's dictionary. */
/* Takes ownership of READER (which may be NULL).  Returns false only if
   READER is nonnull and already reports an error.
   NOTE(review): the assignment of READER to ds->source is not visible in
   this listing. */
220 dataset_set_source (struct dataset *ds, struct casereader *reader)
/* Any previous source is destroyed first. */
222 casereader_destroy (ds->source);
/* Reset case initialization state for the new source (presumably treating
   the dictionary's variables as already present in its cases). */
225 caseinit_clear (ds->caseinit);
226 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
228 return reader == NULL || !casereader_error (reader);
231 /* Returns the data source from DS and removes it from DS. Returns a null
232 pointer if DS has no data source. */
/* Ownership of the returned reader passes to the caller.
   NOTE(review): the lines that clear ds->source and return READER are not
   visible in this listing. */
234 dataset_steal_source (struct dataset *ds)
236 struct casereader *reader = ds->source;
/* Registers CB to be called with CB_DATA whenever DS changes (see
   dataset_set_unsaved()).
   NOTE(review): the assignment of CB to ds->callback is not visible in
   this listing. */
243 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
246 ds->cb_data = cb_data;
/* Sets DS's default encoding for reading syntax files.  DS stores its own
   copy of ENCODING and frees the previous value. */
250 dataset_set_default_syntax_encoding (struct dataset *ds, const char *encoding)
252 free (ds->syntax_encoding);
253 ds->syntax_encoding = xstrdup (encoding);
/* Returns DS's default syntax encoding.  The string remains owned by DS
   and is replaced by dataset_set_default_syntax_encoding(). */
257 dataset_get_default_syntax_encoding (const struct dataset *ds)
259 return ds->syntax_encoding;
262 /* Returns the last time the data was read. */
264 time_of_last_procedure (struct dataset *ds)
/* If no procedure has run yet, record the current time now so that a
   nonzero value is always returned. */
266 if (ds->last_proc_invocation == 0)
267 update_last_proc_invocation (ds);
268 return ds->last_proc_invocation;
271 /* Regular procedure. */
273 /* Executes any pending transformations, if necessary.
274 This is not identical to the EXECUTE command in that it won't
275 always read the source data. This can be important when the
276 source data is given inline within BEGIN DATA...END FILE. */
278 proc_execute (struct dataset *ds)
/* With no temporary transformations and an empty permanent chain there is
   nothing to execute.  In that case only the per-procedure settings are
   reset.  NOTE(review): the early return (and any other statements) in
   this branch are not visible in this listing. */
282 if ((ds->temporary_trns_chain == NULL
283 || trns_chain_is_empty (ds->temporary_trns_chain))
284 && trns_chain_is_empty (ds->permanent_trns_chain))
287 ds->discard_output = false;
288 dict_set_case_limit (ds->dict, 0);
289 dict_clear_vectors (ds->dict);
/* Destroying the procedure reader runs proc_casereader_destroy(), which
   reads every remaining case through the transformations.  Then commit.
   The result is false if either step failed. */
293 ok = casereader_destroy (proc_open (ds));
294 return proc_commit (ds) && ok;
/* The procedure casereader class; defined after proc_commit(). */
297 static const struct casereader_class proc_casereader_class;
299 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
300 cases filtered out with FILTER BY will not be included in the casereader
301 (which is usually desirable). If FILTER is false, all cases will be
302 included regardless of FILTER BY settings.
304 proc_commit must be called when done. */
/* Creates a casereader whose cases come from proc_casereader_read() and
   inserts a shim on top of it.
   NOTE(review): no visible line reads FILTER; presumably a line missing
   from this listing makes the add_filter_trns() call conditional on it.
   Likewise, no visible line resets ds->ok for the new procedure, and the
   final return is not visible -- confirm against the full source. */
306 proc_open_filtering (struct dataset *ds, bool filter)
308 struct casereader *reader;
/* Requires a data source and no procedure already in progress. */
310 assert (ds->source != NULL);
311 assert (ds->proc_state == PROC_COMMITTED);
313 update_last_proc_invocation (ds);
315 caseinit_mark_for_init (ds->caseinit, ds->dict);
317 /* Finish up the collection of transformations. */
318 add_case_limit_trns (ds);
320 add_filter_trns (ds);
321 trns_chain_finalize (ds->cur_trns_chain);
323 /* Make permanent_dict refer to the dictionary right before
324 data reaches the sink. */
325 if (ds->permanent_dict == NULL)
326 ds->permanent_dict = ds->dict;
/* Unless output is discarded, create an autopaging sink for the
   replacement active file.  If dict_count_values() with the DC_SCRATCH mask
   reports fewer values than the dictionary holds (presumably because of
   scratch variables), cases are compacted on the way into the sink. */
329 if (!ds->discard_output)
331 struct dictionary *pd = ds->permanent_dict;
332 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
333 if (compacted_value_cnt < dict_get_next_value_idx (pd))
335 struct caseproto *compacted_proto;
336 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
337 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
338 ds->sink = autopaging_writer_create (compacted_proto);
339 caseproto_unref (compacted_proto);
343 ds->compactor = NULL;
344 ds->sink = autopaging_writer_create (dict_get_proto (pd));
/* Output is discarded: no compactor (and presumably no sink). */
349 ds->compactor = NULL;
353 /* Allocate memory for lagged cases. */
/* Sized for the largest lag requested through dataset_need_lag(). */
354 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
356 ds->proc_state = PROC_OPEN;
357 ds->cases_written = 0;
360 /* FIXME: use taint in dataset in place of `ok'? */
361 /* FIXME: for trivial cases we can just return a clone of
   (rest of this comment is missing from this listing) */
364 /* Create casereader and insert a shim on top. The shim allows us to
365 arbitrarily extend the casereader's lifetime, by slurping the cases into
366 the shim's buffer in proc_commit(). That is especially useful when output
367 table_items are generated directly from the procedure casereader (e.g. by
368 the LIST procedure) when we are using an output driver that keeps a
369 reference to the output items passed to it (e.g. the GUI output driver in
   (rest of this comment is missing from this listing)). */
371 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
373 &proc_casereader_class, ds);
374 ds->shim = casereader_shim_insert (reader);
378 /* Opens dataset DS for reading cases with proc_read.
379 proc_commit must be called when done. */
/* Equivalent to proc_open_filtering (DS, true), so FILTER BY is honored. */
381 proc_open (struct dataset *ds)
383 return proc_open_filtering (ds, true);
386 /* Returns true if a procedure is in progress, that is, if
387 proc_open has been called but proc_commit has not. */
/* PROC_CLOSED (reader destroyed, commit pending) also counts as open. */
389 proc_is_open (const struct dataset *ds)
391 return ds->proc_state != PROC_COMMITTED;
394 /* "read" function for procedure casereader. */
/* Reads cases from ds->source and passes each one through the permanent
   transformations, the lag buffer, the sink and the temporary
   transformations, in that order.  A case abandoned along the way is
   released by the loop's case_unref (c) and the next source case is tried.
   NOTE(review): this listing omits several lines of the function (the
   declarations of c and case_nr, the bodies of the error and end-of-input
   checks, the `continue' statements and the final return). */
395 static struct ccase *
396 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
398 struct dataset *ds = ds_;
/* Result that ended the previous iteration; starts as a drop so that the
   first iteration passes the assertion below. */
399 enum trns_result retval = TRNS_DROP_CASE;
402 assert (ds->proc_state == PROC_OPEN);
403 for (; ; case_unref (c))
/* The previous case must have ended with a drop or an error. */
407 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
408 if (retval == TRNS_ERROR)
413 /* Read a case from source. */
414 c = casereader_read (ds->source);
/* Get a writable case laid out for the current dictionary, then initialize
   the variables that caseinit manages. */
417 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
418 caseinit_init_vars (ds->caseinit, c);
420 /* Execute permanent transformations. */
/* Case numbers passed to transformations are 1-based. */
421 case_nr = ds->cases_written + 1;
422 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
424 caseinit_update_left_vars (ds->caseinit, c);
425 if (retval != TRNS_CONTINUE)
428 /* Write case to collection of lagged cases. */
/* Keep at most n_lag cases, newest at the front; lagged_case() indexes
   from the front.
   NOTE(review): if n_lag is 0 this condition is always true and the loop
   would pop from an empty deque.  Presumably a line missing from this
   listing guards the block with n_lag > 0 -- confirm. */
431 while (deque_count (&ds->lag) >= ds->n_lag)
432 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
433 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
436 /* Write case to replacement active file. */
/* The sink sees cases after permanent but before temporary
   transformations.
   NOTE(review): ds->cases_written is not visibly incremented.  The
   temporary chain below is passed ds->cases_written, while the permanent
   chain was passed ds->cases_written + 1.  The two case numbers agree only
   if a missing line increments cases_written here -- confirm. */
438 if (ds->sink != NULL)
439 casewriter_write (ds->sink,
440 case_map_execute (ds->compactor, case_ref (c)));
442 /* Execute temporary transformations. */
/* These affect only what this procedure sees, not the sink. */
443 if (ds->temporary_trns_chain != NULL)
445 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
446 &c, ds->cases_written);
447 if (retval != TRNS_CONTINUE)
455 /* "destroy" function for procedure casereader. */
457 proc_casereader_destroy (struct casereader *reader, void *ds_)
459 struct dataset *ds = ds_;
462 /* We are always the subreader for a casereader_buffer, so if we're being
463 destroyed then it's because the casereader_buffer has read all the cases
464 that it ever will. */
/* NOTE(review): proc_open_filtering() wraps this reader with a
   casereader_shim, not a casereader_buffer; the comment above may be
   stale. */
467 /* Make sure transformations happen for every input case, in
468 case they have side effects, and ensure that the replacement
469 active file gets all the cases it should. */
470 while ((c = casereader_read (reader)) != NULL)
/* The procedure reader is finished, but proc_commit() has not run yet. */
473 ds->proc_state = PROC_CLOSED;
/* An error reported while destroying the source makes the procedure
   fail. */
474 ds->ok = casereader_destroy (ds->source) && ds->ok;
/* NOTE(review): dataset_set_source() itself calls
   casereader_destroy (ds->source).  ds->source must therefore be reset to
   NULL between these two calls, or the source is destroyed twice.  The
   line that presumably does so is missing from this listing -- confirm. */
476 dataset_set_source (ds, NULL);
479 /* Must return false if the source casereader, a transformation,
480 or the sink casewriter signaled an error. (If a temporary
481 transformation signals an error, then the return value is
482 false, but the replacement active file may still be
   (rest of this comment is missing from this listing).) */
485 proc_commit (struct dataset *ds)
/* Copy any cases still reachable through the shim into its buffer, so the
   shim outlives the procedure reader (see proc_open_filtering()). */
487 if (ds->shim != NULL)
488 casereader_shim_slurp (ds->shim);
490 assert (ds->proc_state == PROC_CLOSED);
491 ds->proc_state = PROC_COMMITTED;
493 dataset_set_unsaved (ds);
495 /* Free memory for lagged cases. */
496 while (!deque_is_empty (&ds->lag))
497 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
498 free (ds->lag_cases);
500 /* Dictionary from before TEMPORARY becomes permanent. */
501 proc_cancel_temporary_transformations (ds);
503 if (!ds->discard_output)
505 /* Finish compacting. */
506 if (ds->compactor != NULL)
508 case_map_destroy (ds->compactor);
509 ds->compactor = NULL;
/* Scratch variables do not survive past the procedure. */
511 dict_delete_scratch_vars (ds->dict);
512 dict_compact_values (ds->dict);
515 /* Old data sink becomes new data source. */
516 if (ds->sink != NULL)
517 ds->source = casewriter_make_reader (ds->sink);
/* NOTE(review): lines around here (presumably clearing ds->sink and the
   else branch this assignment belongs to) are missing from this listing. */
522 ds->discard_output = false;
526 caseinit_clear (ds->caseinit);
527 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
529 dict_clear_vectors (ds->dict);
530 ds->permanent_dict = NULL;
/* All transformations were consumed by this procedure; start over with
   empty chains.  Also fails if any earlier step set ds->ok to false. */
531 return proc_cancel_all_transformations (ds) && ds->ok;
534 /* Casereader class for procedure execution. */
/* Only the read and destroy members appear here; the remaining members and
   the closing of the initializer are not visible in this listing. */
535 static const struct casereader_class proc_casereader_class =
537 proc_casereader_read,
538 proc_casereader_destroy,
543 /* Updates last_proc_invocation. */
/* Stores the current calendar time as returned by time (NULL). */
545 update_last_proc_invocation (struct dataset *ds)
547 ds->last_proc_invocation = time (NULL);
550 /* Returns a pointer to the lagged case from N_BEFORE cases before the
551 current one, or NULL if there haven't been that many cases yet. */
/* N_BEFORE must lie in 1..ds->n_lag (see dataset_need_lag()).
   NOTE(review): the trailing NULL return is not visible in this listing. */
553 lagged_case (const struct dataset *ds, int n_before)
555 assert (n_before >= 1);
556 assert (n_before <= ds->n_lag);
/* proc_casereader_read() pushes the newest case at the front, so front
   index N_BEFORE - 1 is the case N_BEFORE cases back. */
558 if (n_before <= deque_count (&ds->lag))
559 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
564 /* Returns the current set of permanent transformations,
565 and clears the permanent transformations.
566 For use by INPUT PROGRAM. */
/* Not allowed while TEMPORARY is in effect.  The caller takes ownership
   of the returned chain.
   NOTE(review): the final return of CHAIN is not visible in this listing. */
568 proc_capture_transformations (struct dataset *ds)
570 struct trns_chain *chain;
572 assert (ds->temporary_trns_chain == NULL);
573 chain = ds->permanent_trns_chain;
/* A fresh empty chain becomes both the permanent and the current chain. */
574 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
/* Tell the listener that no transformations are present any more. */
576 if ( ds->xform_callback)
577 ds->xform_callback (false, ds->xform_callback_aux);
582 /* Adds a transformation that processes a case with PROC and
583 frees itself with FREE to the current set of transformations.
584 The functions are passed AUX as auxiliary data. */
/* No finalizer is registered (NULL is passed).  The listener, if any, is
   told that transformations are present (true).
   NOTE(review): the parameter name FREE shadows free() inside this
   function. */
586 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
588 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
589 if ( ds->xform_callback)
590 ds->xform_callback (true, ds->xform_callback_aux);
593 /* Adds a transformation that processes a case with PROC and
594 frees itself with FREE to the current set of transformations.
595 When parsing of the block of transformations is complete,
596 FINALIZE will be called.
597 The functions are passed AUX as auxiliary data. */
/* Finalizers presumably run when the chain is finalized with
   trns_chain_finalize(), as proc_open_filtering() and the temporary-
   transformation functions do.  The listener, if any, is told that
   transformations are present (true). */
599 add_transformation_with_finalizer (struct dataset *ds,
600 trns_finalize_func *finalize,
601 trns_proc_func *proc,
602 trns_free_func *free, void *aux)
604 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
606 if ( ds->xform_callback)
607 ds->xform_callback (true, ds->xform_callback_aux);
610 /* Returns the index of the next transformation.
611 This value can be returned by a transformation procedure
612 function to indicate a "jump" to that transformation. */
/* The index is relative to the current chain (permanent, or temporary
   after TEMPORARY). */
614 next_transformation (const struct dataset *ds)
616 return trns_chain_next (ds->cur_trns_chain);
619 /* Returns true if the next call to add_transformation() will add
620 a temporary transformation, false if it will add a permanent
   transformation.  TEMPORARY is in effect exactly when a temporary
   chain exists. */
623 proc_in_temporary_transformations (const struct dataset *ds)
625 return ds->temporary_trns_chain != NULL;
628 /* Marks the start of temporary transformations.
629 Further calls to add_transformation() will add temporary
   transformations.  Does nothing if TEMPORARY is already in effect. */
632 proc_start_temporary_transformations (struct dataset *ds)
634 if (!proc_in_temporary_transformations (ds))
/* Any case limit becomes a permanent transformation before the temporary
   section begins. */
636 add_case_limit_trns (ds);
/* Snapshot of the pre-TEMPORARY dictionary;
   proc_cancel_temporary_transformations() restores it. */
638 ds->permanent_dict = dict_clone (ds->dict);
640 trns_chain_finalize (ds->permanent_trns_chain);
641 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
643 if ( ds->xform_callback)
644 ds->xform_callback (true, ds->xform_callback_aux);
648 /* Converts all the temporary transformations, if any, to
649 permanent transformations. Further transformations will be
   permanent (proc_in_temporary_transformations() returns false afterward).
651 Returns true if anything changed, false otherwise. */
/* NOTE(review): the lines that reset cur_trns_chain and return the result
   are not visible in this listing. */
653 proc_make_temporary_transformations_permanent (struct dataset *ds)
655 if (proc_in_temporary_transformations (ds))
657 trns_chain_finalize (ds->temporary_trns_chain);
/* Moves the temporary transformations onto the permanent chain.  The
   temporary chain is not destroyed here, so trns_chain_splice() presumably
   consumes it. */
658 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
659 ds->temporary_trns_chain = NULL;
/* The current dictionary is kept; the pre-TEMPORARY snapshot is no longer
   needed. */
661 dict_destroy (ds->permanent_dict);
662 ds->permanent_dict = NULL;
670 /* Cancels all temporary transformations, if any. Further
671 transformations will be permanent.
672 Returns true if anything changed, false otherwise. */
674 proc_cancel_temporary_transformations (struct dataset *ds)
676 if (proc_in_temporary_transformations (ds))
/* Restore the dictionary snapshot taken by
   proc_start_temporary_transformations().
   NOTE(review): no visible line installs dict_callback() on the restored
   dictionary, as dataset_set_dict() does.  Confirm whether dict_clone()
   preserves the change callback. */
678 dict_destroy (ds->dict);
679 ds->dict = ds->permanent_dict;
680 ds->permanent_dict = NULL;
682 trns_chain_destroy (ds->temporary_trns_chain);
683 ds->temporary_trns_chain = NULL;
/* Report whether any permanent transformations remain. */
685 if ( ds->xform_callback)
686 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
687 ds->xform_callback_aux);
695 /* Cancels all transformations, if any.
696 Returns true if successful, false on I/O error. */
/* NOTE(review): if TEMPORARY is in effect, ds->permanent_dict is neither
   destroyed nor reset by any visible line.  A later
   proc_start_temporary_transformations() would then overwrite (leak) it,
   and proc_open_filtering() would reuse it as the sink dictionary --
   confirm the intended behavior. */
698 proc_cancel_all_transformations (struct dataset *ds)
701 assert (ds->proc_state == PROC_COMMITTED);
/* Destroy both chains even if the first destruction fails.  The temporary
   chain may be NULL, so trns_chain_destroy() presumably accepts NULL. */
702 ok = trns_chain_destroy (ds->permanent_trns_chain);
703 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
704 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
705 ds->temporary_trns_chain = NULL;
706 if ( ds->xform_callback)
707 ds->xform_callback (false, ds->xform_callback_aux);
/* Installs CB, with AUX, as DS's transformation-change callback.  It
   replaces any previous callback, because DS holds only one.  CB is passed
   whether transformations are present after each change.
   NOTE(review): the line declaring the AUX parameter is not visible in
   this listing. */
714 dataset_add_transform_change_callback (struct dataset *ds,
715 transformation_change_callback_func *cb,
718 ds->xform_callback = cb;
719 ds->xform_callback_aux = aux;
722 /* Causes output from the next procedure to be discarded, instead
723 of being preserved for use as input for the next procedure. */
/* proc_commit() and proc_execute() both set the flag back to false. */
725 proc_discard_output (struct dataset *ds)
727 ds->discard_output = true;
731 /* Checks whether DS has a corrupted active file. If so,
732 discards it and returns false. If not, returns true without
   (rest of this comment is missing from this listing). */
/* NOTE(review): the discard-and-return-false branch and the final return
   are not visible in this listing. */
735 dataset_end_of_command (struct dataset *ds)
737 if (ds->source != NULL)
739 if (casereader_error (ds->source))
/* No error: reset the taint on the source's successors so the next
   command starts clean; the assertion checks that this worked. */
746 const struct taint *taint = casereader_get_taint (ds->source);
747 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
748 assert (!taint_has_tainted_successor (taint));
/* Procedure and free functions of the case-limit transformation, defined
   below. */
754 static trns_proc_func case_limit_trns_proc;
755 static trns_free_func case_limit_trns_free;
757 /* Adds a transformation that limits the number of cases that may
758 pass through, if DS->DICT has a case limit. */
/* NOTE(review): the `case_limit != 0' guard is presumably on a line missing
   from this listing.  case_limit_trns_proc() and case_limit_trns_free()
   must access the counter as `casenumber', the type allocated here. */
760 add_case_limit_trns (struct dataset *ds)
762 casenumber case_limit = dict_get_case_limit (ds->dict);
/* The heap-allocated countdown belongs to the transformation and is freed
   by case_limit_trns_free().  The dictionary's limit is then cleared, so
   a later call (e.g. from proc_open_filtering() after TEMPORARY) does not
   add the limit twice. */
765 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
766 *cases_remaining = case_limit;
767 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
769 dict_set_case_limit (ds->dict, 0);
773 /* Limits the maximum number of cases processed to
776 case_limit_trns_proc (void *cases_remaining_,
777 struct ccase **c UNUSED, casenumber case_nr UNUSED)
779 size_t *cases_remaining = cases_remaining_;
780 if (*cases_remaining > 0)
782 (*cases_remaining)--;
783 return TRNS_CONTINUE;
786 return TRNS_DROP_CASE;
789 /* Frees the data associated with a case limit transformation. */
791 case_limit_trns_free (void *cases_remaining_)
793 size_t *cases_remaining = cases_remaining_;
794 free (cases_remaining);
/* Procedure of the FILTER transformation, defined below. */
798 static trns_proc_func filter_trns_proc;
800 /* Adds a temporary transformation to filter data according to
801 the variable specified on FILTER, if any. */
803 add_filter_trns (struct dataset *ds)
805 struct variable *filter_var = dict_get_filter (ds->dict);
806 if (filter_var != NULL)
/* Filtering is temporary: start (or join) the TEMPORARY section so it
   affects only the current procedure.  No free function is passed, since
   the variable presumably belongs to the dictionary. */
808 proc_start_temporary_transformations (ds);
809 add_transformation (ds, filter_trns_proc, NULL, filter_var);
813 /* FILTER transformation. */
815 filter_trns_proc (void *filter_var_,
816 struct ccase **c UNUSED, casenumber case_nr UNUSED)
819 struct variable *filter_var = filter_var_;
820 double f = case_num (*c, filter_var);
821 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
822 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Ensures that at least N_BEFORE previous cases are kept for
   lagged_case().  n_lag never decreases here.  The lag buffer is sized
   from n_lag when the next procedure is opened (proc_open_filtering()). */
827 dataset_need_lag (struct dataset *ds, int n_before)
829 ds->n_lag = MAX (ds->n_lag, n_before);