1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/dataset.h"
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/transformations.h"
36 #include "data/variable.h"
37 #include "libpspp/deque.h"
38 #include "libpspp/misc.h"
39 #include "libpspp/str.h"
40 #include "libpspp/taint.h"
41 #include "libpspp/i18n.h"
43 #include "gl/minmax.h"
44 #include "gl/xalloc.h"
47 /* Cases are read from source,
48 their transformation variables are initialized,
49 pass through permanent_trns_chain (which transforms them into
50 the format described by permanent_dict),
52 pass through temporary_trns_chain (which transforms them into
53 the format described by dict),
54 and are finally passed to the procedure. */
55 struct casereader *source;
56 struct caseinit *caseinit;
57 struct trns_chain *permanent_trns_chain;
58 struct dictionary *permanent_dict;
59 struct casewriter *sink;
60 struct trns_chain *temporary_trns_chain;
61 struct dictionary *dict;
63 /* Callback which occurs whenever the transformation chain(s) have
been changed.  It is called with a flag and xform_callback_aux. */
65 transformation_change_callback_func *xform_callback;
66 void *xform_callback_aux;
68 /* If true, cases are discarded instead of being written to
sink. */
72 /* The transformation chain that the next transformation will be
added to. */
74 struct trns_chain *cur_trns_chain;
76 /* The case map used to compact a case, if necessary;
77 otherwise a null pointer. */
78 struct case_map *compactor;
80 /* Time at which proc was last invoked. */
81 time_t last_proc_invocation;
83 /* Cases just before ("lagging") the current one. */
84 int n_lag; /* Number of cases to lag. */
85 struct deque lag; /* Deque of lagged cases. */
86 struct ccase **lag_cases; /* Lagged cases managed by deque. */
/* Procedure states, stored in proc_state. */
91 PROC_COMMITTED, /* No procedure in progress. */
92 PROC_OPEN, /* proc_open called, casereader still open. */
93 PROC_CLOSED /* casereader from proc_open destroyed,
94 but proc_commit not yet called. */
97 casenumber cases_written; /* Cases output so far. */
98 bool ok; /* Error status. */
99 struct casereader_shim *shim; /* Shim on proc_open() casereader. */
101 void (*callback) (void *); /* Callback for when the dataset changes */
104 /* Default encoding for reading syntax files. */
105 char *syntax_encoding;
106 }; /* struct dataset */
/* Helpers that are used before their definitions later in this file. */
109 static void add_case_limit_trns (struct dataset *ds);
110 static void add_filter_trns (struct dataset *ds);
112 static void update_last_proc_invocation (struct dataset *ds);
/* Reports a change to DS by invoking its change callback, if one is
   set, with the callback data stored in DS. */
115 dataset_set_unsaved (const struct dataset *ds)
117 if (ds->callback) ds->callback (ds->cb_data);
121 /* Public functions. */
/* Stores CB_DATA in DS; it is the argument that dataset_set_unsaved()
   hands to the dataset-change callback.
   NOTE(review): the statement that stores CB itself is not visible
   here; presumably it immediately precedes this one -- confirm. */
124 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
127 ds->cb_data = cb_data;
/* Sets DS's default encoding for reading syntax files to a private
   copy of ENCODING.  The caller keeps ownership of ENCODING. */
131 dataset_set_default_syntax_encoding (struct dataset *ds, const char *encoding)
/* Duplicate before freeing.  ENCODING may be the very string that DS
   currently holds (for example, the pointer returned by
   dataset_get_default_syntax_encoding()); freeing first would make
   xstrdup() read freed memory. */
char *new_encoding = xstrdup (encoding);
133 free (ds->syntax_encoding);
134 ds->syntax_encoding = new_encoding;
/* Returns DS's default encoding for reading syntax files.  The
   returned pointer is DS's own string, not a copy. */
138 dataset_get_default_syntax_encoding (const struct dataset *ds)
140 return ds->syntax_encoding;
143 /* Returns the last time the data was read. */
145 time_of_last_procedure (struct dataset *ds)
/* A zero timestamp is treated as "not set yet": record the current
   time first so that the caller gets a real value. */
147 if (ds->last_proc_invocation == 0)
148 update_last_proc_invocation (ds);
149 return ds->last_proc_invocation;
152 /* Regular procedure. */
154 /* Executes any pending transformations, if necessary.
155 This is not identical to the EXECUTE command in that it won't
156 always read the source data. This can be important when the
157 source data is given inline within BEGIN DATA...END DATA. */
159 proc_execute (struct dataset *ds)
/* Nothing pending: there is no temporary chain (or it is empty) and
   the permanent chain is empty too. */
163 if ((ds->temporary_trns_chain == NULL
164 || trns_chain_is_empty (ds->temporary_trns_chain))
165 && trns_chain_is_empty (ds->permanent_trns_chain))
/* Only reset the per-procedure settings; the source is not read. */
168 ds->discard_output = false;
169 dict_set_case_limit (ds->dict, 0);
170 dict_clear_vectors (ds->dict);
/* Otherwise open the procedure casereader and destroy it at once;
   destroying it reads every remaining case (see
   proc_casereader_destroy), which applies the transformations. */
174 ok = casereader_destroy (proc_open (ds));
175 return proc_commit (ds) && ok;
/* Declared here because proc_open_filtering() refers to it; the
   initializer follows the callbacks it names. */
178 static const struct casereader_class proc_casereader_class;
180 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
181 cases filtered out with FILTER BY will not be included in the casereader
182 (which is usually desirable). If FILTER is false, all cases will be
183 included regardless of FILTER BY settings.
185 proc_commit must be called when done. */
187 proc_open_filtering (struct dataset *ds, bool filter)
189 struct casereader *reader;
/* A source must exist and no other procedure may be in progress. */
191 assert (ds->source != NULL);
192 assert (ds->proc_state == PROC_COMMITTED);
194 update_last_proc_invocation (ds);
196 caseinit_mark_for_init (ds->caseinit, ds->dict);
198 /* Finish up the collection of transformations. */
199 add_case_limit_trns (ds);
201 add_filter_trns (ds);
202 trns_chain_finalize (ds->cur_trns_chain);
204 /* Make permanent_dict refer to the dictionary right before
205 data reaches the sink. */
206 if (ds->permanent_dict == NULL)
207 ds->permanent_dict = ds->dict;
/* Unless output is being discarded, create the sink that collects
   the replacement data, compacting cases when the permanent
   dictionary has fewer values after dropping scratch variables. */
210 if (!ds->discard_output)
212 struct dictionary *pd = ds->permanent_dict;
213 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
214 if (compacted_value_cnt < dict_get_next_value_idx (pd))
216 struct caseproto *compacted_proto;
217 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
218 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
219 ds->sink = autopaging_writer_create (compacted_proto);
220 caseproto_unref (compacted_proto);
224 ds->compactor = NULL;
225 ds->sink = autopaging_writer_create (dict_get_proto (pd));
230 ds->compactor = NULL;
234 /* Allocate memory for lagged cases. */
235 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
237 ds->proc_state = PROC_OPEN;
238 ds->cases_written = 0;
241 /* FIXME: use taint in dataset in place of `ok'? */
242 /* FIXME: for trivial cases we can just return a clone of
ds->source? */
245 /* Create casereader and insert a shim on top. The shim allows us to
246 arbitrarily extend the casereader's lifetime, by slurping the cases into
247 the shim's buffer in proc_commit(). That is especially useful when output
248 table_items are generated directly from the procedure casereader (e.g. by
249 the LIST procedure) when we are using an output driver that keeps a
250 reference to the output items passed to it (e.g. a GUI output driver). */
252 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
254 &proc_casereader_class, ds);
255 ds->shim = casereader_shim_insert (reader);
259 /* Opens dataset DS for reading cases with proc_read.
260 proc_commit must be called when done.
   Same as proc_open_filtering() with FILTER set to true. */
262 proc_open (struct dataset *ds)
264 return proc_open_filtering (ds, true);
267 /* Returns true if a procedure is in progress, that is, if
268 proc_open has been called but proc_commit has not. */
270 proc_is_open (const struct dataset *ds)
/* Both PROC_OPEN and PROC_CLOSED count as "in progress". */
272 return ds->proc_state != PROC_COMMITTED;
275 /* "read" function for procedure casereader.
   DS_ is the dataset that was passed as auxiliary data when the
   reader was created in proc_open_filtering(). */
276 static struct ccase *
277 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
279 struct dataset *ds = ds_;
280 enum trns_result retval = TRNS_DROP_CASE;
283 assert (ds->proc_state == PROC_OPEN);
/* Whenever the loop goes around again, its increment expression
   releases the case from the previous pass. */
284 for (; ; case_unref (c))
/* At the top of a pass the previous result can only be "drop" or
   "error". */
288 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
289 if (retval == TRNS_ERROR)
294 /* Read a case from source. */
295 c = casereader_read (ds->source);
/* Get a private copy sized for the active dictionary before
   modifying it. */
298 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
299 caseinit_init_vars (ds->caseinit, c);
301 /* Execute permanent transformations. */
/* Case numbers passed to transformations start at 1. */
302 case_nr = ds->cases_written + 1;
303 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
305 caseinit_update_left_vars (ds->caseinit, c);
306 if (retval != TRNS_CONTINUE)
309 /* Write case to collection of lagged cases. */
/* Drop the oldest lagged cases until there is room for the new one. */
312 while (deque_count (&ds->lag) >= ds->n_lag)
313 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
314 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
317 /* Write case to replacement active file. */
/* ds->compactor is a null pointer when proc_open_filtering() found
   that no compaction was needed. */
319 if (ds->sink != NULL)
320 casewriter_write (ds->sink,
321 case_map_execute (ds->compactor, case_ref (c)));
323 /* Execute temporary transformations. */
324 if (ds->temporary_trns_chain != NULL)
326 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
327 &c, ds->cases_written);
328 if (retval != TRNS_CONTINUE)
336 /* "destroy" function for procedure casereader.
   DS_ is the dataset that the reader was created for. */
338 proc_casereader_destroy (struct casereader *reader, void *ds_)
340 struct dataset *ds = ds_;
343 /* We are always the subreader for a casereader_buffer, so if we're being
344 destroyed then it's because the casereader_buffer has read all the cases
345 that it ever will. */
348 /* Make sure transformations happen for every input case, in
349 case they have side effects, and ensure that the replacement
350 active file gets all the cases it should. */
351 while ((c = casereader_read (reader)) != NULL)
/* proc_commit() asserts this state before it proceeds. */
354 ds->proc_state = PROC_CLOSED;
/* Fold the source's destroy status into the dataset's error status. */
355 ds->ok = casereader_destroy (ds->source) && ds->ok;
357 proc_set_active_file_data (ds, NULL);
360 /* Must return false if the source casereader, a transformation,
361 or the sink casewriter signaled an error. (If a temporary
362 transformation signals an error, then the return value is
363 false, but the replacement active file may still be untainted.) */
/* Finishes the procedure begun with proc_open(): releases the lagged
   cases, makes the sink (if any) the new source, and cancels all
   transformations.  Returns true only if cancelling succeeded and
   DS's error status is still good. */
366 proc_commit (struct dataset *ds)
/* If the shim still exists, have it buffer the remaining cases. */
368 if (ds->shim != NULL)
369 casereader_shim_slurp (ds->shim);
/* By now the procedure casereader must have been destroyed; that is
   what sets PROC_CLOSED. */
371 assert (ds->proc_state == PROC_CLOSED);
372 ds->proc_state = PROC_COMMITTED;
374 dataset_set_unsaved (ds);
376 /* Free memory for lagged cases. */
377 while (!deque_is_empty (&ds->lag))
378 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
379 free (ds->lag_cases);
381 /* Dictionary from before TEMPORARY becomes permanent. */
382 proc_cancel_temporary_transformations (ds);
384 if (!ds->discard_output)
386 /* Finish compacting. */
387 if (ds->compactor != NULL)
389 case_map_destroy (ds->compactor);
390 ds->compactor = NULL;
392 dict_delete_scratch_vars (ds->dict);
393 dict_compact_values (ds->dict);
396 /* Old data sink becomes new data source. */
397 if (ds->sink != NULL)
398 ds->source = casewriter_make_reader (ds->sink);
/* Discarding applies to a single procedure, so clear the flag. */
403 ds->discard_output = false;
407 caseinit_clear (ds->caseinit);
408 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
410 dict_clear_vectors (ds->dict);
411 ds->permanent_dict = NULL;
412 return proc_cancel_all_transformations (ds) && ds->ok;
415 /* Casereader class for procedure execution. */
416 static const struct casereader_class proc_casereader_class =
418 proc_casereader_read, /* Produces the next transformed case. */
419 proc_casereader_destroy, /* Drains the reader and closes the procedure. */
424 /* Updates last_proc_invocation to the current time. */
426 update_last_proc_invocation (struct dataset *ds)
428 ds->last_proc_invocation = time (NULL);
431 /* Returns a pointer to the lagged case from N_BEFORE cases before the
432 current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be at least 1 and no greater than DS's lag count
   (see dataset_need_lag). */
434 lagged_case (const struct dataset *ds, int n_before)
436 assert (n_before >= 1);
437 assert (n_before <= ds->n_lag);
/* N_BEFORE is 1-based, hence the "- 1" for the deque offset.
   Presumably offset 0 is the most recently pushed case -- confirm
   against the deque implementation. */
439 if (n_before <= deque_count (&ds->lag))
440 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
445 /* Returns the current set of permanent transformations,
446 and clears the permanent transformations.
447 For use by INPUT PROGRAM. */
449 proc_capture_transformations (struct dataset *ds)
451 struct trns_chain *chain;
/* Not allowed while temporary transformations are active. */
453 assert (ds->temporary_trns_chain == NULL);
454 chain = ds->permanent_trns_chain;
/* Start over with a fresh, empty permanent chain. */
455 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
/* Tell the listener, if any, about the change to the chain. */
457 if ( ds->xform_callback)
458 ds->xform_callback (false, ds->xform_callback_aux);
463 /* Adds a transformation that processes a case with PROC and
464 frees itself with FREE to the current set of transformations.
465 The functions are passed AUX as auxiliary data. */
467 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
/* The null argument is the finalizer slot; compare
   add_transformation_with_finalizer(). */
469 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
/* Tell the listener, if any, that a transformation was added. */
470 if ( ds->xform_callback)
471 ds->xform_callback (true, ds->xform_callback_aux);
474 /* Adds a transformation that processes a case with PROC and
475 frees itself with FREE to the current set of transformations.
476 When parsing of the block of transformations is complete,
477 FINALIZE will be called.
478 The functions are passed AUX as auxiliary data. */
480 add_transformation_with_finalizer (struct dataset *ds,
481 trns_finalize_func *finalize,
482 trns_proc_func *proc,
483 trns_free_func *free, void *aux)
485 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
/* Tell the listener, if any, that a transformation was added. */
487 if ( ds->xform_callback)
488 ds->xform_callback (true, ds->xform_callback_aux);
491 /* Returns the index of the next transformation.
492 This value can be returned by a transformation procedure
493 function to indicate a "jump" to that transformation.
   The index refers to the chain that transformations are currently
   being added to. */
495 next_transformation (const struct dataset *ds)
497 return trns_chain_next (ds->cur_trns_chain);
500 /* Returns true if the next call to add_transformation() will add
501 a temporary transformation, false if it will add a permanent transformation. */
504 proc_in_temporary_transformations (const struct dataset *ds)
/* A temporary chain exists only from
   proc_start_temporary_transformations() until the temporary
   transformations are made permanent or cancelled. */
506 return ds->temporary_trns_chain != NULL;
509 /* Marks the start of temporary transformations.
510 Further calls to add_transformation() will add temporary transformations. */
513 proc_start_temporary_transformations (struct dataset *ds)
/* Nothing to do if temporary transformations are already active. */
515 if (!proc_in_temporary_transformations (ds))
/* Any pending case limit is added to the permanent chain first. */
517 add_case_limit_trns (ds);
/* Keep a snapshot of the dictionary as it stands now;
   proc_cancel_temporary_transformations() restores it later. */
519 ds->permanent_dict = dict_clone (ds->dict);
/* Close the permanent chain and route new transformations to a
   fresh temporary chain. */
521 trns_chain_finalize (ds->permanent_trns_chain);
522 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
524 if ( ds->xform_callback)
525 ds->xform_callback (true, ds->xform_callback_aux);
529 /* Converts all the temporary transformations, if any, to
530 permanent transformations. Further transformations will be
permanent.
532 Returns true if anything changed, false otherwise. */
534 proc_make_temporary_transformations_permanent (struct dataset *ds)
536 if (proc_in_temporary_transformations (ds))
/* Close the temporary chain and splice it into the permanent one. */
538 trns_chain_finalize (ds->temporary_trns_chain);
539 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
540 ds->temporary_trns_chain = NULL;
/* The snapshot taken when temporary mode started is no longer
   needed: the current dictionary stays in effect. */
542 dict_destroy (ds->permanent_dict);
543 ds->permanent_dict = NULL;
551 /* Cancels all temporary transformations, if any. Further
552 transformations will be permanent.
553 Returns true if anything changed, false otherwise. */
555 proc_cancel_temporary_transformations (struct dataset *ds)
557 if (proc_in_temporary_transformations (ds))
/* Throw away the dictionary as modified by the temporary
   transformations and put back the snapshot taken when they
   started. */
559 dict_destroy (ds->dict);
560 ds->dict = ds->permanent_dict;
561 ds->permanent_dict = NULL;
563 trns_chain_destroy (ds->temporary_trns_chain);
564 ds->temporary_trns_chain = NULL;
/* Report whether any permanent transformations remain. */
566 if ( ds->xform_callback)
567 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
568 ds->xform_callback_aux);
576 /* Cancels all transformations, if any.
577 Returns true if successful, false on I/O error.
   Must not be called while a procedure is in progress. */
579 proc_cancel_all_transformations (struct dataset *ds)
582 assert (ds->proc_state == PROC_COMMITTED);
/* Either chain may be a null pointer here (create_dataset() calls
   this on a zero-filled dataset), so trns_chain_destroy() must
   accept one. */
583 ok = trns_chain_destroy (ds->permanent_trns_chain);
584 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
/* Leave DS with a single, empty permanent chain. */
585 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
586 ds->temporary_trns_chain = NULL;
587 if ( ds->xform_callback)
588 ds->xform_callback (false, ds->xform_callback_aux);
/* Dictionary change callback installed by create_dataset() and
   proc_set_active_file().  DS_ is the owning dataset; any dictionary
   change is reported as a dataset change. */
595 dict_callback (struct dictionary *d UNUSED, void *ds_)
597 struct dataset *ds = ds_;
598 dataset_set_unsaved (ds);
601 /* Initializes procedure handling.
   Allocates a zero-filled dataset with a new, empty dictionary. */
603 create_dataset (void)
605 struct dataset *ds = xzalloc (sizeof(*ds));
606 ds->dict = dict_create ();
/* Dictionary changes are reported as dataset changes. */
608 dict_set_change_callback (ds->dict, dict_callback, ds);
610 dict_set_encoding (ds->dict, get_default_encoding ());
612 ds->caseinit = caseinit_create ();
/* Called here for its side effect of creating the initial empty
   permanent transformation chain. */
613 proc_cancel_all_transformations (ds);
615 ds->syntax_encoding = xstrdup ("Auto");
/* Sets CB as the callback that DS invokes whenever its
   transformation chains change; AUX is passed back to CB on each
   call.  Any previously set callback is replaced. */
622 dataset_add_transform_change_callback (struct dataset *ds,
623 transformation_change_callback_func *cb,
626 ds->xform_callback = cb;
627 ds->xform_callback_aux = aux;
630 /* Finishes up procedure handling.
   Releases the resources held by DS. */
632 destroy_dataset (struct dataset *ds)
/* This leaves DS with a fresh, empty permanent chain (see
   proc_cancel_all_transformations), which is destroyed below. */
634 proc_discard_active_file (ds);
635 dict_destroy (ds->dict);
636 caseinit_destroy (ds->caseinit);
637 trns_chain_destroy (ds->permanent_trns_chain);
/* Final notification to the transformation-change listener. */
639 if ( ds->xform_callback)
640 ds->xform_callback (false, ds->xform_callback_aux);
642 free (ds->syntax_encoding);
646 /* Causes output from the next procedure to be discarded, instead
647 of being preserved for use as input for the next procedure.
   proc_commit() clears the flag again. */
649 proc_discard_output (struct dataset *ds)
651 ds->discard_output = true;
654 /* Discards the active file dictionary, data, and transformations. */
657 proc_discard_active_file (struct dataset *ds)
/* Not allowed while a procedure is in progress. */
659 assert (ds->proc_state == PROC_COMMITTED);
/* The dictionary object is emptied, not destroyed. */
661 dict_clear (ds->dict);
662 fh_set_default_handle (NULL);
666 casereader_destroy (ds->source);
669 proc_cancel_all_transformations (ds);
672 /* Sets SOURCE as the source for procedure input for the next procedure. */
675 proc_set_active_file (struct dataset *ds,
676 struct casereader *source,
677 struct dictionary *dict)
679 assert (ds->proc_state == PROC_COMMITTED);
/* DICT must be a different object from DS's current dictionary,
   because the current one is destroyed below. */
680 assert (ds->dict != dict);
682 proc_discard_active_file (ds);
684 dict_destroy (ds->dict);
/* NOTE(review): the statement that installs DICT as ds->dict is not
   visible here; presumably it precedes this call -- confirm. */
686 dict_set_change_callback (ds->dict, dict_callback, ds);
688 proc_set_active_file_data (ds, source);
691 /* Replaces the active file's data by READER without replacing
692 the associated dictionary.
   Returns true if READER is a null pointer or reports no error,
   false otherwise. */
694 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
/* Dispose of the previous data source. */
696 casereader_destroy (ds->source);
/* Reset the case-initialization state against the current dictionary. */
699 caseinit_clear (ds->caseinit);
700 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
702 return reader == NULL || !casereader_error (reader);
705 /* Returns true if an active file data source is available, false otherwise. */
708 proc_has_active_file (const struct dataset *ds)
/* "Available" means only that DS has a source casereader. */
710 return ds->source != NULL;
713 /* Returns the active file data source from DS, or a null pointer
714 if DS has no data source, and removes it from DS. */
716 proc_extract_active_file_data (struct dataset *ds)
/* Remember the source so that it can still be returned once DS no
   longer refers to it. */
718 struct casereader *reader = ds->source;
724 /* Checks whether DS has a corrupted active file. If so,
725 discards it and returns false. If not, returns true without discarding it. */
728 dataset_end_of_command (struct dataset *ds)
/* There is nothing to check when DS has no data source. */
730 if (ds->source != NULL)
732 if (casereader_error (ds->source))
734 proc_discard_active_file (ds);
/* The source is sound: clear the taint recorded on its successors
   and check that none remains. */
739 const struct taint *taint = casereader_get_taint (ds->source);
740 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
741 assert (!taint_has_tainted_successor (taint));
/* Callbacks for the case-limit transformation; add_case_limit_trns()
   refers to them before they are defined. */
747 static trns_proc_func case_limit_trns_proc;
748 static trns_free_func case_limit_trns_free;
750 /* Adds a transformation that limits the number of cases that may
751 pass through, if DS->DICT has a case limit. */
753 add_case_limit_trns (struct dataset *ds)
755 casenumber case_limit = dict_get_case_limit (ds->dict);
/* The countdown lives on the heap because it must outlive this
   call; case_limit_trns_free() releases it. */
758 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
759 *cases_remaining = case_limit;
760 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
/* Clear the limit in the dictionary so that it is not added again. */
762 dict_set_case_limit (ds->dict, 0);
766 /* Limits the maximum number of cases processed to *CASES_REMAINING. */
/* Transformation procedure for the case limit.  CASES_REMAINING_
   points to the countdown allocated by add_case_limit_trns().  While
   the countdown is positive, decrements it and lets the case through
   (TRNS_CONTINUE); after that, every case is dropped
   (TRNS_DROP_CASE). */
769 case_limit_trns_proc (void *cases_remaining_,
770 struct ccase **c UNUSED, casenumber case_nr UNUSED)
/* The countdown was allocated and stored as a casenumber, so it is
   accessed as one here; reading it through a pointer to a different
   integer type is not valid if the two types differ. */
772 casenumber *cases_remaining = cases_remaining_;
773 if (*cases_remaining > 0)
775 (*cases_remaining)--;
776 return TRNS_CONTINUE;
779 return TRNS_DROP_CASE;
782 /* Frees the data associated with a case limit transformation.
   CASES_REMAINING_ is the countdown allocated by
   add_case_limit_trns(). */
784 case_limit_trns_free (void *cases_remaining_)
/* The pointer is only released here, never dereferenced. */
786 size_t *cases_remaining = cases_remaining_;
787 free (cases_remaining);
/* Declared ahead of add_filter_trns(), which refers to it. */
791 static trns_proc_func filter_trns_proc;
793 /* Adds a temporary transformation to filter data according to
794 the variable specified on FILTER, if any. */
796 add_filter_trns (struct dataset *ds)
798 struct variable *filter_var = dict_get_filter (ds->dict);
799 if (filter_var != NULL)
/* Switch to temporary mode first so that the filter ends up on the
   temporary chain. */
801 proc_start_temporary_transformations (ds);
/* No free function is supplied: the auxiliary data is the variable
   obtained from the dictionary, not memory allocated here. */
802 add_transformation (ds, filter_trns_proc, NULL, filter_var);
806 /* FILTER transformation.
   FILTER_VAR_ is the filter variable.  Lets case *C through
   (TRNS_CONTINUE) when its value for that variable is nonzero and
   not missing; otherwise drops the case (TRNS_DROP_CASE). */
808 filter_trns_proc (void *filter_var_,
/* C is read below, so it does not carry the UNUSED marker. */
809 struct ccase **c, casenumber case_nr UNUSED)
812 struct variable *filter_var = filter_var_;
813 double f = case_num (*c, filter_var);
814 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
815 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* Accessor that takes DS read-only.
   NOTE(review): the return type and body are not visible here;
   presumably it returns DS's current dictionary -- confirm. */
820 dataset_dict (const struct dataset *ds)
/* Accessor that takes DS read-only and returns a pointer to a const
   casereader.
   NOTE(review): the body is not visible here; presumably it returns
   DS's source casereader -- confirm. */
825 const struct casereader *
826 dataset_source (const struct dataset *ds)
/* Ensures that DS retains at least N_BEFORE lagged cases.  The lag
   count only ever grows; a smaller N_BEFORE leaves it unchanged. */
832 dataset_need_lag (struct dataset *ds, int n_before)
834 ds->n_lag = MAX (ds->n_lag, n_before);