1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/transformations.h"
36 #include "data/variable.h"
37 #include "libpspp/deque.h"
38 #include "libpspp/misc.h"
39 #include "libpspp/str.h"
40 #include "libpspp/taint.h"
41 #include "libpspp/i18n.h"
43 #include "gl/minmax.h"
44 #include "gl/xalloc.h"
47 /* Cases are read from source,
48 their transformation variables are initialized,
49 pass through permanent_trns_chain (which transforms them into
50 the format described by permanent_dict),
51 are written to sink,
52 pass through temporary_trns_chain (which transforms them into
53 the format described by dict),
54 and are finally passed to the procedure. */
55 struct casereader *source; /* Cases read by the procedure casereader. */
56 struct caseinit *caseinit; /* Initializes variables of each case read. */
57 struct trns_chain *permanent_trns_chain; /* Run on every case before the sink. */
/* Dictionary as of the sink.  Set by proc_start_temporary_transformations()
   or, if still null when a procedure opens, by proc_open_filtering(). */
58 struct dictionary *permanent_dict;
/* Collects the replacement active file; proc_commit() turns it into the
   new source. */
59 struct casewriter *sink;
/* Null unless temporary transformations are in effect (see
   proc_in_temporary_transformations()). */
60 struct trns_chain *temporary_trns_chain;
61 struct dictionary *dict; /* Describes the cases handed to the procedure. */
63 /* If true, cases are discarded instead of being written to
64 sink. */
67 /* The transformation chain that the next transformation will be
68 added to. */
/* Always one of the two chains above: the permanent chain, or the temporary
   chain once proc_start_temporary_transformations() has run. */
69 struct trns_chain *cur_trns_chain;
71 /* The case map used to compact a case, if necessary;
72 otherwise a null pointer. */
73 struct case_map *compactor;
75 /* Time at which proc was last invoked. */
/* Written by update_last_proc_invocation() from time (NULL). */
76 time_t last_proc_invocation;
78 /* Cases just before ("lagging") the current one. */
79 int n_lag; /* Number of cases to lag. */
80 struct deque lag; /* Deque of lagged cases. */
81 struct ccase **lag_cases; /* Lagged cases managed by deque. */
/* States of the proc_open()/proc_commit() cycle. */
86 PROC_COMMITTED, /* No procedure in progress. */
87 PROC_OPEN, /* proc_open called, casereader still open. */
88 PROC_CLOSED /* casereader from proc_open destroyed,
89 but proc_commit not yet called. */
92 casenumber cases_written; /* Cases output so far. */
93 bool ok; /* Error status. */
94 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
/* Optional notification hooks; invoked by dataset_changed__() and
   dataset_transformations_changed__() when non-null. */
96 const struct dataset_callbacks *callbacks;
99 /* Default encoding for reading syntax files. */
/* Heap-allocated copy owned by the dataset; replaced by
   dataset_set_default_syntax_encoding(). */
100 char *syntax_encoding;
/* Prototypes for file-local helpers that are called before the point in
   this file where they are defined. */
103 static void dataset_changed__ (struct dataset *);
104 static void dataset_transformations_changed__ (struct dataset *,
107 static void add_case_limit_trns (struct dataset *ds);
108 static void add_filter_trns (struct dataset *ds);
110 static void update_last_proc_invocation (struct dataset *ds);
/* Dictionary change callback, registered with dict_set_change_callback()
   by dataset_create() and dataset_set_dict().  DS_ is the dataset given at
   registration; the dictionary argument itself is not used.  The change is
   forwarded to the dataset's own `changed' callback. */
113 dict_callback (struct dictionary *d UNUSED, void *ds_)
115 struct dataset *ds = ds_;
116 dataset_changed__ (ds);
119 /* Creates and returns a new dataset. The dataset initially has an empty
120 dictionary and no data source.
   The new dictionary reports changes through dict_callback() and uses the
   default encoding; the default syntax encoding starts out as "Auto". */
122 dataset_create (void)
/* xzalloc() (gnulib) returns zero-filled memory, so every member not
   assigned below starts out as 0 or a null pointer. */
126 ds = xzalloc (sizeof *ds);
127 ds->dict = dict_create ();
128 dict_set_change_callback (ds->dict, dict_callback, ds);
129 dict_set_encoding (ds->dict, get_default_encoding ());
131 ds->caseinit = caseinit_create ();
/* Gives the dataset its initial, empty permanent chain and makes that
   chain the current one. */
132 proc_cancel_all_transformations (ds);
133 ds->syntax_encoding = xstrdup ("Auto");
/* Releases what DS owns, as far as this listing shows: its dictionary,
   case initializer, permanent transformation chain and default syntax
   encoding string.  The transformations_changed callback is told that no
   transformations remain. */
139 dataset_destroy (struct dataset *ds)
144 dict_destroy (ds->dict);
145 caseinit_destroy (ds->caseinit);
146 trns_chain_destroy (ds->permanent_trns_chain);
147 dataset_transformations_changed__ (ds, false);
148 free (ds->syntax_encoding);
153 /* Discards the active file dictionary, data, and transformations.
   No procedure may be in progress (asserted).  The default file handle is
   also reset to NULL. */
155 dataset_clear (struct dataset *ds)
157 assert (ds->proc_state == PROC_COMMITTED);
159 dict_clear (ds->dict);
160 fh_set_default_handle (NULL);
164 casereader_destroy (ds->source);
167 proc_cancel_all_transformations (ds);
170 /* Returns the dictionary within DS. This is always nonnull, although it
171 might not contain any variables. */
/* NOTE(review): only the signature of this function appears in this
   listing; its body is not shown. */
173 dataset_dict (const struct dataset *ds)
178 /* Replaces DS's dictionary by DICT, discarding any source and
179 transformations. */
181 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
/* No procedure may be in progress, and DICT must not already be DS's
   dictionary, because the current one is about to be destroyed. */
183 assert (ds->proc_state == PROC_COMMITTED);
184 assert (ds->dict != dict);
188 dict_destroy (ds->dict);
/* Route change notifications from the dictionary in ds->dict to DS. */
190 dict_set_change_callback (ds->dict, dict_callback, ds);
193 /* Returns the casereader that will be read when a procedure is executed on
194 DS. This can be NULL if none has been set up yet.
   The result is const-qualified, so callers may inspect the reader but not
   read from it. */
195 const struct casereader *
196 dataset_source (const struct dataset *ds)
201 /* Returns true if DS has a data source, false otherwise.
   Implemented as a null check on dataset_source(). */
203 dataset_has_source (const struct dataset *ds)
205 return dataset_source (ds) != NULL;
208 /* Replaces the active file's data by READER. READER's cases must have an
209 appropriate format for DS's dictionary.
   The previous source is destroyed.  READER may be a null pointer
   (proc_casereader_destroy() passes one).
   Returns true if READER is null or casereader_error() reports no error on
   it, false otherwise. */
211 dataset_set_source (struct dataset *ds, struct casereader *reader)
213 casereader_destroy (ds->source);
/* Start case initialization afresh, treating the variables currently in
   the dictionary as already initialized. */
216 caseinit_clear (ds->caseinit);
217 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
219 return reader == NULL || !casereader_error (reader);
222 /* Returns the data source from DS and removes it from DS. Returns a null
223 pointer if DS has no data source.
   READER holds the source as it was on entry. */
225 dataset_steal_source (struct dataset *ds)
227 struct casereader *reader = ds->source;
/* Installs CALLBACKS as DS's notification hooks and CB_DATA as the
   auxiliary pointer handed to them (see dataset_changed__() and
   dataset_transformations_changed__()).  Only the pointers are stored. */
234 dataset_set_callbacks (struct dataset *ds,
235 const struct dataset_callbacks *callbacks,
238 ds->callbacks = callbacks;
239 ds->cb_data = cb_data;
/* Sets DS's default encoding for reading syntax files to a copy of
   ENCODING.  The previously stored string is freed. */
243 dataset_set_default_syntax_encoding (struct dataset *ds, const char *encoding)
245 free (ds->syntax_encoding);
246 ds->syntax_encoding = xstrdup (encoding);
/* Returns DS's default encoding for reading syntax files.  The string
   remains owned by DS: it is freed by
   dataset_set_default_syntax_encoding() and dataset_destroy(). */
250 dataset_get_default_syntax_encoding (const struct dataset *ds)
252 return ds->syntax_encoding;
255 /* Returns the last time the data was read.
   If no time has been recorded yet (the stored value is 0), the current
   time is recorded first and returned. */
257 time_of_last_procedure (struct dataset *ds)
259 if (ds->last_proc_invocation == 0)
260 update_last_proc_invocation (ds);
261 return ds->last_proc_invocation;
264 /* Regular procedure. */
266 /* Executes any pending transformations, if necessary.
267 This is not identical to the EXECUTE command in that it won't
268 always read the source data. This can be important when the
269 source data is given inline within BEGIN DATA...END FILE. */
271 proc_execute (struct dataset *ds)
/* Nothing is pending when the temporary chain is absent or empty and the
   permanent chain is empty.  The statements that follow the test only
   reset per-procedure state: the discard flag, the dictionary's case
   limit and its vectors. */
275 if ((ds->temporary_trns_chain == NULL
276 || trns_chain_is_empty (ds->temporary_trns_chain))
277 && trns_chain_is_empty (ds->permanent_trns_chain))
280 ds->discard_output = false;
281 dict_set_case_limit (ds->dict, 0);
282 dict_clear_vectors (ds->dict);
/* Run a procedure over the data.  Destroying the procedure casereader
   right after opening it makes proc_casereader_destroy() read every
   remaining case, which pushes each case through the transformations.
   The result combines the status of that step with proc_commit()'s. */
286 ok = casereader_destroy (proc_open (ds));
287 return proc_commit (ds) && ok;
/* Declared here because proc_open_filtering() refers to the class before
   its definition further down. */
290 static const struct casereader_class proc_casereader_class;
292 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
293 cases filtered out with FILTER BY will not be included in the casereader
294 (which is usually desirable). If FILTER is false, all cases will be
295 included regardless of FILTER BY settings.
297 proc_commit must be called when done. */
299 proc_open_filtering (struct dataset *ds, bool filter)
301 struct casereader *reader;
/* Preconditions: DS has a data source and no procedure is in progress. */
303 assert (ds->source != NULL);
304 assert (ds->proc_state == PROC_COMMITTED);
306 update_last_proc_invocation (ds);
308 caseinit_mark_for_init (ds->caseinit, ds->dict);
310 /* Finish up the collection of transformations. */
311 add_case_limit_trns (ds);
313 add_filter_trns (ds);
314 trns_chain_finalize (ds->cur_trns_chain);
316 /* Make permanent_dict refer to the dictionary right before
317 data reaches the sink. */
318 if (ds->permanent_dict == NULL)
319 ds->permanent_dict = ds->dict;
/* Unless output is being discarded, create the sink that collects the
   replacement active file.  If the value count computed with the
   1u << DC_SCRATCH mask (presumably "excluding scratch variables" --
   confirm) is smaller than the dictionary's next value index, a case map
   is kept in ds->compactor and the sink is created with the compacted
   prototype; otherwise no compactor is used and the sink takes the
   dictionary's own prototype. */
322 if (!ds->discard_output)
324 struct dictionary *pd = ds->permanent_dict;
325 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
326 if (compacted_value_cnt < dict_get_next_value_idx (pd))
328 struct caseproto *compacted_proto;
329 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
330 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
331 ds->sink = autopaging_writer_create (compacted_proto);
332 caseproto_unref (compacted_proto);
336 ds->compactor = NULL;
337 ds->sink = autopaging_writer_create (dict_get_proto (pd));
342 ds->compactor = NULL;
346 /* Allocate memory for lagged cases. */
347 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
/* The procedure is now open and has written no cases yet. */
349 ds->proc_state = PROC_OPEN;
350 ds->cases_written = 0;
/* NOTE(review): of the comments that follow, the second FIXME and the
   shim explanation are missing their closing lines in this listing. */
353 /* FIXME: use taint in dataset in place of `ok'? */
354 /* FIXME: for trivial cases we can just return a clone of
357 /* Create casereader and insert a shim on top. The shim allows us to
358 arbitrarily extend the casereader's lifetime, by slurping the cases into
359 the shim's buffer in proc_commit(). That is especially useful when output
360 table_items are generated directly from the procedure casereader (e.g. by
361 the LIST procedure) when we are using an output driver that keeps a
362 reference to the output items passed to it (e.g. the GUI output driver in
364 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
366 &proc_casereader_class, ds);
367 ds->shim = casereader_shim_insert (reader);
371 /* Opens dataset DS for reading cases with proc_read.
372 proc_commit must be called when done.
   Equivalent to proc_open_filtering (DS, true). */
374 proc_open (struct dataset *ds)
376 return proc_open_filtering (ds, true);
379 /* Returns true if a procedure is in progress, that is, if
380 proc_open has been called but proc_commit has not.
   Both PROC_OPEN and PROC_CLOSED count as "in progress". */
382 proc_is_open (const struct dataset *ds)
384 return ds->proc_state != PROC_COMMITTED;
387 /* "read" function for procedure casereader.
   DS_ is the dataset that proc_open_filtering() passed as auxiliary data
   to casereader_create_sequential(); READER itself is not used.  Each case
   is read from DS's source, run through the permanent transformations,
   recorded as a lagged case if lagging is enabled by the surrounding code,
   written to the sink if there is one, and then run through the temporary
   transformations, if any. */
388 static struct ccase *
389 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
391 struct dataset *ds = ds_;
392 enum trns_result retval = TRNS_DROP_CASE;
395 assert (ds->proc_state == PROC_OPEN);
/* One source case per iteration.  The increment expression releases the
   case left over from an iteration that went back to the top. */
396 for (; ; case_unref (c))
/* The assertion records that the top of the loop is reached only with
   RETVAL at its initial TRNS_DROP_CASE, after a dropped case, or after an
   error. */
400 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
401 if (retval == TRNS_ERROR)
406 /* Read a case from source. */
407 c = casereader_read (ds->source);
/* Resize the case to the active dictionary's prototype and initialize its
   variables. */
410 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
411 caseinit_init_vars (ds->caseinit, c);
413 /* Execute permanent transformations. */
/* The case number is one more than the count of cases written so far. */
414 case_nr = ds->cases_written + 1;
415 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
417 caseinit_update_left_vars (ds->caseinit, c);
418 if (retval != TRNS_CONTINUE)
421 /* Write case to collection of lagged cases. */
/* Evict from the back until there is room, then store a new reference to
   this case at the front, so the front is always the most recent case. */
424 while (deque_count (&ds->lag) >= ds->n_lag)
425 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
426 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
429 /* Write case to replacement active file. */
/* The sink is given its own reference to the case, passed through
   ds->compactor first. */
431 if (ds->sink != NULL)
432 casewriter_write (ds->sink,
433 case_map_execute (ds->compactor, case_ref (c)));
435 /* Execute temporary transformations. */
436 if (ds->temporary_trns_chain != NULL)
438 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
439 &c, ds->cases_written);
440 if (retval != TRNS_CONTINUE)
448 /* "destroy" function for procedure casereader.
   DS_ is the dataset.  After draining READER, the procedure state becomes
   PROC_CLOSED, the status of destroying DS's source is folded into ds->ok,
   and dataset_set_source() is called with a null reader. */
450 proc_casereader_destroy (struct casereader *reader, void *ds_)
452 struct dataset *ds = ds_;
455 /* We are always the subreader for a casereader_buffer, so if we're being
456 destroyed then it's because the casereader_buffer has read all the cases
457 that it ever will. */
460 /* Make sure transformations happen for every input case, in
461 case they have side effects, and ensure that the replacement
462 active file gets all the cases it should. */
463 while ((c = casereader_read (reader)) != NULL)
466 ds->proc_state = PROC_CLOSED;
467 ds->ok = casereader_destroy (ds->source) && ds->ok;
469 dataset_set_source (ds, NULL);
472 /* Must return false if the source casereader, a transformation,
473 or the sink casewriter signaled an error. (If a temporary
474 transformation signals an error, then the return value is
475 false, but the replacement active file may still be
476 untainted.) */
478 proc_commit (struct dataset *ds)
/* If the shim on the procedure casereader still exists, let it slurp the
   cases into its own buffer. */
480 if (ds->shim != NULL)
481 casereader_shim_slurp (ds->shim);
/* By now the procedure casereader must have been destroyed. */
483 assert (ds->proc_state == PROC_CLOSED);
484 ds->proc_state = PROC_COMMITTED;
486 dataset_changed__ (ds);
488 /* Free memory for lagged cases. */
489 while (!deque_is_empty (&ds->lag))
490 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
491 free (ds->lag_cases);
493 /* Dictionary from before TEMPORARY becomes permanent. */
494 proc_cancel_temporary_transformations (ds);
496 if (!ds->discard_output)
498 /* Finish compacting. */
/* With a compactor in use, the dictionary is brought in line with the
   compacted cases: scratch variables are deleted and values compacted. */
499 if (ds->compactor != NULL)
501 case_map_destroy (ds->compactor);
502 ds->compactor = NULL;
504 dict_delete_scratch_vars (ds->dict);
505 dict_compact_values (ds->dict);
508 /* Old data sink becomes new data source. */
509 if (ds->sink != NULL)
510 ds->source = casewriter_make_reader (ds->sink);
/* A discard request covers a single procedure only. */
515 ds->discard_output = false;
/* Start case initialization afresh for the next procedure. */
519 caseinit_clear (ds->caseinit);
520 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
522 dict_clear_vectors (ds->dict);
523 ds->permanent_dict = NULL;
/* The result is true only if cancelling the transformations succeeded and
   no error was recorded in ds->ok. */
524 return proc_cancel_all_transformations (ds) && ds->ok;
527 /* Casereader class for procedure execution.
   proc_open_filtering() passes this class to
   casereader_create_sequential().  Its first two members are the read and
   destroy functions defined above. */
528 static const struct casereader_class proc_casereader_class =
530 proc_casereader_read,
531 proc_casereader_destroy,
536 /* Updates last_proc_invocation.
   The new value is the current calendar time from time (NULL). */
538 update_last_proc_invocation (struct dataset *ds)
540 ds->last_proc_invocation = time (NULL);
543 /* Returns a pointer to the lagged case from N_BEFORE cases before the
544 current one, or NULL if there haven't been that many cases yet. */
546 lagged_case (const struct dataset *ds, int n_before)
/* N_BEFORE must be at least 1 and no greater than the number of lagged
   cases being kept (see dataset_need_lag()). */
548 assert (n_before >= 1);
549 assert (n_before <= ds->n_lag);
/* proc_casereader_read() pushes each case at the front of the deque, so
   the case N_BEFORE back is N_BEFORE - 1 positions from the front. */
551 if (n_before <= deque_count (&ds->lag))
552 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
557 /* Returns the current set of permanent transformations,
558 and clears the permanent transformations.
559 For use by INPUT PROGRAM.
   Temporary transformations must not be in effect (asserted).  CHAIN holds
   the old permanent chain; DS gets a newly created chain that is both its
   permanent and its current chain, and the transformations_changed
   callback is notified with false. */
561 proc_capture_transformations (struct dataset *ds)
563 struct trns_chain *chain;
565 assert (ds->temporary_trns_chain == NULL);
566 chain = ds->permanent_trns_chain;
567 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
568 dataset_transformations_changed__ (ds, false);
573 /* Adds a transformation that processes a case with PROC and
574 frees itself with FREE to the current set of transformations.
575 The functions are passed AUX as auxiliary data.
   The transformation is appended to ds->cur_trns_chain without a
   finalizer, and the transformations_changed callback is notified with
   true. */
577 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
579 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
580 dataset_transformations_changed__ (ds, true);
583 /* Adds a transformation that processes a case with PROC and
584 frees itself with FREE to the current set of transformations.
585 When parsing of the block of transformations is complete,
586 FINALIZE will be called.
587 The functions are passed AUX as auxiliary data.
   Identical to add_transformation() except that FINALIZE is passed to
   trns_chain_append() in place of a null finalizer. */
589 add_transformation_with_finalizer (struct dataset *ds,
590 trns_finalize_func *finalize,
591 trns_proc_func *proc,
592 trns_free_func *free, void *aux)
594 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
595 dataset_transformations_changed__ (ds, true);
598 /* Returns the index of the next transformation.
599 This value can be returned by a transformation procedure
600 function to indicate a "jump" to that transformation.
   The index is taken from the current chain, ds->cur_trns_chain. */
602 next_transformation (const struct dataset *ds)
604 return trns_chain_next (ds->cur_trns_chain);
607 /* Returns true if the next call to add_transformation() will add
608 a temporary transformation, false if it will add a permanent
609 transformation. */
611 proc_in_temporary_transformations (const struct dataset *ds)
/* The temporary chain exists only between
   proc_start_temporary_transformations() and the point where the
   temporary transformations are made permanent or cancelled. */
613 return ds->temporary_trns_chain != NULL;
616 /* Marks the start of temporary transformations.
617 Further calls to add_transformation() will add temporary
618 transformations. */
620 proc_start_temporary_transformations (struct dataset *ds)
/* Nothing to do if temporary transformations are already in effect. */
622 if (!proc_in_temporary_transformations (ds))
/* The current chain is still the permanent one here, so a pending case
   limit becomes a permanent transformation. */
624 add_case_limit_trns (ds);
/* Keep a copy of the dictionary as it stands now;
   proc_cancel_temporary_transformations() reinstates it. */
626 ds->permanent_dict = dict_clone (ds->dict);
/* Close the permanent chain and start a new, empty chain that is both the
   temporary chain and the current chain. */
628 trns_chain_finalize (ds->permanent_trns_chain);
629 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
630 dataset_transformations_changed__ (ds, true);
634 /* Converts all the temporary transformations, if any, to
635 permanent transformations. Further transformations will be
637 Returns true if anything changed, false otherwise. */
639 proc_make_temporary_transformations_permanent (struct dataset *ds)
641 if (proc_in_temporary_transformations (ds))
643 trns_chain_finalize (ds->temporary_trns_chain);
644 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
645 ds->temporary_trns_chain = NULL;
647 dict_destroy (ds->permanent_dict);
648 ds->permanent_dict = NULL;
656 /* Cancels all temporary transformations, if any. Further
657 transformations will be permanent.
658 Returns true if anything changed, false otherwise. */
660 proc_cancel_temporary_transformations (struct dataset *ds)
662 if (proc_in_temporary_transformations (ds))
664 dict_destroy (ds->dict);
665 ds->dict = ds->permanent_dict;
666 ds->permanent_dict = NULL;
668 trns_chain_destroy (ds->temporary_trns_chain);
669 ds->temporary_trns_chain = NULL;
670 dataset_transformations_changed__ (
671 ds, !trns_chain_is_empty (ds->permanent_trns_chain));
678 /* Cancels all transformations, if any.
679 Returns true if successful, false on I/O error.
   No procedure may be in progress (asserted).  Afterward DS has a newly
   created chain that is both its permanent and its current chain, and no
   temporary chain; the transformations_changed callback is notified with
   false. */
681 proc_cancel_all_transformations (struct dataset *ds)
684 assert (ds->proc_state == PROC_COMMITTED);
/* Both chains are destroyed unconditionally and their statuses combined.
   NOTE(review): either pointer can be null here (dataset_create() calls
   this on a zero-filled dataset), so trns_chain_destroy() presumably
   accepts a null chain -- confirm. */
685 ok = trns_chain_destroy (ds->permanent_trns_chain);
686 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
687 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
688 ds->temporary_trns_chain = NULL;
689 dataset_transformations_changed__ (ds, false);
694 /* Causes output from the next procedure to be discarded, instead
695 of being preserved for use as input for the next procedure.
   Only sets a flag: proc_open_filtering() consults it when deciding
   whether to create a sink, and proc_commit() clears it again. */
697 proc_discard_output (struct dataset *ds)
699 ds->discard_output = true;
703 /* Checks whether DS has a corrupted active file. If so,
704 discards it and returns false. If not, returns true without
705 doing anything. */
707 dataset_end_of_command (struct dataset *ds)
/* Without a source there is nothing to check. */
709 if (ds->source != NULL)
/* casereader_error() reports whether the source has encountered an
   error. */
711 if (casereader_error (ds->source))
/* Handling for a source that passed the check above: reset the
   successor-taint state of its taint object and verify that no tainted
   successor remains.  CONST_CAST is needed because
   casereader_get_taint() returns a pointer to const. */
718 const struct taint *taint = casereader_get_taint (ds->source);
719 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
720 assert (!taint_has_tainted_successor (taint));
/* Declared ahead of add_case_limit_trns(), which passes both functions to
   add_transformation(). */
726 static trns_proc_func case_limit_trns_proc;
727 static trns_free_func case_limit_trns_free;
729 /* Adds a transformation that limits the number of cases that may
730 pass through, if DS->DICT has a case limit.
   The remaining-case counter is heap-allocated because it is the
   transformation's auxiliary data; case_limit_trns_proc() decrements it
   and case_limit_trns_free() releases it.  The dictionary's case limit is
   then reset to 0, since the transformation now carries the limit. */
732 add_case_limit_trns (struct dataset *ds)
734 casenumber case_limit = dict_get_case_limit (ds->dict);
737 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
738 *cases_remaining = case_limit;
739 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
741 dict_set_case_limit (ds->dict, 0);
745 /* Limits the maximum number of cases processed to
746 *CASES_REMAINING.
   CASES_REMAINING_ points to the casenumber counter allocated by
   add_case_limit_trns().  While the counter is positive it is decremented
   and the case continues (TRNS_CONTINUE); once it reaches zero every
   further case is dropped (TRNS_DROP_CASE).  The case and its number are
   not examined. */
748 case_limit_trns_proc (void *cases_remaining_,
749 struct ccase **c UNUSED, casenumber case_nr UNUSED)
/* Use the type the counter was allocated with, rather than size_t. */
751 casenumber *cases_remaining = cases_remaining_;
752 if (*cases_remaining > 0)
754 (*cases_remaining)--;
755 return TRNS_CONTINUE;
758 return TRNS_DROP_CASE;
761 /* Frees the data associated with a case limit transformation.
   CASES_REMAINING_ is the casenumber counter that add_case_limit_trns()
   allocated with xmalloc(). */
763 case_limit_trns_free (void *cases_remaining_)
/* Declared with the same type the counter was allocated with. */
765 casenumber *cases_remaining = cases_remaining_;
766 free (cases_remaining);
/* Declared ahead of add_filter_trns(), which passes it to
   add_transformation(). */
770 static trns_proc_func filter_trns_proc;
772 /* Adds a temporary transformation to filter data according to
773 the variable specified on FILTER, if any.
   Temporary transformations are started first, so the filter is added to
   the temporary chain.  The filter variable is passed as the
   transformation's auxiliary data, and no free function is supplied. */
775 add_filter_trns (struct dataset *ds)
777 struct variable *filter_var = dict_get_filter (ds->dict);
778 if (filter_var != NULL)
780 proc_start_temporary_transformations (ds);
781 add_transformation (ds, filter_trns_proc, NULL, filter_var);
785 /* FILTER transformation.
   FILTER_VAR_ is the filter variable passed by add_filter_trns().  The
   case *C continues (TRNS_CONTINUE) only if its value for that variable is
   nonzero and not missing according to
   var_is_num_missing (..., MV_ANY); otherwise it is dropped
   (TRNS_DROP_CASE).  The case number is not used. */
787 filter_trns_proc (void *filter_var_,
788 struct ccase **c, casenumber case_nr UNUSED)
791 struct variable *filter_var = filter_var_;
792 double f = case_num (*c, filter_var);
793 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
794 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Requests that at least N_BEFORE lagged cases be kept for lagged_case().
   The stored count only ever grows: it becomes the larger of its current
   value and N_BEFORE. */
799 dataset_need_lag (struct dataset *ds, int n_before)
801 ds->n_lag = MAX (ds->n_lag, n_before);
/* Invokes DS's `changed' callback with the callback data given to
   dataset_set_callbacks().  Does nothing if no callbacks are installed or
   the `changed' member is null. */
805 dataset_changed__ (struct dataset *ds)
807 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
808 ds->callbacks->changed (ds->cb_data);
/* Invokes DS's `transformations_changed' callback with NON_EMPTY and the
   callback data given to dataset_set_callbacks().  Callers in this file
   pass true after adding a transformation and false after clearing them.
   Does nothing if no callbacks are installed or that member is null. */
812 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
814 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
815 ds->callbacks->transformations_changed (non_empty, ds->cb_data);