/* PSPP - computes sample statistics.
   Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License as
   published by the Free Software Foundation; either version 2 of the
   License, or (at your option) any later version.

   This program is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <time.h>

#include <data/case.h>
#include <data/caseinit.h>
#include <data/casereader.h>
#include <data/casereader-provider.h>
#include <data/casewriter.h>
#include <data/dictionary.h>
#include <data/file-handle-def.h>
#include <data/procedure.h>
#include <data/transformations.h>
#include <data/variable.h>
#include <libpspp/alloc.h>
#include <libpspp/deque.h>
#include <libpspp/misc.h>
#include <libpspp/str.h>
#include <libpspp/taint.h>
43 /* Cases are read from source,
44 their transformation variables are initialized,
45 pass through permanent_trns_chain (which transforms them into
46 the format described by permanent_dict),
48 pass through temporary_trns_chain (which transforms them into
49 the format described by dict),
50 and are finally passed to the procedure. */
51 struct casereader *source;
52 struct caseinit *caseinit;
53 struct trns_chain *permanent_trns_chain;
54 struct dictionary *permanent_dict;
55 struct casewriter *sink;
56 struct trns_chain *temporary_trns_chain;
57 struct dictionary *dict;
59 /* Callback which occurs when a procedure provides a new source for
61 replace_source_callback *replace_source ;
63 /* Callback which occurs whenever the DICT is replaced by a new one */
64 replace_dictionary_callback *replace_dict;
66 /* If true, cases are discarded instead of being written to
70 /* The transformation chain that the next transformation will be
72 struct trns_chain *cur_trns_chain;
74 /* The compactor used to compact a case, if necessary;
75 otherwise a null pointer. */
76 struct dict_compactor *compactor;
78 /* Time at which proc was last invoked. */
79 time_t last_proc_invocation;
81 /* Cases just before ("lagging") the current one. */
82 int n_lag; /* Number of cases to lag. */
83 struct deque lag; /* Deque of lagged cases. */
84 struct ccase *lag_cases; /* Lagged cases managed by deque. */
89 PROC_COMMITTED, /* No procedure in progress. */
90 PROC_OPEN, /* proc_open called, casereader still open. */
91 PROC_CLOSED /* casereader from proc_open destroyed,
92 but proc_commit not yet called. */
95 size_t cases_written; /* Cases output so far. */
96 bool ok; /* Error status. */
97 }; /* struct dataset */
/* Helpers defined later in this file. */
static void add_case_limit_trns (struct dataset *ds);
static void add_filter_trns (struct dataset *ds);

static void update_last_proc_invocation (struct dataset *ds);
105 /* Public functions. */
107 /* Returns the last time the data was read. */
109 time_of_last_procedure (struct dataset *ds)
111 if (ds->last_proc_invocation == 0)
112 update_last_proc_invocation (ds);
113 return ds->last_proc_invocation;
116 /* Regular procedure. */
118 /* Executes any pending transformations, if necessary.
119 This is not identical to the EXECUTE command in that it won't
120 always read the source data. This can be important when the
121 source data is given inline within BEGIN DATA...END FILE. */
123 proc_execute (struct dataset *ds)
127 if ((ds->temporary_trns_chain == NULL
128 || trns_chain_is_empty (ds->temporary_trns_chain))
129 && trns_chain_is_empty (ds->permanent_trns_chain))
132 ds->discard_output = false;
133 dict_set_case_limit (ds->dict, 0);
134 dict_clear_vectors (ds->dict);
138 ok = casereader_destroy (proc_open (ds));
139 return proc_commit (ds) && ok;
142 static struct casereader_class proc_casereader_class;
144 /* Opens dataset DS for reading cases with proc_read.
145 proc_commit must be called when done. */
147 proc_open (struct dataset *ds)
149 assert (ds->source != NULL);
150 assert (ds->proc_state == PROC_COMMITTED);
152 update_last_proc_invocation (ds);
154 caseinit_mark_for_init (ds->caseinit, ds->dict);
156 /* Finish up the collection of transformations. */
157 add_case_limit_trns (ds);
158 add_filter_trns (ds);
159 trns_chain_finalize (ds->cur_trns_chain);
161 /* Make permanent_dict refer to the dictionary right before
162 data reaches the sink. */
163 if (ds->permanent_dict == NULL)
164 ds->permanent_dict = ds->dict;
167 if (!ds->discard_output)
169 ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
170 ? dict_make_compactor (ds->permanent_dict)
172 ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
173 ds->permanent_dict));
177 ds->compactor = NULL;
181 /* Allocate memory for lagged cases. */
182 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
184 ds->proc_state = PROC_OPEN;
185 ds->cases_written = 0;
188 /* FIXME: use taint in dataset in place of `ok'? */
189 /* FIXME: for trivial cases we can just return a clone of
191 return casereader_create_sequential (NULL,
192 dict_get_next_value_idx (ds->dict),
194 &proc_casereader_class, ds);
197 /* Returns true if a procedure is in progress, that is, if
198 proc_open has been called but proc_commit has not. */
200 proc_is_open (const struct dataset *ds)
202 return ds->proc_state != PROC_COMMITTED;
205 /* "read" function for procedure casereader. */
207 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
210 struct dataset *ds = ds_;
211 enum trns_result retval = TRNS_DROP_CASE;
213 assert (ds->proc_state == PROC_OPEN);
218 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
219 if (retval == TRNS_ERROR)
224 /* Read a case from source. */
225 if (!casereader_read (ds->source, c))
227 case_resize (c, dict_get_next_value_idx (ds->dict));
228 caseinit_init_vars (ds->caseinit, c);
230 /* Execute permanent transformations. */
231 case_nr = ds->cases_written + 1;
232 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
234 caseinit_update_left_vars (ds->caseinit, c);
235 if (retval != TRNS_CONTINUE)
241 /* Write case to collection of lagged cases. */
244 while (deque_count (&ds->lag) >= ds->n_lag)
245 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
246 case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
249 /* Write case to replacement active file. */
251 if (ds->sink != NULL)
254 if (ds->compactor != NULL)
256 case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
257 dict_compactor_compact (ds->compactor, &tmp, c);
260 case_clone (&tmp, c);
261 casewriter_write (ds->sink, &tmp);
264 /* Execute temporary transformations. */
265 if (ds->temporary_trns_chain != NULL)
267 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
268 c, &ds->cases_written);
269 if (retval != TRNS_CONTINUE)
280 /* "destroy" function for procedure casereader. */
282 proc_casereader_destroy (struct casereader *reader, void *ds_)
284 struct dataset *ds = ds_;
287 /* Make sure transformations happen for every input case, in
288 case they have side effects, and ensure that the replacement
289 active file gets all the cases it should. */
290 while (casereader_read (reader, &c))
293 ds->proc_state = PROC_CLOSED;
294 ds->ok = casereader_destroy (ds->source) && ds->ok;
296 proc_set_active_file_data (ds, NULL);
299 /* Must return false if the source casereader, a transformation,
300 or the sink casewriter signaled an error. (If a temporary
301 transformation signals an error, then the return value is
302 false, but the replacement active file may still be
305 proc_commit (struct dataset *ds)
307 assert (ds->proc_state == PROC_CLOSED);
308 ds->proc_state = PROC_COMMITTED;
310 /* Free memory for lagged cases. */
311 while (!deque_is_empty (&ds->lag))
312 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
313 free (ds->lag_cases);
315 /* Dictionary from before TEMPORARY becomes permanent. */
316 proc_cancel_temporary_transformations (ds);
318 if (!ds->discard_output)
320 /* Finish compacting. */
321 if (ds->compactor != NULL)
323 dict_compactor_destroy (ds->compactor);
324 dict_compact_values (ds->dict);
325 ds->compactor = NULL;
328 /* Old data sink becomes new data source. */
329 if (ds->sink != NULL)
330 ds->source = casewriter_make_reader (ds->sink);
335 ds->discard_output = false;
338 if ( ds->replace_source) ds->replace_source (ds->source);
340 caseinit_clear (ds->caseinit);
341 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
343 dict_clear_vectors (ds->dict);
344 ds->permanent_dict = NULL;
345 return proc_cancel_all_transformations (ds) && ds->ok;
348 /* Casereader class for procedure execution. */
349 static struct casereader_class proc_casereader_class =
351 proc_casereader_read,
352 proc_casereader_destroy,
357 /* Updates last_proc_invocation. */
359 update_last_proc_invocation (struct dataset *ds)
361 ds->last_proc_invocation = time (NULL);
364 /* Returns a pointer to the lagged case from N_BEFORE cases before the
365 current one, or NULL if there haven't been that many cases yet. */
367 lagged_case (const struct dataset *ds, int n_before)
369 assert (n_before >= 1);
370 assert (n_before <= ds->n_lag);
372 if (n_before <= deque_count (&ds->lag))
373 return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
378 /* Returns the current set of permanent transformations,
379 and clears the permanent transformations.
380 For use by INPUT PROGRAM. */
382 proc_capture_transformations (struct dataset *ds)
384 struct trns_chain *chain;
386 assert (ds->temporary_trns_chain == NULL);
387 chain = ds->permanent_trns_chain;
388 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
392 /* Adds a transformation that processes a case with PROC and
393 frees itself with FREE to the current set of transformations.
394 The functions are passed AUX as auxiliary data. */
396 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
398 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
401 /* Adds a transformation that processes a case with PROC and
402 frees itself with FREE to the current set of transformations.
403 When parsing of the block of transformations is complete,
404 FINALIZE will be called.
405 The functions are passed AUX as auxiliary data. */
407 add_transformation_with_finalizer (struct dataset *ds,
408 trns_finalize_func *finalize,
409 trns_proc_func *proc,
410 trns_free_func *free, void *aux)
412 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
415 /* Returns the index of the next transformation.
416 This value can be returned by a transformation procedure
417 function to indicate a "jump" to that transformation. */
419 next_transformation (const struct dataset *ds)
421 return trns_chain_next (ds->cur_trns_chain);
424 /* Returns true if the next call to add_transformation() will add
425 a temporary transformation, false if it will add a permanent
428 proc_in_temporary_transformations (const struct dataset *ds)
430 return ds->temporary_trns_chain != NULL;
433 /* Marks the start of temporary transformations.
434 Further calls to add_transformation() will add temporary
437 proc_start_temporary_transformations (struct dataset *ds)
439 if (!proc_in_temporary_transformations (ds))
441 add_case_limit_trns (ds);
443 ds->permanent_dict = dict_clone (ds->dict);
445 trns_chain_finalize (ds->permanent_trns_chain);
446 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
450 /* Converts all the temporary transformations, if any, to
451 permanent transformations. Further transformations will be
453 Returns true if anything changed, false otherwise. */
455 proc_make_temporary_transformations_permanent (struct dataset *ds)
457 if (proc_in_temporary_transformations (ds))
459 trns_chain_finalize (ds->temporary_trns_chain);
460 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
461 ds->temporary_trns_chain = NULL;
463 dict_destroy (ds->permanent_dict);
464 ds->permanent_dict = NULL;
472 /* Cancels all temporary transformations, if any. Further
473 transformations will be permanent.
474 Returns true if anything changed, false otherwise. */
476 proc_cancel_temporary_transformations (struct dataset *ds)
478 if (proc_in_temporary_transformations (ds))
480 dict_destroy (ds->dict);
481 ds->dict = ds->permanent_dict;
482 ds->permanent_dict = NULL;
483 if (ds->replace_dict) ds->replace_dict (ds->dict);
485 trns_chain_destroy (ds->temporary_trns_chain);
486 ds->temporary_trns_chain = NULL;
494 /* Cancels all transformations, if any.
495 Returns true if successful, false on I/O error. */
497 proc_cancel_all_transformations (struct dataset *ds)
500 assert (ds->proc_state == PROC_COMMITTED);
501 ok = trns_chain_destroy (ds->permanent_trns_chain);
502 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
503 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
504 ds->temporary_trns_chain = NULL;
508 /* Initializes procedure handling. */
510 create_dataset (replace_source_callback *rps,
511 replace_dictionary_callback *rds)
513 struct dataset *ds = xzalloc (sizeof(*ds));
514 ds->dict = dict_create ();
515 ds->caseinit = caseinit_create ();
516 ds->replace_source = rps;
517 ds->replace_dict = rds;
518 proc_cancel_all_transformations (ds);
522 /* Finishes up procedure handling. */
524 destroy_dataset (struct dataset *ds)
526 proc_discard_active_file (ds);
527 dict_destroy (ds->dict);
528 caseinit_destroy (ds->caseinit);
529 trns_chain_destroy (ds->permanent_trns_chain);
533 /* Causes output from the next procedure to be discarded, instead
534 of being preserved for use as input for the next procedure. */
536 proc_discard_output (struct dataset *ds)
538 ds->discard_output = true;
541 /* Discards the active file dictionary, data, and
544 proc_discard_active_file (struct dataset *ds)
546 assert (ds->proc_state == PROC_COMMITTED);
548 dict_clear (ds->dict);
549 fh_set_default_handle (NULL);
553 casereader_destroy (ds->source);
555 if ( ds->replace_source) ds->replace_source (NULL);
557 proc_cancel_all_transformations (ds);
560 /* Sets SOURCE as the source for procedure input for the next
563 proc_set_active_file (struct dataset *ds,
564 struct casereader *source,
565 struct dictionary *dict)
567 assert (ds->proc_state == PROC_COMMITTED);
568 assert (ds->dict != dict);
570 proc_discard_active_file (ds);
572 dict_destroy (ds->dict);
574 if ( ds->replace_dict) ds->replace_dict (dict);
576 proc_set_active_file_data (ds, source);
579 /* Replaces the active file's data by READER without replacing
580 the associated dictionary. */
582 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
584 casereader_destroy (ds->source);
586 if (ds->replace_source) ds->replace_source (reader);
588 caseinit_clear (ds->caseinit);
589 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
591 return reader == NULL || !casereader_error (reader);
594 /* Returns true if an active file data source is available, false
597 proc_has_active_file (const struct dataset *ds)
599 return ds->source != NULL;
602 /* Checks whether DS has a corrupted active file. If so,
603 discards it and returns false. If not, returns true without
606 dataset_end_of_command (struct dataset *ds)
608 if (ds->source != NULL)
610 if (casereader_error (ds->source))
612 proc_discard_active_file (ds);
617 const struct taint *taint = casereader_get_taint (ds->source);
618 taint_reset_successor_taint ((struct taint *) taint);
619 assert (!taint_has_tainted_successor (taint));
625 static trns_proc_func case_limit_trns_proc;
626 static trns_free_func case_limit_trns_free;
628 /* Adds a transformation that limits the number of cases that may
629 pass through, if DS->DICT has a case limit. */
631 add_case_limit_trns (struct dataset *ds)
633 size_t case_limit = dict_get_case_limit (ds->dict);
636 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
637 *cases_remaining = case_limit;
638 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
640 dict_set_case_limit (ds->dict, 0);
644 /* Limits the maximum number of cases processed to
647 case_limit_trns_proc (void *cases_remaining_,
648 struct ccase *c UNUSED, casenumber case_nr UNUSED)
650 size_t *cases_remaining = cases_remaining_;
651 if (*cases_remaining > 0)
653 (*cases_remaining)--;
654 return TRNS_CONTINUE;
657 return TRNS_DROP_CASE;
/* Frees the data associated with a case limit transformation.
   CASES_REMAINING_ is the counter allocated when the
   transformation was added.  Always returns true. */
static bool
case_limit_trns_free (void *cases_remaining_)
{
  size_t *cases_remaining = cases_remaining_;
  free (cases_remaining);
  return true;
}
669 static trns_proc_func filter_trns_proc;
671 /* Adds a temporary transformation to filter data according to
672 the variable specified on FILTER, if any. */
674 add_filter_trns (struct dataset *ds)
676 struct variable *filter_var = dict_get_filter (ds->dict);
677 if (filter_var != NULL)
679 proc_start_temporary_transformations (ds);
680 add_transformation (ds, filter_trns_proc, NULL, filter_var);
684 /* FILTER transformation. */
686 filter_trns_proc (void *filter_var_,
687 struct ccase *c UNUSED, casenumber case_nr UNUSED)
690 struct variable *filter_var = filter_var_;
691 double f = case_num (c, filter_var);
692 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
693 ? TRNS_CONTINUE : TRNS_DROP_CASE);
698 dataset_dict (const struct dataset *ds)
703 const struct casereader *
704 dataset_source (const struct dataset *ds)
710 dataset_need_lag (struct dataset *ds, int n_before)
712 ds->n_lag = MAX (ds->n_lag, n_before);