1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
#include "data/dataset.h"

#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <time.h>

#include "data/case.h"
#include "data/case-map.h"
#include "data/caseinit.h"
#include "data/casereader.h"
#include "data/casereader-provider.h"
#include "data/casereader-shim.h"
#include "data/casewriter.h"
#include "data/dictionary.h"
#include "data/file-handle-def.h"
#include "data/transformations.h"
#include "data/variable.h"
#include "libpspp/deque.h"
#include "libpspp/misc.h"
#include "libpspp/str.h"
#include "libpspp/taint.h"
#include "libpspp/i18n.h"

#include "gl/minmax.h"
#include "gl/xalloc.h"
47 /* Cases are read from source,
48 their transformation variables are initialized,
49 pass through permanent_trns_chain (which transforms them into
50 the format described by permanent_dict),
52 pass through temporary_trns_chain (which transforms them into
53 the format described by dict),
54 and are finally passed to the procedure. */
55 struct casereader *source;
56 struct caseinit *caseinit;
57 struct trns_chain *permanent_trns_chain;
58 struct dictionary *permanent_dict;
59 struct casewriter *sink;
60 struct trns_chain *temporary_trns_chain;
61 struct dictionary *dict;
63 /* If true, cases are discarded instead of being written to
67 /* The transformation chain that the next transformation will be
69 struct trns_chain *cur_trns_chain;
71 /* The case map used to compact a case, if necessary;
72 otherwise a null pointer. */
73 struct case_map *compactor;
75 /* Time at which proc was last invoked. */
76 time_t last_proc_invocation;
78 /* Cases just before ("lagging") the current one. */
79 int n_lag; /* Number of cases to lag. */
80 struct deque lag; /* Deque of lagged cases. */
81 struct ccase **lag_cases; /* Lagged cases managed by deque. */
86 PROC_COMMITTED, /* No procedure in progress. */
87 PROC_OPEN, /* proc_open called, casereader still open. */
88 PROC_CLOSED /* casereader from proc_open destroyed,
89 but proc_commit not yet called. */
92 casenumber cases_written; /* Cases output so far. */
93 bool ok; /* Error status. */
94 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
96 const struct dataset_callbacks *callbacks;
99 /* Default encoding for reading syntax files. */
100 char *syntax_encoding;
/* Notification helpers and internal transformation helpers defined later in
   this file. */
static void dataset_changed__ (struct dataset *);
static void dataset_transformations_changed__ (struct dataset *,
                                               bool non_empty);

static void add_case_limit_trns (struct dataset *ds);
static void add_filter_trns (struct dataset *ds);

static void update_last_proc_invocation (struct dataset *ds);
113 dict_callback (struct dictionary *d UNUSED, void *ds_)
115 struct dataset *ds = ds_;
116 dataset_changed__ (ds);
119 /* Creates and returns a new dataset. The dataset initially has an empty
120 dictionary and no data source. */
122 dataset_create (void)
126 ds = xzalloc (sizeof *ds);
127 ds->dict = dict_create (get_default_encoding ());
128 dict_set_change_callback (ds->dict, dict_callback, ds);
130 ds->caseinit = caseinit_create ();
131 proc_cancel_all_transformations (ds);
132 ds->syntax_encoding = xstrdup ("Auto");
138 dataset_destroy (struct dataset *ds)
143 dict_destroy (ds->dict);
144 caseinit_destroy (ds->caseinit);
145 trns_chain_destroy (ds->permanent_trns_chain);
146 dataset_transformations_changed__ (ds, false);
147 free (ds->syntax_encoding);
152 /* Discards the active dataset's dictionary, data, and transformations. */
154 dataset_clear (struct dataset *ds)
156 assert (ds->proc_state == PROC_COMMITTED);
158 dict_clear (ds->dict);
159 fh_set_default_handle (NULL);
163 casereader_destroy (ds->source);
166 proc_cancel_all_transformations (ds);
169 /* Returns the dictionary within DS. This is always nonnull, although it
170 might not contain any variables. */
172 dataset_dict (const struct dataset *ds)
177 /* Replaces DS's dictionary by DICT, discarding any source and
180 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
182 assert (ds->proc_state == PROC_COMMITTED);
183 assert (ds->dict != dict);
187 dict_destroy (ds->dict);
189 dict_set_change_callback (ds->dict, dict_callback, ds);
192 /* Returns the casereader that will be read when a procedure is executed on
193 DS. This can be NULL if none has been set up yet. */
194 const struct casereader *
195 dataset_source (const struct dataset *ds)
/* Returns true if DS has a data source, false otherwise. */
bool
dataset_has_source (const struct dataset *ds)
{
  return dataset_source (ds) != NULL;
}
207 /* Replaces the active dataset's data by READER. READER's cases must have an
208 appropriate format for DS's dictionary. */
210 dataset_set_source (struct dataset *ds, struct casereader *reader)
212 casereader_destroy (ds->source);
215 caseinit_clear (ds->caseinit);
216 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
218 return reader == NULL || !casereader_error (reader);
221 /* Returns the data source from DS and removes it from DS. Returns a null
222 pointer if DS has no data source. */
224 dataset_steal_source (struct dataset *ds)
226 struct casereader *reader = ds->source;
233 dataset_set_callbacks (struct dataset *ds,
234 const struct dataset_callbacks *callbacks,
237 ds->callbacks = callbacks;
238 ds->cb_data = cb_data;
242 dataset_set_default_syntax_encoding (struct dataset *ds, const char *encoding)
244 free (ds->syntax_encoding);
245 ds->syntax_encoding = xstrdup (encoding);
249 dataset_get_default_syntax_encoding (const struct dataset *ds)
251 return ds->syntax_encoding;
254 /* Returns the last time the data was read. */
256 time_of_last_procedure (struct dataset *ds)
258 if (ds->last_proc_invocation == 0)
259 update_last_proc_invocation (ds);
260 return ds->last_proc_invocation;
263 /* Regular procedure. */
265 /* Executes any pending transformations, if necessary.
266 This is not identical to the EXECUTE command in that it won't
267 always read the source data. This can be important when the
268 source data is given inline within BEGIN DATA...END FILE. */
270 proc_execute (struct dataset *ds)
274 if ((ds->temporary_trns_chain == NULL
275 || trns_chain_is_empty (ds->temporary_trns_chain))
276 && trns_chain_is_empty (ds->permanent_trns_chain))
279 ds->discard_output = false;
280 dict_set_case_limit (ds->dict, 0);
281 dict_clear_vectors (ds->dict);
285 ok = casereader_destroy (proc_open (ds));
286 return proc_commit (ds) && ok;
289 static const struct casereader_class proc_casereader_class;
291 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
292 cases filtered out with FILTER BY will not be included in the casereader
293 (which is usually desirable). If FILTER is false, all cases will be
294 included regardless of FILTER BY settings.
296 proc_commit must be called when done. */
298 proc_open_filtering (struct dataset *ds, bool filter)
300 struct casereader *reader;
302 assert (ds->source != NULL);
303 assert (ds->proc_state == PROC_COMMITTED);
305 update_last_proc_invocation (ds);
307 caseinit_mark_for_init (ds->caseinit, ds->dict);
309 /* Finish up the collection of transformations. */
310 add_case_limit_trns (ds);
312 add_filter_trns (ds);
313 trns_chain_finalize (ds->cur_trns_chain);
315 /* Make permanent_dict refer to the dictionary right before
316 data reaches the sink. */
317 if (ds->permanent_dict == NULL)
318 ds->permanent_dict = ds->dict;
321 if (!ds->discard_output)
323 struct dictionary *pd = ds->permanent_dict;
324 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
325 if (compacted_value_cnt < dict_get_next_value_idx (pd))
327 struct caseproto *compacted_proto;
328 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
329 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
330 ds->sink = autopaging_writer_create (compacted_proto);
331 caseproto_unref (compacted_proto);
335 ds->compactor = NULL;
336 ds->sink = autopaging_writer_create (dict_get_proto (pd));
341 ds->compactor = NULL;
345 /* Allocate memory for lagged cases. */
346 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
348 ds->proc_state = PROC_OPEN;
349 ds->cases_written = 0;
352 /* FIXME: use taint in dataset in place of `ok'? */
353 /* FIXME: for trivial cases we can just return a clone of
356 /* Create casereader and insert a shim on top. The shim allows us to
357 arbitrarily extend the casereader's lifetime, by slurping the cases into
358 the shim's buffer in proc_commit(). That is especially useful when output
359 table_items are generated directly from the procedure casereader (e.g. by
360 the LIST procedure) when we are using an output driver that keeps a
361 reference to the output items passed to it (e.g. the GUI output driver in
363 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
365 &proc_casereader_class, ds);
366 ds->shim = casereader_shim_insert (reader);
/* Opens dataset DS for reading cases with proc_read, honoring FILTER BY.
   proc_commit must be called when done. */
struct casereader *
proc_open (struct dataset *ds)
{
  return proc_open_filtering (ds, true);
}
378 /* Returns true if a procedure is in progress, that is, if
379 proc_open has been called but proc_commit has not. */
381 proc_is_open (const struct dataset *ds)
383 return ds->proc_state != PROC_COMMITTED;
386 /* "read" function for procedure casereader. */
387 static struct ccase *
388 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
390 struct dataset *ds = ds_;
391 enum trns_result retval = TRNS_DROP_CASE;
394 assert (ds->proc_state == PROC_OPEN);
395 for (; ; case_unref (c))
399 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
400 if (retval == TRNS_ERROR)
405 /* Read a case from source. */
406 c = casereader_read (ds->source);
409 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
410 caseinit_init_vars (ds->caseinit, c);
412 /* Execute permanent transformations. */
413 case_nr = ds->cases_written + 1;
414 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
416 caseinit_update_left_vars (ds->caseinit, c);
417 if (retval != TRNS_CONTINUE)
420 /* Write case to collection of lagged cases. */
423 while (deque_count (&ds->lag) >= ds->n_lag)
424 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
425 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
428 /* Write case to replacement dataset. */
430 if (ds->sink != NULL)
431 casewriter_write (ds->sink,
432 case_map_execute (ds->compactor, case_ref (c)));
434 /* Execute temporary transformations. */
435 if (ds->temporary_trns_chain != NULL)
437 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
438 &c, ds->cases_written);
439 if (retval != TRNS_CONTINUE)
447 /* "destroy" function for procedure casereader. */
449 proc_casereader_destroy (struct casereader *reader, void *ds_)
451 struct dataset *ds = ds_;
454 /* We are always the subreader for a casereader_buffer, so if we're being
455 destroyed then it's because the casereader_buffer has read all the cases
456 that it ever will. */
459 /* Make sure transformations happen for every input case, in
460 case they have side effects, and ensure that the replacement
461 active dataset gets all the cases it should. */
462 while ((c = casereader_read (reader)) != NULL)
465 ds->proc_state = PROC_CLOSED;
466 ds->ok = casereader_destroy (ds->source) && ds->ok;
468 dataset_set_source (ds, NULL);
471 /* Must return false if the source casereader, a transformation,
472 or the sink casewriter signaled an error. (If a temporary
473 transformation signals an error, then the return value is
474 false, but the replacement active dataset may still be
477 proc_commit (struct dataset *ds)
479 if (ds->shim != NULL)
480 casereader_shim_slurp (ds->shim);
482 assert (ds->proc_state == PROC_CLOSED);
483 ds->proc_state = PROC_COMMITTED;
485 dataset_changed__ (ds);
487 /* Free memory for lagged cases. */
488 while (!deque_is_empty (&ds->lag))
489 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
490 free (ds->lag_cases);
492 /* Dictionary from before TEMPORARY becomes permanent. */
493 proc_cancel_temporary_transformations (ds);
495 if (!ds->discard_output)
497 /* Finish compacting. */
498 if (ds->compactor != NULL)
500 case_map_destroy (ds->compactor);
501 ds->compactor = NULL;
503 dict_delete_scratch_vars (ds->dict);
504 dict_compact_values (ds->dict);
507 /* Old data sink becomes new data source. */
508 if (ds->sink != NULL)
509 ds->source = casewriter_make_reader (ds->sink);
514 ds->discard_output = false;
518 caseinit_clear (ds->caseinit);
519 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
521 dict_clear_vectors (ds->dict);
522 ds->permanent_dict = NULL;
523 return proc_cancel_all_transformations (ds) && ds->ok;
526 /* Casereader class for procedure execution. */
527 static const struct casereader_class proc_casereader_class =
529 proc_casereader_read,
530 proc_casereader_destroy,
535 /* Updates last_proc_invocation. */
537 update_last_proc_invocation (struct dataset *ds)
539 ds->last_proc_invocation = time (NULL);
542 /* Returns a pointer to the lagged case from N_BEFORE cases before the
543 current one, or NULL if there haven't been that many cases yet. */
545 lagged_case (const struct dataset *ds, int n_before)
547 assert (n_before >= 1);
548 assert (n_before <= ds->n_lag);
550 if (n_before <= deque_count (&ds->lag))
551 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
556 /* Returns the current set of permanent transformations,
557 and clears the permanent transformations.
558 For use by INPUT PROGRAM. */
560 proc_capture_transformations (struct dataset *ds)
562 struct trns_chain *chain;
564 assert (ds->temporary_trns_chain == NULL);
565 chain = ds->permanent_trns_chain;
566 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
567 dataset_transformations_changed__ (ds, false);
572 /* Adds a transformation that processes a case with PROC and
573 frees itself with FREE to the current set of transformations.
574 The functions are passed AUX as auxiliary data. */
576 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
578 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
579 dataset_transformations_changed__ (ds, true);
582 /* Adds a transformation that processes a case with PROC and
583 frees itself with FREE to the current set of transformations.
584 When parsing of the block of transformations is complete,
585 FINALIZE will be called.
586 The functions are passed AUX as auxiliary data. */
588 add_transformation_with_finalizer (struct dataset *ds,
589 trns_finalize_func *finalize,
590 trns_proc_func *proc,
591 trns_free_func *free, void *aux)
593 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
594 dataset_transformations_changed__ (ds, true);
597 /* Returns the index of the next transformation.
598 This value can be returned by a transformation procedure
599 function to indicate a "jump" to that transformation. */
601 next_transformation (const struct dataset *ds)
603 return trns_chain_next (ds->cur_trns_chain);
606 /* Returns true if the next call to add_transformation() will add
607 a temporary transformation, false if it will add a permanent
610 proc_in_temporary_transformations (const struct dataset *ds)
612 return ds->temporary_trns_chain != NULL;
615 /* Marks the start of temporary transformations.
616 Further calls to add_transformation() will add temporary
619 proc_start_temporary_transformations (struct dataset *ds)
621 if (!proc_in_temporary_transformations (ds))
623 add_case_limit_trns (ds);
625 ds->permanent_dict = dict_clone (ds->dict);
627 trns_chain_finalize (ds->permanent_trns_chain);
628 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
629 dataset_transformations_changed__ (ds, true);
633 /* Converts all the temporary transformations, if any, to
634 permanent transformations. Further transformations will be
636 Returns true if anything changed, false otherwise. */
638 proc_make_temporary_transformations_permanent (struct dataset *ds)
640 if (proc_in_temporary_transformations (ds))
642 trns_chain_finalize (ds->temporary_trns_chain);
643 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
644 ds->temporary_trns_chain = NULL;
646 dict_destroy (ds->permanent_dict);
647 ds->permanent_dict = NULL;
655 /* Cancels all temporary transformations, if any. Further
656 transformations will be permanent.
657 Returns true if anything changed, false otherwise. */
659 proc_cancel_temporary_transformations (struct dataset *ds)
661 if (proc_in_temporary_transformations (ds))
663 dict_destroy (ds->dict);
664 ds->dict = ds->permanent_dict;
665 ds->permanent_dict = NULL;
667 trns_chain_destroy (ds->temporary_trns_chain);
668 ds->temporary_trns_chain = NULL;
669 dataset_transformations_changed__ (
670 ds, !trns_chain_is_empty (ds->permanent_trns_chain));
677 /* Cancels all transformations, if any.
678 Returns true if successful, false on I/O error. */
680 proc_cancel_all_transformations (struct dataset *ds)
683 assert (ds->proc_state == PROC_COMMITTED);
684 ok = trns_chain_destroy (ds->permanent_trns_chain);
685 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
686 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
687 ds->temporary_trns_chain = NULL;
688 dataset_transformations_changed__ (ds, false);
693 /* Causes output from the next procedure to be discarded, instead
694 of being preserved for use as input for the next procedure. */
696 proc_discard_output (struct dataset *ds)
698 ds->discard_output = true;
702 /* Checks whether DS has a corrupted active dataset. If so,
703 discards it and returns false. If not, returns true without
706 dataset_end_of_command (struct dataset *ds)
708 if (ds->source != NULL)
710 if (casereader_error (ds->source))
717 const struct taint *taint = casereader_get_taint (ds->source);
718 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
719 assert (!taint_has_tainted_successor (taint));
725 static trns_proc_func case_limit_trns_proc;
726 static trns_free_func case_limit_trns_free;
728 /* Adds a transformation that limits the number of cases that may
729 pass through, if DS->DICT has a case limit. */
731 add_case_limit_trns (struct dataset *ds)
733 casenumber case_limit = dict_get_case_limit (ds->dict);
736 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
737 *cases_remaining = case_limit;
738 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
740 dict_set_case_limit (ds->dict, 0);
744 /* Limits the maximum number of cases processed to
747 case_limit_trns_proc (void *cases_remaining_,
748 struct ccase **c UNUSED, casenumber case_nr UNUSED)
750 size_t *cases_remaining = cases_remaining_;
751 if (*cases_remaining > 0)
753 (*cases_remaining)--;
754 return TRNS_CONTINUE;
757 return TRNS_DROP_CASE;
/* Frees the data associated with a case limit transformation.
   CASES_REMAINING_ is the counter allocated by add_case_limit_trns().
   Always returns true. */
static bool
case_limit_trns_free (void *cases_remaining_)
{
  free (cases_remaining_);
  return true;
}
769 static trns_proc_func filter_trns_proc;
771 /* Adds a temporary transformation to filter data according to
772 the variable specified on FILTER, if any. */
774 add_filter_trns (struct dataset *ds)
776 struct variable *filter_var = dict_get_filter (ds->dict);
777 if (filter_var != NULL)
779 proc_start_temporary_transformations (ds);
780 add_transformation (ds, filter_trns_proc, NULL, filter_var);
784 /* FILTER transformation. */
786 filter_trns_proc (void *filter_var_,
787 struct ccase **c UNUSED, casenumber case_nr UNUSED)
790 struct variable *filter_var = filter_var_;
791 double f = case_num (*c, filter_var);
792 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
793 ? TRNS_CONTINUE : TRNS_DROP_CASE);
798 dataset_need_lag (struct dataset *ds, int n_before)
800 ds->n_lag = MAX (ds->n_lag, n_before);
804 dataset_changed__ (struct dataset *ds)
806 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
807 ds->callbacks->changed (ds->cb_data);
811 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
813 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
814 ds->callbacks->transformations_changed (non_empty, ds->cb_data);