1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
24 #include <data/case.h>
25 #include <data/case-map.h>
26 #include <data/caseinit.h>
27 #include <data/casereader.h>
28 #include <data/casereader-provider.h>
29 #include <data/casewriter.h>
30 #include <data/dictionary.h>
31 #include <data/file-handle-def.h>
32 #include <data/procedure.h>
33 #include <data/transformations.h>
34 #include <data/variable.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
43 /* Cases are read from source,
44 their transformation variables are initialized,
45 pass through permanent_trns_chain (which transforms them into
46 the format described by permanent_dict),
48 pass through temporary_trns_chain (which transforms them into
49 the format described by dict),
50 and are finally passed to the procedure. */
51 struct casereader *source;
52 struct caseinit *caseinit;
53 struct trns_chain *permanent_trns_chain;
54 struct dictionary *permanent_dict;
55 struct casewriter *sink;
56 struct trns_chain *temporary_trns_chain;
57 struct dictionary *dict;
59 /* Callback which occurs whenever the transformation chain(s) have
61 transformation_change_callback_func *xform_callback;
62 void *xform_callback_aux;
64 /* If true, cases are discarded instead of being written to
68 /* The transformation chain that the next transformation will be
70 struct trns_chain *cur_trns_chain;
72 /* The case map used to compact a case, if necessary;
73 otherwise a null pointer. */
74 struct case_map *compactor;
76 /* Time at which proc was last invoked. */
77 time_t last_proc_invocation;
79 /* Cases just before ("lagging") the current one. */
80 int n_lag; /* Number of cases to lag. */
81 struct deque lag; /* Deque of lagged cases. */
82 struct ccase **lag_cases; /* Lagged cases managed by deque. */
87 PROC_COMMITTED, /* No procedure in progress. */
88 PROC_OPEN, /* proc_open called, casereader still open. */
89 PROC_CLOSED /* casereader from proc_open destroyed,
90 but proc_commit not yet called. */
93 casenumber cases_written; /* Cases output so far. */
94 bool ok; /* Error status. */
96 void (*callback) (void *); /* Callback for when the dataset changes */
99 }; /* struct dataset */
102 static void add_case_limit_trns (struct dataset *ds);
103 static void add_filter_trns (struct dataset *ds);
105 static void update_last_proc_invocation (struct dataset *ds);
108 dataset_set_unsaved (const struct dataset *ds)
110 if (ds->callback) ds->callback (ds->cb_data);
114 /* Public functions. */
117 dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
120 ds->cb_data = cb_data;
124 /* Returns the last time the data was read. */
126 time_of_last_procedure (struct dataset *ds)
128 if (ds->last_proc_invocation == 0)
129 update_last_proc_invocation (ds);
130 return ds->last_proc_invocation;
133 /* Regular procedure. */
135 /* Executes any pending transformations, if necessary.
136 This is not identical to the EXECUTE command in that it won't
137 always read the source data. This can be important when the
138 source data is given inline within BEGIN DATA...END DATA. */
140 proc_execute (struct dataset *ds)
144 if ((ds->temporary_trns_chain == NULL
145 || trns_chain_is_empty (ds->temporary_trns_chain))
146 && trns_chain_is_empty (ds->permanent_trns_chain))
149 ds->discard_output = false;
150 dict_set_case_limit (ds->dict, 0);
151 dict_clear_vectors (ds->dict);
155 ok = casereader_destroy (proc_open (ds));
156 return proc_commit (ds) && ok;
159 static const struct casereader_class proc_casereader_class;
161 /* Opens dataset DS for reading cases with proc_read.
162 proc_commit must be called when done. */
164 proc_open (struct dataset *ds)
166 assert (ds->source != NULL);
167 assert (ds->proc_state == PROC_COMMITTED);
169 update_last_proc_invocation (ds);
171 caseinit_mark_for_init (ds->caseinit, ds->dict);
173 /* Finish up the collection of transformations. */
174 add_case_limit_trns (ds);
175 add_filter_trns (ds);
176 trns_chain_finalize (ds->cur_trns_chain);
178 /* Make permanent_dict refer to the dictionary right before
179 data reaches the sink. */
180 if (ds->permanent_dict == NULL)
181 ds->permanent_dict = ds->dict;
184 if (!ds->discard_output)
186 struct dictionary *pd = ds->permanent_dict;
187 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
188 bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
189 ds->compactor = (should_compact
190 ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
192 ds->sink = autopaging_writer_create (compacted_value_cnt);
196 ds->compactor = NULL;
200 /* Allocate memory for lagged cases. */
201 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
203 ds->proc_state = PROC_OPEN;
204 ds->cases_written = 0;
207 /* FIXME: use taint in dataset in place of `ok'? */
208 /* FIXME: for trivial cases we can just return a clone of
210 return casereader_create_sequential (NULL,
211 dict_get_next_value_idx (ds->dict),
213 &proc_casereader_class, ds);
216 /* Returns true if a procedure is in progress, that is, if
217 proc_open has been called but proc_commit has not. */
219 proc_is_open (const struct dataset *ds)
221 return ds->proc_state != PROC_COMMITTED;
224 /* "read" function for procedure casereader. */
225 static struct ccase *
226 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
228 struct dataset *ds = ds_;
229 enum trns_result retval = TRNS_DROP_CASE;
232 assert (ds->proc_state == PROC_OPEN);
233 for (; ; case_unref (c))
237 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
238 if (retval == TRNS_ERROR)
243 /* Read a case from source. */
244 c = casereader_read (ds->source);
247 c = case_unshare_and_resize (c, dict_get_next_value_idx (ds->dict));
248 caseinit_init_vars (ds->caseinit, c);
250 /* Execute permanent transformations. */
251 case_nr = ds->cases_written + 1;
252 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
254 caseinit_update_left_vars (ds->caseinit, c);
255 if (retval != TRNS_CONTINUE)
258 /* Write case to collection of lagged cases. */
261 while (deque_count (&ds->lag) >= ds->n_lag)
262 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
263 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
266 /* Write case to replacement active file. */
268 if (ds->sink != NULL)
269 casewriter_write (ds->sink,
270 case_map_execute (ds->compactor, case_ref (c)));
272 /* Execute temporary transformations. */
273 if (ds->temporary_trns_chain != NULL)
275 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
276 &c, ds->cases_written);
277 if (retval != TRNS_CONTINUE)
285 /* "destroy" function for procedure casereader. */
287 proc_casereader_destroy (struct casereader *reader, void *ds_)
289 struct dataset *ds = ds_;
292 /* Make sure transformations happen for every input case, in
293 case they have side effects, and ensure that the replacement
294 active file gets all the cases it should. */
295 while ((c = casereader_read (reader)) != NULL)
298 ds->proc_state = PROC_CLOSED;
299 ds->ok = casereader_destroy (ds->source) && ds->ok;
301 proc_set_active_file_data (ds, NULL);
304 /* Must return false if the source casereader, a transformation,
305 or the sink casewriter signaled an error. (If a temporary
306 transformation signals an error, then the return value is
307 false, but the replacement active file may still be
310 proc_commit (struct dataset *ds)
312 assert (ds->proc_state == PROC_CLOSED);
313 ds->proc_state = PROC_COMMITTED;
315 dataset_set_unsaved (ds);
317 /* Free memory for lagged cases. */
318 while (!deque_is_empty (&ds->lag))
319 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
320 free (ds->lag_cases);
322 /* Dictionary from before TEMPORARY becomes permanent. */
323 proc_cancel_temporary_transformations (ds);
325 if (!ds->discard_output)
327 /* Finish compacting. */
328 if (ds->compactor != NULL)
330 case_map_destroy (ds->compactor);
331 ds->compactor = NULL;
333 dict_delete_scratch_vars (ds->dict);
334 dict_compact_values (ds->dict);
337 /* Old data sink becomes new data source. */
338 if (ds->sink != NULL)
339 ds->source = casewriter_make_reader (ds->sink);
344 ds->discard_output = false;
348 caseinit_clear (ds->caseinit);
349 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
351 dict_clear_vectors (ds->dict);
352 ds->permanent_dict = NULL;
353 return proc_cancel_all_transformations (ds) && ds->ok;
356 /* Casereader class for procedure execution. */
357 static const struct casereader_class proc_casereader_class =
359 proc_casereader_read,
360 proc_casereader_destroy,
365 /* Updates last_proc_invocation. */
367 update_last_proc_invocation (struct dataset *ds)
369 ds->last_proc_invocation = time (NULL);
372 /* Returns a pointer to the lagged case from N_BEFORE cases before the
373 current one, or NULL if there haven't been that many cases yet. */
375 lagged_case (const struct dataset *ds, int n_before)
377 assert (n_before >= 1);
378 assert (n_before <= ds->n_lag);
380 if (n_before <= deque_count (&ds->lag))
381 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
386 /* Returns the current set of permanent transformations,
387 and clears the permanent transformations.
388 For use by INPUT PROGRAM. */
390 proc_capture_transformations (struct dataset *ds)
392 struct trns_chain *chain;
394 assert (ds->temporary_trns_chain == NULL);
395 chain = ds->permanent_trns_chain;
396 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
398 if ( ds->xform_callback)
399 ds->xform_callback (false, ds->xform_callback_aux);
404 /* Adds a transformation that processes a case with PROC and
405 frees itself with FREE to the current set of transformations.
406 The functions are passed AUX as auxiliary data. */
408 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
410 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
411 if ( ds->xform_callback)
412 ds->xform_callback (true, ds->xform_callback_aux);
415 /* Adds a transformation that processes a case with PROC and
416 frees itself with FREE to the current set of transformations.
417 When parsing of the block of transformations is complete,
418 FINALIZE will be called.
419 The functions are passed AUX as auxiliary data. */
421 add_transformation_with_finalizer (struct dataset *ds,
422 trns_finalize_func *finalize,
423 trns_proc_func *proc,
424 trns_free_func *free, void *aux)
426 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
428 if ( ds->xform_callback)
429 ds->xform_callback (true, ds->xform_callback_aux);
432 /* Returns the index of the next transformation.
433 This value can be returned by a transformation procedure
434 function to indicate a "jump" to that transformation. */
436 next_transformation (const struct dataset *ds)
438 return trns_chain_next (ds->cur_trns_chain);
441 /* Returns true if the next call to add_transformation() will add
442 a temporary transformation, false if it will add a permanent
445 proc_in_temporary_transformations (const struct dataset *ds)
447 return ds->temporary_trns_chain != NULL;
450 /* Marks the start of temporary transformations.
451 Further calls to add_transformation() will add temporary
454 proc_start_temporary_transformations (struct dataset *ds)
456 if (!proc_in_temporary_transformations (ds))
458 add_case_limit_trns (ds);
460 ds->permanent_dict = dict_clone (ds->dict);
462 trns_chain_finalize (ds->permanent_trns_chain);
463 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
465 if ( ds->xform_callback)
466 ds->xform_callback (true, ds->xform_callback_aux);
470 /* Converts all the temporary transformations, if any, to
471 permanent transformations. Further transformations will be
473 Returns true if anything changed, false otherwise. */
475 proc_make_temporary_transformations_permanent (struct dataset *ds)
477 if (proc_in_temporary_transformations (ds))
479 trns_chain_finalize (ds->temporary_trns_chain);
480 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
481 ds->temporary_trns_chain = NULL;
483 dict_destroy (ds->permanent_dict);
484 ds->permanent_dict = NULL;
492 /* Cancels all temporary transformations, if any. Further
493 transformations will be permanent.
494 Returns true if anything changed, false otherwise. */
496 proc_cancel_temporary_transformations (struct dataset *ds)
498 if (proc_in_temporary_transformations (ds))
500 dict_destroy (ds->dict);
501 ds->dict = ds->permanent_dict;
502 ds->permanent_dict = NULL;
504 trns_chain_destroy (ds->temporary_trns_chain);
505 ds->temporary_trns_chain = NULL;
507 if ( ds->xform_callback)
508 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
509 ds->xform_callback_aux);
517 /* Cancels all transformations, if any.
518 Returns true if successful, false on I/O error. */
520 proc_cancel_all_transformations (struct dataset *ds)
523 assert (ds->proc_state == PROC_COMMITTED);
524 ok = trns_chain_destroy (ds->permanent_trns_chain);
525 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
526 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
527 ds->temporary_trns_chain = NULL;
528 if ( ds->xform_callback)
529 ds->xform_callback (false, ds->xform_callback_aux);
536 dict_callback (struct dictionary *d UNUSED, void *ds_)
538 struct dataset *ds = ds_;
539 dataset_set_unsaved (ds);
542 /* Initializes procedure handling. */
544 create_dataset (void)
546 struct dataset *ds = xzalloc (sizeof(*ds));
547 ds->dict = dict_create ();
549 dict_set_change_callback (ds->dict, dict_callback, ds);
551 ds->caseinit = caseinit_create ();
552 proc_cancel_all_transformations (ds);
558 dataset_add_transform_change_callback (struct dataset *ds,
559 transformation_change_callback_func *cb,
562 ds->xform_callback = cb;
563 ds->xform_callback_aux = aux;
566 /* Finishes up procedure handling. */
568 destroy_dataset (struct dataset *ds)
570 proc_discard_active_file (ds);
571 dict_destroy (ds->dict);
572 caseinit_destroy (ds->caseinit);
573 trns_chain_destroy (ds->permanent_trns_chain);
575 if ( ds->xform_callback)
576 ds->xform_callback (false, ds->xform_callback_aux);
580 /* Causes output from the next procedure to be discarded, instead
581 of being preserved for use as input for the next procedure. */
583 proc_discard_output (struct dataset *ds)
585 ds->discard_output = true;
588 /* Discards the active file dictionary, data, and
591 proc_discard_active_file (struct dataset *ds)
593 assert (ds->proc_state == PROC_COMMITTED);
595 dict_clear (ds->dict);
596 fh_set_default_handle (NULL);
600 casereader_destroy (ds->source);
603 proc_cancel_all_transformations (ds);
606 /* Sets SOURCE as the source for procedure input for the next
609 proc_set_active_file (struct dataset *ds,
610 struct casereader *source,
611 struct dictionary *dict)
613 assert (ds->proc_state == PROC_COMMITTED);
614 assert (ds->dict != dict);
616 proc_discard_active_file (ds);
618 dict_destroy (ds->dict);
620 dict_set_change_callback (ds->dict, dict_callback, ds);
622 proc_set_active_file_data (ds, source);
625 /* Replaces the active file's data by READER without replacing
626 the associated dictionary. */
628 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
630 casereader_destroy (ds->source);
633 caseinit_clear (ds->caseinit);
634 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
636 return reader == NULL || !casereader_error (reader);
639 /* Returns true if an active file data source is available, false
642 proc_has_active_file (const struct dataset *ds)
644 return ds->source != NULL;
647 /* Returns the active file data source from DS, or a null pointer
648 if DS has no data source, and removes it from DS. */
650 proc_extract_active_file_data (struct dataset *ds)
652 struct casereader *reader = ds->source;
658 /* Checks whether DS has a corrupted active file. If so,
659 discards it and returns false. If not, returns true without
662 dataset_end_of_command (struct dataset *ds)
664 if (ds->source != NULL)
666 if (casereader_error (ds->source))
668 proc_discard_active_file (ds);
673 const struct taint *taint = casereader_get_taint (ds->source);
674 taint_reset_successor_taint ((struct taint *) taint);
675 assert (!taint_has_tainted_successor (taint));
681 static trns_proc_func case_limit_trns_proc;
682 static trns_free_func case_limit_trns_free;
684 /* Adds a transformation that limits the number of cases that may
685 pass through, if DS->DICT has a case limit. */
687 add_case_limit_trns (struct dataset *ds)
689 casenumber case_limit = dict_get_case_limit (ds->dict);
692 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
693 *cases_remaining = case_limit;
694 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
696 dict_set_case_limit (ds->dict, 0);
700 /* Limits the maximum number of cases processed to
703 case_limit_trns_proc (void *cases_remaining_,
704 struct ccase **c UNUSED, casenumber case_nr UNUSED)
706 size_t *cases_remaining = cases_remaining_;
707 if (*cases_remaining > 0)
709 (*cases_remaining)--;
710 return TRNS_CONTINUE;
713 return TRNS_DROP_CASE;
716 /* Frees the data associated with a case limit transformation. */
718 case_limit_trns_free (void *cases_remaining_)
720 size_t *cases_remaining = cases_remaining_;
721 free (cases_remaining);
725 static trns_proc_func filter_trns_proc;
727 /* Adds a temporary transformation to filter data according to
728 the variable specified on FILTER, if any. */
730 add_filter_trns (struct dataset *ds)
732 struct variable *filter_var = dict_get_filter (ds->dict);
733 if (filter_var != NULL)
735 proc_start_temporary_transformations (ds);
736 add_transformation (ds, filter_trns_proc, NULL, filter_var);
740 /* FILTER transformation. */
742 filter_trns_proc (void *filter_var_,
743 struct ccase **c UNUSED, casenumber case_nr UNUSED)
746 struct variable *filter_var = filter_var_;
747 double f = case_num (*c, filter_var);
748 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
749 ? TRNS_CONTINUE : TRNS_DROP_CASE);
754 dataset_dict (const struct dataset *ds)
759 const struct casereader *
760 dataset_source (const struct dataset *ds)
766 dataset_need_lag (struct dataset *ds, int n_before)
768 ds->n_lag = MAX (ds->n_lag, n_before);