/* PSPP - a program for statistical analysis.
   Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc.

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
#include <config.h>

#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <time.h>

#include <data/case.h>
#include <data/case-map.h>
#include <data/caseinit.h>
#include <data/casereader.h>
#include <data/casereader-provider.h>
#include <data/casewriter.h>
#include <data/dictionary.h>
#include <data/file-handle-def.h>
#include <data/procedure.h>
#include <data/transformations.h>
#include <data/variable.h>
#include <libpspp/deque.h>
#include <libpspp/misc.h>
#include <libpspp/str.h>
#include <libpspp/taint.h>
#include <libpspp/i18n.h>

#include "xalloc.h"
45 /* Cases are read from source,
46 their transformation variables are initialized,
47 pass through permanent_trns_chain (which transforms them into
48 the format described by permanent_dict),
50 pass through temporary_trns_chain (which transforms them into
51 the format described by dict),
52 and are finally passed to the procedure. */
53 struct casereader *source;
54 struct caseinit *caseinit;
55 struct trns_chain *permanent_trns_chain;
56 struct dictionary *permanent_dict;
57 struct casewriter *sink;
58 struct trns_chain *temporary_trns_chain;
59 struct dictionary *dict;
61 /* Callback which occurs whenever the transformation chain(s) have
63 transformation_change_callback_func *xform_callback;
64 void *xform_callback_aux;
66 /* If true, cases are discarded instead of being written to
70 /* The transformation chain that the next transformation will be
72 struct trns_chain *cur_trns_chain;
74 /* The case map used to compact a case, if necessary;
75 otherwise a null pointer. */
76 struct case_map *compactor;
78 /* Time at which proc was last invoked. */
79 time_t last_proc_invocation;
81 /* Cases just before ("lagging") the current one. */
82 int n_lag; /* Number of cases to lag. */
83 struct deque lag; /* Deque of lagged cases. */
84 struct ccase **lag_cases; /* Lagged cases managed by deque. */
89 PROC_COMMITTED, /* No procedure in progress. */
90 PROC_OPEN, /* proc_open called, casereader still open. */
91 PROC_CLOSED /* casereader from proc_open destroyed,
92 but proc_commit not yet called. */
95 casenumber cases_written; /* Cases output so far. */
96 bool ok; /* Error status. */
98 void (*callback) (void *); /* Callback for when the dataset changes */
101 }; /* struct dataset */
/* File-local helpers that are used before their definitions. */
static void update_last_proc_invocation (struct dataset *ds);
static void add_filter_trns (struct dataset *ds);
static void add_case_limit_trns (struct dataset *ds);
110 dataset_set_unsaved (const struct dataset *ds)
112 if (ds->callback) ds->callback (ds->cb_data);
116 /* Public functions. */
119 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
122 ds->cb_data = cb_data;
126 /* Returns the last time the data was read. */
128 time_of_last_procedure (struct dataset *ds)
130 if (ds->last_proc_invocation == 0)
131 update_last_proc_invocation (ds);
132 return ds->last_proc_invocation;
135 /* Regular procedure. */
137 /* Executes any pending transformations, if necessary.
138 This is not identical to the EXECUTE command in that it won't
139 always read the source data. This can be important when the
140 source data is given inline within BEGIN DATA...END FILE. */
142 proc_execute (struct dataset *ds)
146 if ((ds->temporary_trns_chain == NULL
147 || trns_chain_is_empty (ds->temporary_trns_chain))
148 && trns_chain_is_empty (ds->permanent_trns_chain))
151 ds->discard_output = false;
152 dict_set_case_limit (ds->dict, 0);
153 dict_clear_vectors (ds->dict);
157 ok = casereader_destroy (proc_open (ds));
158 return proc_commit (ds) && ok;
161 static const struct casereader_class proc_casereader_class;
163 /* Opens dataset DS for reading cases with proc_read.
164 proc_commit must be called when done. */
166 proc_open (struct dataset *ds)
168 assert (ds->source != NULL);
169 assert (ds->proc_state == PROC_COMMITTED);
171 update_last_proc_invocation (ds);
173 caseinit_mark_for_init (ds->caseinit, ds->dict);
175 /* Finish up the collection of transformations. */
176 add_case_limit_trns (ds);
177 add_filter_trns (ds);
178 trns_chain_finalize (ds->cur_trns_chain);
180 /* Make permanent_dict refer to the dictionary right before
181 data reaches the sink. */
182 if (ds->permanent_dict == NULL)
183 ds->permanent_dict = ds->dict;
186 if (!ds->discard_output)
188 struct dictionary *pd = ds->permanent_dict;
189 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
190 if (compacted_value_cnt < dict_get_next_value_idx (pd))
192 struct caseproto *compacted_proto;
193 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
194 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
195 ds->sink = autopaging_writer_create (compacted_proto);
196 caseproto_unref (compacted_proto);
200 ds->compactor = NULL;
201 ds->sink = autopaging_writer_create (dict_get_proto (pd));
206 ds->compactor = NULL;
210 /* Allocate memory for lagged cases. */
211 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
213 ds->proc_state = PROC_OPEN;
214 ds->cases_written = 0;
217 /* FIXME: use taint in dataset in place of `ok'? */
218 /* FIXME: for trivial cases we can just return a clone of
220 return casereader_create_sequential (NULL, dict_get_proto (ds->dict),
222 &proc_casereader_class, ds);
225 /* Returns true if a procedure is in progress, that is, if
226 proc_open has been called but proc_commit has not. */
228 proc_is_open (const struct dataset *ds)
230 return ds->proc_state != PROC_COMMITTED;
233 /* "read" function for procedure casereader. */
234 static struct ccase *
235 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
237 struct dataset *ds = ds_;
238 enum trns_result retval = TRNS_DROP_CASE;
241 assert (ds->proc_state == PROC_OPEN);
242 for (; ; case_unref (c))
246 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
247 if (retval == TRNS_ERROR)
252 /* Read a case from source. */
253 c = casereader_read (ds->source);
256 c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
257 caseinit_init_vars (ds->caseinit, c);
259 /* Execute permanent transformations. */
260 case_nr = ds->cases_written + 1;
261 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
263 caseinit_update_left_vars (ds->caseinit, c);
264 if (retval != TRNS_CONTINUE)
267 /* Write case to collection of lagged cases. */
270 while (deque_count (&ds->lag) >= ds->n_lag)
271 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
272 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
275 /* Write case to replacement active file. */
277 if (ds->sink != NULL)
278 casewriter_write (ds->sink,
279 case_map_execute (ds->compactor, case_ref (c)));
281 /* Execute temporary transformations. */
282 if (ds->temporary_trns_chain != NULL)
284 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
285 &c, ds->cases_written);
286 if (retval != TRNS_CONTINUE)
294 /* "destroy" function for procedure casereader. */
296 proc_casereader_destroy (struct casereader *reader, void *ds_)
298 struct dataset *ds = ds_;
301 /* Make sure transformations happen for every input case, in
302 case they have side effects, and ensure that the replacement
303 active file gets all the cases it should. */
304 while ((c = casereader_read (reader)) != NULL)
307 ds->proc_state = PROC_CLOSED;
308 ds->ok = casereader_destroy (ds->source) && ds->ok;
310 proc_set_active_file_data (ds, NULL);
313 /* Must return false if the source casereader, a transformation,
314 or the sink casewriter signaled an error. (If a temporary
315 transformation signals an error, then the return value is
316 false, but the replacement active file may still be
319 proc_commit (struct dataset *ds)
321 assert (ds->proc_state == PROC_CLOSED);
322 ds->proc_state = PROC_COMMITTED;
324 dataset_set_unsaved (ds);
326 /* Free memory for lagged cases. */
327 while (!deque_is_empty (&ds->lag))
328 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
329 free (ds->lag_cases);
331 /* Dictionary from before TEMPORARY becomes permanent. */
332 proc_cancel_temporary_transformations (ds);
334 if (!ds->discard_output)
336 /* Finish compacting. */
337 if (ds->compactor != NULL)
339 case_map_destroy (ds->compactor);
340 ds->compactor = NULL;
342 dict_delete_scratch_vars (ds->dict);
343 dict_compact_values (ds->dict);
346 /* Old data sink becomes new data source. */
347 if (ds->sink != NULL)
348 ds->source = casewriter_make_reader (ds->sink);
353 ds->discard_output = false;
357 caseinit_clear (ds->caseinit);
358 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
360 dict_clear_vectors (ds->dict);
361 ds->permanent_dict = NULL;
362 return proc_cancel_all_transformations (ds) && ds->ok;
365 /* Casereader class for procedure execution. */
366 static const struct casereader_class proc_casereader_class =
368 proc_casereader_read,
369 proc_casereader_destroy,
374 /* Updates last_proc_invocation. */
376 update_last_proc_invocation (struct dataset *ds)
378 ds->last_proc_invocation = time (NULL);
381 /* Returns a pointer to the lagged case from N_BEFORE cases before the
382 current one, or NULL if there haven't been that many cases yet. */
384 lagged_case (const struct dataset *ds, int n_before)
386 assert (n_before >= 1);
387 assert (n_before <= ds->n_lag);
389 if (n_before <= deque_count (&ds->lag))
390 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
395 /* Returns the current set of permanent transformations,
396 and clears the permanent transformations.
397 For use by INPUT PROGRAM. */
399 proc_capture_transformations (struct dataset *ds)
401 struct trns_chain *chain;
403 assert (ds->temporary_trns_chain == NULL);
404 chain = ds->permanent_trns_chain;
405 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
407 if ( ds->xform_callback)
408 ds->xform_callback (false, ds->xform_callback_aux);
413 /* Adds a transformation that processes a case with PROC and
414 frees itself with FREE to the current set of transformations.
415 The functions are passed AUX as auxiliary data. */
417 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
419 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
420 if ( ds->xform_callback)
421 ds->xform_callback (true, ds->xform_callback_aux);
424 /* Adds a transformation that processes a case with PROC and
425 frees itself with FREE to the current set of transformations.
426 When parsing of the block of transformations is complete,
427 FINALIZE will be called.
428 The functions are passed AUX as auxiliary data. */
430 add_transformation_with_finalizer (struct dataset *ds,
431 trns_finalize_func *finalize,
432 trns_proc_func *proc,
433 trns_free_func *free, void *aux)
435 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
437 if ( ds->xform_callback)
438 ds->xform_callback (true, ds->xform_callback_aux);
441 /* Returns the index of the next transformation.
442 This value can be returned by a transformation procedure
443 function to indicate a "jump" to that transformation. */
445 next_transformation (const struct dataset *ds)
447 return trns_chain_next (ds->cur_trns_chain);
450 /* Returns true if the next call to add_transformation() will add
451 a temporary transformation, false if it will add a permanent
454 proc_in_temporary_transformations (const struct dataset *ds)
456 return ds->temporary_trns_chain != NULL;
459 /* Marks the start of temporary transformations.
460 Further calls to add_transformation() will add temporary
463 proc_start_temporary_transformations (struct dataset *ds)
465 if (!proc_in_temporary_transformations (ds))
467 add_case_limit_trns (ds);
469 ds->permanent_dict = dict_clone (ds->dict);
471 trns_chain_finalize (ds->permanent_trns_chain);
472 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
474 if ( ds->xform_callback)
475 ds->xform_callback (true, ds->xform_callback_aux);
479 /* Converts all the temporary transformations, if any, to
480 permanent transformations. Further transformations will be
482 Returns true if anything changed, false otherwise. */
484 proc_make_temporary_transformations_permanent (struct dataset *ds)
486 if (proc_in_temporary_transformations (ds))
488 trns_chain_finalize (ds->temporary_trns_chain);
489 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
490 ds->temporary_trns_chain = NULL;
492 dict_destroy (ds->permanent_dict);
493 ds->permanent_dict = NULL;
501 /* Cancels all temporary transformations, if any. Further
502 transformations will be permanent.
503 Returns true if anything changed, false otherwise. */
505 proc_cancel_temporary_transformations (struct dataset *ds)
507 if (proc_in_temporary_transformations (ds))
509 dict_destroy (ds->dict);
510 ds->dict = ds->permanent_dict;
511 ds->permanent_dict = NULL;
513 trns_chain_destroy (ds->temporary_trns_chain);
514 ds->temporary_trns_chain = NULL;
516 if ( ds->xform_callback)
517 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
518 ds->xform_callback_aux);
526 /* Cancels all transformations, if any.
527 Returns true if successful, false on I/O error. */
529 proc_cancel_all_transformations (struct dataset *ds)
532 assert (ds->proc_state == PROC_COMMITTED);
533 ok = trns_chain_destroy (ds->permanent_trns_chain);
534 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
535 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
536 ds->temporary_trns_chain = NULL;
537 if ( ds->xform_callback)
538 ds->xform_callback (false, ds->xform_callback_aux);
545 dict_callback (struct dictionary *d UNUSED, void *ds_)
547 struct dataset *ds = ds_;
548 dataset_set_unsaved (ds);
551 /* Initializes procedure handling. */
553 create_dataset (void)
555 struct dataset *ds = xzalloc (sizeof(*ds));
556 ds->dict = dict_create ();
558 dict_set_change_callback (ds->dict, dict_callback, ds);
560 dict_set_encoding (ds->dict, get_default_encoding ());
562 ds->caseinit = caseinit_create ();
563 proc_cancel_all_transformations (ds);
569 dataset_add_transform_change_callback (struct dataset *ds,
570 transformation_change_callback_func *cb,
573 ds->xform_callback = cb;
574 ds->xform_callback_aux = aux;
577 /* Finishes up procedure handling. */
579 destroy_dataset (struct dataset *ds)
581 proc_discard_active_file (ds);
582 dict_destroy (ds->dict);
583 caseinit_destroy (ds->caseinit);
584 trns_chain_destroy (ds->permanent_trns_chain);
586 if ( ds->xform_callback)
587 ds->xform_callback (false, ds->xform_callback_aux);
591 /* Causes output from the next procedure to be discarded, instead
592 of being preserved for use as input for the next procedure. */
594 proc_discard_output (struct dataset *ds)
596 ds->discard_output = true;
599 /* Discards the active file dictionary, data, and
602 proc_discard_active_file (struct dataset *ds)
604 assert (ds->proc_state == PROC_COMMITTED);
606 dict_clear (ds->dict);
607 fh_set_default_handle (NULL);
611 casereader_destroy (ds->source);
614 proc_cancel_all_transformations (ds);
617 /* Sets SOURCE as the source for procedure input for the next
620 proc_set_active_file (struct dataset *ds,
621 struct casereader *source,
622 struct dictionary *dict)
624 assert (ds->proc_state == PROC_COMMITTED);
625 assert (ds->dict != dict);
627 proc_discard_active_file (ds);
629 dict_destroy (ds->dict);
631 dict_set_change_callback (ds->dict, dict_callback, ds);
633 proc_set_active_file_data (ds, source);
636 /* Replaces the active file's data by READER without replacing
637 the associated dictionary. */
639 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
641 casereader_destroy (ds->source);
644 caseinit_clear (ds->caseinit);
645 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
647 return reader == NULL || !casereader_error (reader);
650 /* Returns true if an active file data source is available, false
653 proc_has_active_file (const struct dataset *ds)
655 return ds->source != NULL;
658 /* Returns the active file data source from DS, or a null pointer
659 if DS has no data source, and removes it from DS. */
661 proc_extract_active_file_data (struct dataset *ds)
663 struct casereader *reader = ds->source;
669 /* Checks whether DS has a corrupted active file. If so,
670 discards it and returns false. If not, returns true without
673 dataset_end_of_command (struct dataset *ds)
675 if (ds->source != NULL)
677 if (casereader_error (ds->source))
679 proc_discard_active_file (ds);
684 const struct taint *taint = casereader_get_taint (ds->source);
685 taint_reset_successor_taint ((struct taint *) taint);
686 assert (!taint_has_tainted_successor (taint));
692 static trns_proc_func case_limit_trns_proc;
693 static trns_free_func case_limit_trns_free;
695 /* Adds a transformation that limits the number of cases that may
696 pass through, if DS->DICT has a case limit. */
698 add_case_limit_trns (struct dataset *ds)
700 casenumber case_limit = dict_get_case_limit (ds->dict);
703 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
704 *cases_remaining = case_limit;
705 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
707 dict_set_case_limit (ds->dict, 0);
711 /* Limits the maximum number of cases processed to
714 case_limit_trns_proc (void *cases_remaining_,
715 struct ccase **c UNUSED, casenumber case_nr UNUSED)
717 size_t *cases_remaining = cases_remaining_;
718 if (*cases_remaining > 0)
720 (*cases_remaining)--;
721 return TRNS_CONTINUE;
724 return TRNS_DROP_CASE;
727 /* Frees the data associated with a case limit transformation. */
729 case_limit_trns_free (void *cases_remaining_)
731 size_t *cases_remaining = cases_remaining_;
732 free (cases_remaining);
736 static trns_proc_func filter_trns_proc;
738 /* Adds a temporary transformation to filter data according to
739 the variable specified on FILTER, if any. */
741 add_filter_trns (struct dataset *ds)
743 struct variable *filter_var = dict_get_filter (ds->dict);
744 if (filter_var != NULL)
746 proc_start_temporary_transformations (ds);
747 add_transformation (ds, filter_trns_proc, NULL, filter_var);
751 /* FILTER transformation. */
753 filter_trns_proc (void *filter_var_,
754 struct ccase **c UNUSED, casenumber case_nr UNUSED)
757 struct variable *filter_var = filter_var_;
758 double f = case_num (*c, filter_var);
759 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
760 ? TRNS_CONTINUE : TRNS_DROP_CASE);
765 dataset_dict (const struct dataset *ds)
770 const struct casereader *
771 dataset_source (const struct dataset *ds)
777 dataset_need_lag (struct dataset *ds, int n_before)
779 ds->n_lag = MAX (ds->n_lag, n_before);