1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
24 #include <data/case.h>
25 #include <data/case-map.h>
26 #include <data/caseinit.h>
27 #include <data/casereader.h>
28 #include <data/casereader-provider.h>
29 #include <data/casewriter.h>
30 #include <data/dictionary.h>
31 #include <data/file-handle-def.h>
32 #include <data/procedure.h>
33 #include <data/transformations.h>
34 #include <data/variable.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
43 /* Cases are read from source,
44 their transformation variables are initialized,
45 pass through permanent_trns_chain (which transforms them into
46 the format described by permanent_dict),
48 pass through temporary_trns_chain (which transforms them into
49 the format described by dict),
50 and are finally passed to the procedure. */
51 struct casereader *source;
52 struct caseinit *caseinit;
53 struct trns_chain *permanent_trns_chain;
54 struct dictionary *permanent_dict;
55 struct casewriter *sink;
56 struct trns_chain *temporary_trns_chain;
57 struct dictionary *dict;
59 /* Callback which occurs whenever the transformation chain(s) have
61 transformation_change_callback_func *xform_callback;
62 void *xform_callback_aux;
64 /* If true, cases are discarded instead of being written to
68 /* The transformation chain that the next transformation will be
70 struct trns_chain *cur_trns_chain;
72 /* The case map used to compact a case, if necessary;
73 otherwise a null pointer. */
74 struct case_map *compactor;
76 /* Time at which proc was last invoked. */
77 time_t last_proc_invocation;
79 /* Cases just before ("lagging") the current one. */
80 int n_lag; /* Number of cases to lag. */
81 struct deque lag; /* Deque of lagged cases. */
82 struct ccase **lag_cases; /* Lagged cases managed by deque. */
87 PROC_COMMITTED, /* No procedure in progress. */
88 PROC_OPEN, /* proc_open called, casereader still open. */
89 PROC_CLOSED /* casereader from proc_open destroyed,
90 but proc_commit not yet called. */
93 casenumber cases_written; /* Cases output so far. */
94 bool ok; /* Error status. */
95 }; /* struct dataset */
/* Forward declarations of file-local helpers that are defined further
   down but used by the procedure functions below. */
98 static void add_case_limit_trns (struct dataset *ds);
99 static void add_filter_trns (struct dataset *ds);
101 static void update_last_proc_invocation (struct dataset *ds);
103 /* Public functions. */
105 /* Returns the last time the data was read. */
107 time_of_last_procedure (struct dataset *ds)
109 if (ds->last_proc_invocation == 0)
110 update_last_proc_invocation (ds);
111 return ds->last_proc_invocation;
114 /* Regular procedure. */
116 /* Executes any pending transformations, if necessary.
117 This is not identical to the EXECUTE command in that it won't
118 always read the source data. This can be important when the
119 source data is given inline within BEGIN DATA...END DATA. */
/* NOTE(review): this copy lacks the return-type line, the braces, the
   declaration of `ok', at least one short statement at the top of the
   fast path, and the fast path's early return.  Restore them from the
   upstream file before building. */
121 proc_execute (struct dataset *ds)
/* Fast path: when neither chain holds any transformation, only
   per-procedure state is reset and the data is not read. */
125 if ((ds->temporary_trns_chain == NULL
126 || trns_chain_is_empty (ds->temporary_trns_chain))
127 && trns_chain_is_empty (ds->permanent_trns_chain))
130 ds->discard_output = false;
131 dict_set_case_limit (ds->dict, 0);
132 dict_clear_vectors (ds->dict);
/* Slow path: open the procedure reader and destroy it at once, which
   drains every case through the transformations, then commit.  The
   result is true only if both steps succeeded. */
136 ok = casereader_destroy (proc_open (ds));
137 return proc_commit (ds) && ok;
/* Forward declaration: proc_open passes the address of this class
   before its definition appears. */
140 static const struct casereader_class proc_casereader_class;
142 /* Opens dataset DS for reading cases through the returned casereader.
143 proc_commit must be called when done. */
/* DS must have a source and no procedure may already be in progress.
   The returned reader must be destroyed before proc_commit is called,
   because proc_commit asserts the PROC_CLOSED state that only the
   reader's destroy function sets.
   NOTE(review): this copy lacks the return-type line, braces, an
   `else', and several short statements/arguments (for instance the
   other arm of the conditional that sets ds->compactor, and one
   argument of casereader_create_sequential).  Restore them from the
   upstream file before building. */
145 proc_open (struct dataset *ds)
147 assert (ds->source != NULL);
148 assert (ds->proc_state == PROC_COMMITTED);
150 update_last_proc_invocation (ds);
152 caseinit_mark_for_init (ds->caseinit, ds->dict);
154 /* Finish up the collection of transformations. */
155 add_case_limit_trns (ds);
156 add_filter_trns (ds);
157 trns_chain_finalize (ds->cur_trns_chain);
159 /* Make permanent_dict refer to the dictionary right before
160 data reaches the sink. */
161 if (ds->permanent_dict == NULL)
162 ds->permanent_dict = ds->dict;
/* Prepare the sink.  When output is kept, a compactor is created only
   if dropping scratch variables makes the case narrower than the
   dictionary's next value index. */
165 if (!ds->discard_output)
167 struct dictionary *pd = ds->permanent_dict;
168 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
169 bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
170 ds->compactor = (should_compact
171 ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
173 ds->sink = autopaging_writer_create (compacted_value_cnt);
/* Output is being discarded: no compactor is needed. */
177 ds->compactor = NULL;
181 /* Allocate memory for lagged cases. */
182 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
184 ds->proc_state = PROC_OPEN;
185 ds->cases_written = 0;
188 /* FIXME: use taint in dataset in place of `ok'? */
189 /* FIXME: for trivial cases we can just return a clone of the source
   reader (presumably ds->source; the tail of this note is missing). */
191 return casereader_create_sequential (NULL,
192 dict_get_next_value_idx (ds->dict),
194 &proc_casereader_class, ds);
197 /* Returns true if a procedure is in progress, that is, if
198 proc_open has been called but proc_commit has not. */
200 proc_is_open (const struct dataset *ds)
202 return ds->proc_state != PROC_COMMITTED;
205 /* "read" function for procedure casereader. */
/* DS_ is the dataset whose procedure is open.  Each pass of the loop
   reads one case from ds->source and pushes it through the permanent
   transformations, the lag queue, the sink and the temporary
   transformations.
   NOTE(review): this copy lacks braces, the declarations of `c' and
   `case_nr', every short `return'/`continue' statement, the
   end-of-input check after casereader_read, and whatever advances
   ds->cases_written.  Do not infer control flow from indentation here;
   restore the missing lines from the upstream file. */
206 static struct ccase *
207 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
209 struct dataset *ds = ds_;
210 enum trns_result retval = TRNS_DROP_CASE;
213 assert (ds->proc_state == PROC_OPEN);
/* The loop's step expression releases the case from the pass that
   just ended. */
214 for (; ; case_unref (c))
/* Only a dropped case or an error may lead to another pass. */
218 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
219 if (retval == TRNS_ERROR)
224 /* Read a case from source. */
225 c = casereader_read (ds->source);
/* Get a private copy sized for the current dictionary, then
   initialize the variables registered with caseinit. */
228 c = case_unshare_and_resize (c, dict_get_next_value_idx (ds->dict));
229 caseinit_init_vars (ds->caseinit, c);
231 /* Execute permanent transformations. */
232 case_nr = ds->cases_written + 1;
233 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
235 caseinit_update_left_vars (ds->caseinit, c);
236 if (retval != TRNS_CONTINUE)
239 /* Write case to collection of lagged cases. */
/* Evict the oldest lagged cases until there is room for one more,
   then keep a new reference to C at the front. */
242 while (deque_count (&ds->lag) >= ds->n_lag)
243 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
244 ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
247 /* Write case to replacement active file. */
/* The sink gets its own reference to C, passed through
   case_map_execute with ds->compactor first. */
249 if (ds->sink != NULL)
250 casewriter_write (ds->sink,
251 case_map_execute (ds->compactor, case_ref (c)));
253 /* Execute temporary transformations. */
254 if (ds->temporary_trns_chain != NULL)
256 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
257 &c, ds->cases_written);
258 if (retval != TRNS_CONTINUE)
266 /* "destroy" function for procedure casereader. */
/* Drains READER, marks the procedure as closed, and destroys the
   dataset's source, folding that result into ds->ok.
   NOTE(review): this copy lacks the return-type line, braces, the
   declaration of `c', the body of the draining loop, and a short
   statement between the two uses of ds->source below.  As shown,
   proc_set_active_file_data would destroy ds->source a second time, so
   the missing statement presumably clears it; confirm against the
   upstream file. */
268 proc_casereader_destroy (struct casereader *reader, void *ds_)
270 struct dataset *ds = ds_;
273 /* Make sure transformations happen for every input case, in
274 case they have side effects, and ensure that the replacement
275 active file gets all the cases it should. */
276 while ((c = casereader_read (reader)) != NULL)
279 ds->proc_state = PROC_CLOSED;
280 ds->ok = casereader_destroy (ds->source) && ds->ok;
282 proc_set_active_file_data (ds, NULL);
/* Finishes the procedure begun with proc_open.  The procedure reader
   must already have been destroyed (the state must be PROC_CLOSED). */
285 /* Must return false if the source casereader, a transformation,
286 or the sink casewriter signaled an error. (If a temporary
287 transformation signals an error, then the return value is
288 false, but the replacement active file may still be untainted.) */
/* NOTE(review): this copy lacks the return-type line, braces, `else'
   branches and some short statements (for instance whatever happens to
   ds->sink after it is turned into a reader, and the branch taken when
   output was discarded).  Restore them from the upstream file. */
291 proc_commit (struct dataset *ds)
293 assert (ds->proc_state == PROC_CLOSED);
294 ds->proc_state = PROC_COMMITTED;
296 /* Free memory for lagged cases. */
297 while (!deque_is_empty (&ds->lag))
298 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
299 free (ds->lag_cases);
301 /* Dictionary from before TEMPORARY becomes permanent. */
302 proc_cancel_temporary_transformations (ds);
304 if (!ds->discard_output)
306 /* Finish compacting. */
307 if (ds->compactor != NULL)
309 case_map_destroy (ds->compactor);
310 ds->compactor = NULL;
312 dict_delete_scratch_vars (ds->dict);
313 dict_compact_values (ds->dict);
316 /* Old data sink becomes new data source. */
317 if (ds->sink != NULL)
318 ds->source = casewriter_make_reader (ds->sink);
/* Discarding output applies to one procedure only. */
323 ds->discard_output = false;
/* Reset case initialization so that the variables now in the
   dictionary are treated as already initialized. */
327 caseinit_clear (ds->caseinit);
328 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
330 dict_clear_vectors (ds->dict);
331 ds->permanent_dict = NULL;
/* All transformations have now been applied; drop them.  The result
   combines that step's status with the status recorded in ds->ok. */
332 return proc_cancel_all_transformations (ds) && ds->ok;
335 /* Casereader class for procedure execution. */
/* NOTE(review): this copy lacks the initializer's braces and any
   entries after the destroy function; restore them from the upstream
   file so the initializer matches struct casereader_class. */
336 static const struct casereader_class proc_casereader_class =
338 proc_casereader_read,
339 proc_casereader_destroy,
344 /* Updates last_proc_invocation. */
346 update_last_proc_invocation (struct dataset *ds)
348 ds->last_proc_invocation = time (NULL);
351 /* Returns a pointer to the lagged case from N_BEFORE cases before the
352 current one, or NULL if there haven't been that many cases yet. */
/* N_BEFORE must be at least 1 and no greater than the number of lags
   configured in DS (ds->n_lag).
   NOTE(review): this copy lacks the return-type line, braces, and the
   branch that returns NULL; restore them from the upstream file. */
354 lagged_case (const struct dataset *ds, int n_before)
356 assert (n_before >= 1);
357 assert (n_before <= ds->n_lag);
/* The newest lagged case sits at the front of the deque, so the case
   N_BEFORE back is at offset N_BEFORE - 1. */
359 if (n_before <= deque_count (&ds->lag))
360 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
365 /* Returns the current set of permanent transformations,
366 and clears the permanent transformations.
367 For use by INPUT PROGRAM. */
369 proc_capture_transformations (struct dataset *ds)
371 struct trns_chain *chain;
373 assert (ds->temporary_trns_chain == NULL);
374 chain = ds->permanent_trns_chain;
375 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
377 if ( ds->xform_callback)
378 ds->xform_callback (false, ds->xform_callback_aux);
383 /* Adds a transformation that processes a case with PROC and
384 frees itself with FREE to the current set of transformations.
385 The functions are passed AUX as auxiliary data. */
387 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
389 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
390 if ( ds->xform_callback)
391 ds->xform_callback (true, ds->xform_callback_aux);
394 /* Adds a transformation that processes a case with PROC and
395 frees itself with FREE to the current set of transformations.
396 When parsing of the block of transformations is complete,
397 FINALIZE will be called.
398 The functions are passed AUX as auxiliary data. */
400 add_transformation_with_finalizer (struct dataset *ds,
401 trns_finalize_func *finalize,
402 trns_proc_func *proc,
403 trns_free_func *free, void *aux)
405 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
407 if ( ds->xform_callback)
408 ds->xform_callback (true, ds->xform_callback_aux);
411 /* Returns the index of the next transformation.
412 This value can be returned by a transformation procedure
413 function to indicate a "jump" to that transformation. */
/* The index is taken from the chain that new transformations are
   currently being added to.
   NOTE(review): the return-type line and braces are missing from this
   copy; the type is whatever trns_chain_next returns -- confirm against
   the header. */
415 next_transformation (const struct dataset *ds)
417 return trns_chain_next (ds->cur_trns_chain);
420 /* Returns true if the next call to add_transformation() will add
421 a temporary transformation, false if it will add a permanent
424 proc_in_temporary_transformations (const struct dataset *ds)
426 return ds->temporary_trns_chain != NULL;
429 /* Marks the start of temporary transformations.
430 Further calls to add_transformation() will add temporary
433 proc_start_temporary_transformations (struct dataset *ds)
435 if (!proc_in_temporary_transformations (ds))
437 add_case_limit_trns (ds);
439 ds->permanent_dict = dict_clone (ds->dict);
441 trns_chain_finalize (ds->permanent_trns_chain);
442 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
444 if ( ds->xform_callback)
445 ds->xform_callback (true, ds->xform_callback_aux);
449 /* Converts all the temporary transformations, if any, to
450 permanent transformations. Further transformations will be
452 Returns true if anything changed, false otherwise. */
454 proc_make_temporary_transformations_permanent (struct dataset *ds)
456 if (proc_in_temporary_transformations (ds))
458 trns_chain_finalize (ds->temporary_trns_chain);
459 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
460 ds->temporary_trns_chain = NULL;
462 dict_destroy (ds->permanent_dict);
463 ds->permanent_dict = NULL;
471 /* Cancels all temporary transformations, if any. Further
472 transformations will be permanent.
473 Returns true if anything changed, false otherwise. */
475 proc_cancel_temporary_transformations (struct dataset *ds)
477 if (proc_in_temporary_transformations (ds))
479 dict_destroy (ds->dict);
480 ds->dict = ds->permanent_dict;
481 ds->permanent_dict = NULL;
483 trns_chain_destroy (ds->temporary_trns_chain);
484 ds->temporary_trns_chain = NULL;
486 if ( ds->xform_callback)
487 ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
488 ds->xform_callback_aux);
496 /* Cancels all transformations, if any.
497 Returns true if successful, false on I/O error. */
499 proc_cancel_all_transformations (struct dataset *ds)
502 assert (ds->proc_state == PROC_COMMITTED);
503 ok = trns_chain_destroy (ds->permanent_trns_chain);
504 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
505 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
506 ds->temporary_trns_chain = NULL;
507 if ( ds->xform_callback)
508 ds->xform_callback (false, ds->xform_callback_aux);
513 /* Initializes procedure handling. */
515 create_dataset (void)
517 struct dataset *ds = xzalloc (sizeof(*ds));
518 ds->dict = dict_create ();
519 ds->caseinit = caseinit_create ();
520 proc_cancel_all_transformations (ds);
526 dataset_add_transform_change_callback (struct dataset *ds,
527 transformation_change_callback_func *cb,
530 ds->xform_callback = cb;
531 ds->xform_callback_aux = aux;
534 /* Finishes up procedure handling. */
536 destroy_dataset (struct dataset *ds)
538 proc_discard_active_file (ds);
539 dict_destroy (ds->dict);
540 caseinit_destroy (ds->caseinit);
541 trns_chain_destroy (ds->permanent_trns_chain);
543 if ( ds->xform_callback)
544 ds->xform_callback (false, ds->xform_callback_aux);
548 /* Causes output from the next procedure to be discarded, instead
549 of being preserved for use as input for the next procedure. */
551 proc_discard_output (struct dataset *ds)
553 ds->discard_output = true;
556 /* Discards the active file dictionary, data, and transformations. */
/* No procedure may be in progress.
   NOTE(review): this copy lacks the return-type line, braces, and
   several short statements around the destruction of ds->source
   (presumably including one that clears the pointer afterwards);
   restore them from the upstream file. */
559 proc_discard_active_file (struct dataset *ds)
561 assert (ds->proc_state == PROC_COMMITTED);
563 dict_clear (ds->dict);
564 fh_set_default_handle (NULL);
568 casereader_destroy (ds->source);
571 proc_cancel_all_transformations (ds);
574 /* Sets SOURCE as the source for procedure input for the next
577 proc_set_active_file (struct dataset *ds,
578 struct casereader *source,
579 struct dictionary *dict)
581 assert (ds->proc_state == PROC_COMMITTED);
582 assert (ds->dict != dict);
584 proc_discard_active_file (ds);
586 dict_destroy (ds->dict);
589 proc_set_active_file_data (ds, source);
592 /* Replaces the active file's data by READER without replacing
593 the associated dictionary. */
595 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
597 casereader_destroy (ds->source);
600 caseinit_clear (ds->caseinit);
601 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
603 return reader == NULL || !casereader_error (reader);
606 /* Returns true if an active file data source is available, false
609 proc_has_active_file (const struct dataset *ds)
611 return ds->source != NULL;
614 /* Returns the active file data source from DS, or a null pointer
615 if DS has no data source, and removes it from DS. */
617 proc_extract_active_file_data (struct dataset *ds)
619 struct casereader *reader = ds->source;
625 /* Checks whether DS has a corrupted active file. If so,
626 discards it and returns false. If not, returns true without
629 dataset_end_of_command (struct dataset *ds)
631 if (ds->source != NULL)
633 if (casereader_error (ds->source))
635 proc_discard_active_file (ds);
640 const struct taint *taint = casereader_get_taint (ds->source);
641 taint_reset_successor_taint ((struct taint *) taint);
642 assert (!taint_has_tainted_successor (taint));
/* Forward declarations of the case-limit transformation's process and
   free functions, which add_case_limit_trns passes to
   add_transformation. */
648 static trns_proc_func case_limit_trns_proc;
649 static trns_free_func case_limit_trns_free;
651 /* Adds a transformation that limits the number of cases that may
652 pass through, if DS->DICT has a case limit. */
654 add_case_limit_trns (struct dataset *ds)
656 casenumber case_limit = dict_get_case_limit (ds->dict);
659 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
660 *cases_remaining = case_limit;
661 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
663 dict_set_case_limit (ds->dict, 0);
667 /* Limits the maximum number of cases processed to
670 case_limit_trns_proc (void *cases_remaining_,
671 struct ccase **c UNUSED, casenumber case_nr UNUSED)
673 size_t *cases_remaining = cases_remaining_;
674 if (*cases_remaining > 0)
676 (*cases_remaining)--;
677 return TRNS_CONTINUE;
680 return TRNS_DROP_CASE;
683 /* Frees the data associated with a case limit transformation. */
685 case_limit_trns_free (void *cases_remaining_)
687 size_t *cases_remaining = cases_remaining_;
688 free (cases_remaining);
/* Forward declaration of the FILTER transformation's process function,
   which add_filter_trns passes to add_transformation. */
692 static trns_proc_func filter_trns_proc;
694 /* Adds a temporary transformation to filter data according to
695 the variable specified on FILTER, if any. */
697 add_filter_trns (struct dataset *ds)
699 struct variable *filter_var = dict_get_filter (ds->dict);
700 if (filter_var != NULL)
702 proc_start_temporary_transformations (ds);
703 add_transformation (ds, filter_trns_proc, NULL, filter_var);
707 /* FILTER transformation. */
709 filter_trns_proc (void *filter_var_,
710 struct ccase **c UNUSED, casenumber case_nr UNUSED)
713 struct variable *filter_var = filter_var_;
714 double f = case_num (*c, filter_var);
715 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
716 ? TRNS_CONTINUE : TRNS_DROP_CASE);
/* NOTE(review): only the parameter list of this accessor survives in
   this copy; the return-type line and body are missing.  Presumably it
   returns DS's dictionary -- confirm against the upstream file. */
721 dataset_dict (const struct dataset *ds)
/* NOTE(review): the body of this accessor is missing from this copy.
   Presumably it returns DS's case source as a read-only pointer --
   confirm against the upstream file. */
726 const struct casereader *
727 dataset_source (const struct dataset *ds)
/* Ensures that DS keeps at least N_BEFORE lagged cases: the number of
   lags is raised to N_BEFORE if it is currently smaller and is never
   lowered.
   NOTE(review): the return-type line and braces are missing from this
   copy. */
733 dataset_need_lag (struct dataset *ds, int n_before)
735 ds->n_lag = MAX (ds->n_lag, n_before);