1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License as
6 published by the Free Software Foundation; either version 2 of the
7 License, or (at your option) any later version.
9 This program is distributed in the hope that it will be useful, but
10 WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
26 #include <data/case.h>
27 #include <data/caseinit.h>
28 #include <data/casereader.h>
29 #include <data/casereader-provider.h>
30 #include <data/casewriter.h>
31 #include <data/dictionary.h>
32 #include <data/file-handle-def.h>
33 #include <data/procedure.h>
34 #include <data/transformations.h>
35 #include <data/variable.h>
36 #include <libpspp/alloc.h>
37 #include <libpspp/deque.h>
38 #include <libpspp/misc.h>
39 #include <libpspp/str.h>
40 #include <libpspp/taint.h>
43 /* Cases are read from source,
44 their transformation variables are initialized,
45 pass through permanent_trns_chain (which transforms them into
46 the format described by permanent_dict), are written to sink,
48 pass through temporary_trns_chain (which transforms them into
49 the format described by dict),
50 and are finally passed to the procedure. */
51 struct casereader *source;
52 struct caseinit *caseinit;
53 struct trns_chain *permanent_trns_chain;
54 struct dictionary *permanent_dict;
55 struct casewriter *sink;
56 struct trns_chain *temporary_trns_chain;
57 struct dictionary *dict;
59 /* Callback which occurs when a procedure provides a new source for the dataset. */
61 replace_source_callback *replace_source ;
63 /* Callback which occurs whenever the DICT is replaced by a new one */
64 replace_dictionary_callback *replace_dict;
66 /* If true, cases are discarded instead of being written to the sink. */
70 /* The transformation chain that the next transformation will be added to. */
72 struct trns_chain *cur_trns_chain;
74 /* The compactor used to compact a case, if necessary;
75 otherwise a null pointer. */
76 struct dict_compactor *compactor;
78 /* Time at which proc was last invoked. */
79 time_t last_proc_invocation;
81 /* Cases just before ("lagging") the current one. */
82 int n_lag; /* Number of cases to lag. */
83 struct deque lag; /* Deque of lagged cases. */
84 struct ccase *lag_cases; /* Lagged cases managed by deque. */
94 size_t cases_written; /* Cases output so far. */
95 bool ok; /* Error status. */
96 }; /* struct dataset */
/* Forward declarations of file-local helpers that are defined further
   down in this file but used by the public functions above them. */
99 static void add_case_limit_trns (struct dataset *ds);
100 static void add_filter_trns (struct dataset *ds);
102 static void update_last_proc_invocation (struct dataset *ds);
104 /* Public functions. */
106 /* Returns the time at which a procedure last read DS's data.
   If no invocation time has been recorded yet (the stored value is
   still 0), the current time is recorded first and then returned. */
108 time_of_last_procedure (struct dataset *ds)
/* A zero timestamp is treated as "never set". */
110 if (ds->last_proc_invocation == 0)
111 update_last_proc_invocation (ds);
112 return ds->last_proc_invocation;
115 /* Regular procedure. */
117 /* Executes any pending transformations, if necessary.
118 This is not identical to the EXECUTE command in that it won't
119 always read the source data. This can be important when the
120 source data is given inline within BEGIN DATA...END DATA.
   When a data pass is made, the result is true only if both the pass
   and the subsequent proc_commit succeeded. */
122 proc_execute (struct dataset *ds)
/* Nothing is pending when the temporary chain is absent or empty and
   the permanent chain is empty as well.  In that case only the
   per-procedure settings below are reset and the data is not read.
   NOTE(review): the return statement of this path is not visible in
   this view of the file. */
126 if ((ds->temporary_trns_chain == NULL
127 || trns_chain_is_empty (ds->temporary_trns_chain))
128 && trns_chain_is_empty (ds->permanent_trns_chain))
131 ds->discard_output = false;
132 dict_set_case_limit (ds->dict, 0);
133 dict_clear_vectors (ds->dict);
/* Otherwise make a full pass over the data.  The reader returned by
   proc_open is destroyed immediately; its destroy callback
   (proc_casereader_destroy) reads every remaining case first, so all
   transformations run.  proc_commit is called even if the pass
   failed. */
137 ok = casereader_destroy (proc_open (ds));
138 return proc_commit (ds) && ok;
/* Declared here because proc_open refers to the class before its
   definition further down. */
141 static struct casereader_class proc_casereader_class;
143 /* Opens dataset DS for reading cases with proc_read.
144 proc_commit must be called when done.
   DS must have a source and must be in the PROC_COMMITTED state.
   On return DS is in the PROC_OPEN state.  The cases are delivered
   through a sequential casereader built on proc_casereader_class.
   NOTE(review): no function named proc_read is visible in this file;
   the read callback here is proc_casereader_read. */
146 proc_open (struct dataset *ds)
148 assert (ds->source != NULL);
149 assert (ds->proc_state == PROC_COMMITTED);
151 update_last_proc_invocation (ds);
153 caseinit_mark_for_init (ds->caseinit, ds->dict);
155 /* Finish up the collection of transformations. */
/* The case limit is added before the filter; add_filter_trns may
   switch DS into temporary-transformation mode. */
156 add_case_limit_trns (ds);
157 add_filter_trns (ds);
158 trns_chain_finalize (ds->cur_trns_chain);
160 /* Make permanent_dict refer to the dictionary right before
161 data reaches the sink. */
/* When no temporary transformations are active, permanent_dict is
   just an alias of dict, not a separate copy. */
162 if (ds->permanent_dict == NULL)
163 ds->permanent_dict = ds->dict;
/* When output is kept, prepare the sink that collects the replacement
   active file.  A compactor is only made if compacting permanent_dict
   would actually shrink it; the sink is sized to the compacted value
   count of permanent_dict.
   NOTE(review): the alternative arm of the conditional expression and
   the start of the discard-output branch are not visible here. */
166 if (!ds->discard_output)
168 ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
169 ? dict_make_compactor (ds->permanent_dict)
171 ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
172 ds->permanent_dict));
176 ds->compactor = NULL;
180 /* Allocate memory for lagged cases. */
/* The deque is given room for n_lag entries; lag_cases is the backing
   array it returns. */
181 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
183 ds->proc_state = PROC_OPEN;
184 ds->cases_written = 0;
187 /* FIXME: use taint in dataset in place of `ok'? */
188 /* FIXME: for trivial cases we can just return a clone of ds->source? */
190 return casereader_create_sequential (NULL,
191 dict_get_next_value_idx (ds->dict),
193 &proc_casereader_class, ds);
/* Returns true if DS is not in the PROC_COMMITTED state, that is, if
   it has been opened with proc_open and proc_commit has not yet
   completed.  This covers both PROC_OPEN and PROC_CLOSED. */
197 proc_is_open (const struct dataset *ds)
199 return ds->proc_state != PROC_COMMITTED;
202 /* Reads the next case from dataset DS, which must have been
203 opened for reading with proc_open.
204 Returns true if successful, in which case a pointer to the
205 case is stored in *C.
206 Returns false at end of file or if a read error occurs. In
207 this case a null pointer is stored in *C.
   READER is unused; the dataset is passed as the auxiliary pointer
   DS_.
   NOTE(review): the loop scaffolding, the remaining parameter and the
   return statements of this function are not visible in this view. */
209 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
212 struct dataset *ds = ds_;
/* Starts out as TRNS_DROP_CASE so that the assertion below also holds
   before any case has been processed. */
213 enum trns_result retval = TRNS_DROP_CASE;
215 assert (ds->proc_state == PROC_OPEN);
/* Reaching this point means the previous case, if any, was dropped or
   hit an error. */
220 assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
221 if (retval == TRNS_ERROR)
226 /* Read a case from source. */
227 if (!casereader_read (ds->source, c))
/* Resize the case to the current dictionary's value count, then
   initialize the variables tracked by caseinit. */
229 case_resize (c, dict_get_next_value_idx (ds->dict));
230 caseinit_init_reinit_vars (ds->caseinit, c);
231 caseinit_init_left_vars (ds->caseinit, c);
233 /* Execute permanent transformations. */
/* The case number is one more than the count of cases written so
   far. */
234 case_nr = ds->cases_written + 1;
235 retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
237 caseinit_update_left_vars (ds->caseinit, c);
238 if (retval != TRNS_CONTINUE)
244 /* Write case to collection of lagged cases. */
/* Entries are evicted from the back until there is room, then a clone
   of the current case is pushed at the front, so the front is always
   the most recent case. */
247 while (deque_count (&ds->lag) >= ds->n_lag)
248 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
249 case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
252 /* Write case to replacement active file. */
/* The sink receives either a compacted copy or a plain clone of the
   case, taken before the temporary transformations run.
   NOTE(review): the temporary case is sized from ds->dict, whereas
   proc_open sizes the sink and builds the compactor from
   ds->permanent_dict.  Confirm the two agree when temporary
   transformations are active. */
254 if (ds->sink != NULL)
257 if (ds->compactor != NULL)
259 case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
260 dict_compactor_compact (ds->compactor, &tmp, c);
263 case_clone (&tmp, c);
264 casewriter_write (ds->sink, &tmp);
267 /* Execute temporary transformations. */
/* These only affect the case handed to the procedure; the sink copy
   was already written above. */
268 if (ds->temporary_trns_chain != NULL)
270 retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
271 c, &ds->cases_written);
272 if (retval != TRNS_CONTINUE)
283 /* Closes dataset DS for reading.
284 Returns true if successful, false if an I/O error occurred
285 while reading or closing the data set.
286 If DS has not been opened, returns true without doing anything else. */
/* Destroy callback of proc_casereader_class.  READER is the procedure
   casereader being destroyed and DS_ is the dataset it reads from. */
289 proc_casereader_destroy (struct casereader *reader, void *ds_)
291 struct dataset *ds = ds_;
294 /* Make sure transformations happen for every input case, in
295 case they have side effects, and ensure that the replacement
296 active file gets all the cases it should. */
/* NOTE(review): the loop body is not visible in this view. */
297 while (casereader_read (reader, &c))
/* The dataset is now closed; proc_commit asserts this state. */
300 ds->proc_state = PROC_CLOSED;
/* Destroying the source can itself report an error; fold that into the
   accumulated status without losing an earlier failure. */
301 ds->ok = casereader_destroy (ds->source) && ds->ok;
/* NOTE(review): proc_set_active_file_data destroys ds->source again,
   so ds->source must have been reset to NULL before this call.  That
   reset is not visible in this view. */
303 proc_set_active_file_data (ds, NULL);
306 /* Must return false if the source casereader, a transformation,
307 or the sink casewriter signaled an error. (If a temporary
308 transformation signals an error, then the return value is
309 false, but the replacement active file may still be untouched.) */
/* Completes a procedure on DS, which must be in the PROC_CLOSED state
   (set when the procedure casereader is destroyed), and returns DS to
   the PROC_COMMITTED state.  The result is true only if cancelling the
   transformations succeeded and no earlier error was recorded in
   ds->ok. */
312 proc_commit (struct dataset *ds)
314 assert (ds->proc_state == PROC_CLOSED);
315 ds->proc_state = PROC_COMMITTED;
317 /* Free memory for lagged cases. */
/* Each remaining case is destroyed before the backing array itself is
   released. */
318 while (!deque_is_empty (&ds->lag))
319 case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
320 free (ds->lag_cases);
322 /* Dictionary from before TEMPORARY becomes permanent. */
323 proc_cancel_temporary_transformations (ds);
325 if (!ds->discard_output)
327 /* Finish compacting. */
/* The cases written to the sink were compacted, so the dictionary is
   compacted to match. */
328 if (ds->compactor != NULL)
330 dict_compactor_destroy (ds->compactor);
331 dict_compact_values (ds->dict);
332 ds->compactor = NULL;
335 /* Old data sink becomes new data source. */
336 if (ds->sink != NULL)
337 ds->source = casewriter_make_reader (ds->sink);
/* Discarding output applies to a single procedure only.
   NOTE(review): the branch structure around this reset is not visible
   in this view. */
342 ds->discard_output = false;
/* Notify the optional callback of the new source. */
345 if ( ds->replace_source) ds->replace_source (ds->source);
347 caseinit_clear (ds->caseinit);
348 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
350 dict_clear_vectors (ds->dict);
/* At this point permanent_dict is either already NULL or an alias of
   ds->dict set by proc_open, so it is dropped without being
   destroyed. */
351 ds->permanent_dict = NULL;
352 return proc_cancel_all_transformations (ds) && ds->ok;
/* Casereader class that proc_open passes to
   casereader_create_sequential.  It supplies the read and destroy
   callbacks of the procedure casereader.
   NOTE(review): any further members of the initializer are not visible
   in this view. */
355 static struct casereader_class proc_casereader_class =
357 proc_casereader_read,
358 proc_casereader_destroy,
363 /* Updates last_proc_invocation.
   The value stored in DS is the current calendar time as returned by
   time (NULL). */
365 update_last_proc_invocation (struct dataset *ds)
367 ds->last_proc_invocation = time (NULL);
370 /* Returns a pointer to the lagged case from N_BEFORE cases before the
371 current one, or NULL if there haven't been that many cases yet.
   N_BEFORE must be at least 1 and no greater than DS's n_lag; both
   bounds are asserted. */
373 lagged_case (const struct dataset *ds, int n_before)
375 assert (n_before >= 1);
376 assert (n_before <= ds->n_lag);
/* New cases are pushed at the front of the deque, so offset 0 from the
   front is the most recent lagged case and N_BEFORE - 1 is the one
   requested. */
378 if (n_before <= deque_count (&ds->lag))
379 return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
384 /* Returns the current set of permanent transformations,
385 and clears the permanent transformations.
386 For use by INPUT PROGRAM.
   Must not be called while temporary transformations are in effect
   (asserted). */
388 proc_capture_transformations (struct dataset *ds)
390 struct trns_chain *chain;
392 assert (ds->temporary_trns_chain == NULL);
/* Remember the old chain, then install a fresh empty chain as both the
   permanent chain and the chain that new transformations go to.
   NOTE(review): the return statement is not visible in this view;
   presumably CHAIN is returned and the caller takes ownership. */
393 chain = ds->permanent_trns_chain;
394 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
398 /* Adds a transformation that processes a case with PROC and
399 frees itself with FREE to the current set of transformations.
400 The functions are passed AUX as auxiliary data.
   The transformation is appended to DS's current chain without a
   finalizer. */
402 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
/* NULL is passed in the finalizer position of trns_chain_append. */
404 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
407 /* Adds a transformation that processes a case with PROC and
408 frees itself with FREE to the current set of transformations.
409 When parsing of the block of transformations is complete,
410 FINALIZE will be called.
411 The functions are passed AUX as auxiliary data.
   This differs from add_transformation only in that FINALIZE is
   passed on to trns_chain_append instead of NULL. */
413 add_transformation_with_finalizer (struct dataset *ds,
414 trns_finalize_func *finalize,
415 trns_proc_func *proc,
416 trns_free_func *free, void *aux)
418 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
421 /* Returns the index of the next transformation.
422 This value can be returned by a transformation procedure
423 function to indicate a "jump" to that transformation.
   The index is taken from DS's current chain, which is the temporary
   chain once temporary transformations have been started. */
425 next_transformation (const struct dataset *ds)
427 return trns_chain_next (ds->cur_trns_chain);
430 /* Returns true if the next call to add_transformation() will add
431 a temporary transformation, false if it will add a permanent transformation. */
434 proc_in_temporary_transformations (const struct dataset *ds)
/* Temporary mode is indicated solely by the existence of a temporary
   transformation chain. */
436 return ds->temporary_trns_chain != NULL;
439 /* Marks the start of temporary transformations.
440 Further calls to add_transformation() will add temporary transformations. */
443 proc_start_temporary_transformations (struct dataset *ds)
/* Nothing is done when temporary transformations are already in
   effect. */
445 if (!proc_in_temporary_transformations (ds))
/* Any case limit is turned into a transformation first, while the
   permanent chain is still the current one. */
447 add_case_limit_trns (ds);
/* Snapshot the dictionary as it stands now.  From here on
   permanent_dict is a separate copy rather than an alias of dict. */
449 ds->permanent_dict = dict_clone (ds->dict);
/* Close off the permanent chain and direct further transformations to
   a new, empty temporary chain. */
451 trns_chain_finalize (ds->permanent_trns_chain);
452 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
456 /* Converts all the temporary transformations, if any, to
457 permanent transformations. Further transformations will be
   permanent.
459 Returns true if anything changed, false otherwise. */
461 proc_make_temporary_transformations_permanent (struct dataset *ds)
463 if (proc_in_temporary_transformations (ds))
/* The temporary chain is finalized and then spliced onto the permanent
   chain, after which DS no longer has a temporary chain. */
465 trns_chain_finalize (ds->temporary_trns_chain);
466 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
467 ds->temporary_trns_chain = NULL;
/* NOTE(review): no visible line points ds->cur_trns_chain back at the
   permanent chain.  If none exists, cur_trns_chain still refers to the
   spliced temporary chain after this call; confirm. */
/* The snapshot taken when temporary mode started is no longer needed,
   because the current dictionary is now the permanent one. */
469 dict_destroy (ds->permanent_dict);
470 ds->permanent_dict = NULL;
478 /* Cancels all temporary transformations, if any. Further
479 transformations will be permanent.
480 Returns true if anything changed, false otherwise. */
482 proc_cancel_temporary_transformations (struct dataset *ds)
484 if (proc_in_temporary_transformations (ds))
/* Throw away the dictionary as modified by the temporary
   transformations and restore the snapshot taken when temporary mode
   started.  Ownership of the snapshot moves to ds->dict. */
486 dict_destroy (ds->dict);
487 ds->dict = ds->permanent_dict;
488 ds->permanent_dict = NULL;
/* Notify the optional callback that the dictionary was replaced. */
489 if (ds->replace_dict) ds->replace_dict (ds->dict);
491 trns_chain_destroy (ds->temporary_trns_chain);
492 ds->temporary_trns_chain = NULL;
/* NOTE(review): ds->cur_trns_chain is not reset by any visible line
   here; confirm that callers do not append transformations before it
   is reset elsewhere. */
500 /* Cancels all transformations, if any.
501 Returns true if successful, false on I/O error.
   DS must be in the PROC_COMMITTED state (asserted).  Afterwards DS
   has a fresh, empty permanent chain, which is also the current
   chain, and no temporary chain. */
503 proc_cancel_all_transformations (struct dataset *ds)
506 assert (ds->proc_state == PROC_COMMITTED);
/* Both chains are always destroyed; OK ends up false if either
   destruction reported failure.
   NOTE(review): temporary_trns_chain may be NULL here, so this relies
   on trns_chain_destroy accepting a null chain. */
507 ok = trns_chain_destroy (ds->permanent_trns_chain);
508 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
509 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
510 ds->temporary_trns_chain = NULL;
514 /* Initializes procedure handling.
   Allocates a zero-filled dataset with an empty dictionary, a new
   caseinit object and empty transformation chains.  RPS and RDS are
   stored as the replace-source and replace-dictionary callbacks;
   other code in this file checks them for null before calling them. */
516 create_dataset (replace_source_callback *rps,
517 replace_dictionary_callback *rds)
519 struct dataset *ds = xzalloc (sizeof(*ds));
520 ds->dict = dict_create ();
521 ds->caseinit = caseinit_create ();
522 ds->replace_source = rps;
523 ds->replace_dict = rds;
/* Used here to create the initial empty permanent chain.
   NOTE(review): that function asserts proc_state == PROC_COMMITTED,
   so this relies on the zero-filled proc_state being PROC_COMMITTED;
   the enum definition is not visible in this view. */
524 proc_cancel_all_transformations (ds);
528 /* Finishes up procedure handling.
   Discards the active file, then destroys DS's dictionary, caseinit
   object and permanent transformation chain.
   NOTE(review): the release of DS itself is not visible in this
   view. */
530 destroy_dataset (struct dataset *ds)
532 proc_discard_active_file (ds);
533 dict_destroy (ds->dict);
534 caseinit_destroy (ds->caseinit);
/* proc_discard_active_file leaves behind a fresh empty permanent
   chain, which is what gets destroyed here. */
535 trns_chain_destroy (ds->permanent_trns_chain);
539 /* Causes output from the next procedure to be discarded, instead
540 of being preserved for use as input for the next procedure.
   The flag set here is cleared again by proc_commit and by the
   no-pending-transformations path of proc_execute. */
542 proc_discard_output (struct dataset *ds)
544 ds->discard_output = true;
547 /* Discards the active file dictionary, data, and transformations. */
550 proc_discard_active_file (struct dataset *ds)
552 assert (ds->proc_state == PROC_COMMITTED);
/* The dictionary object is kept but emptied, and the default file
   handle is reset. */
554 dict_clear (ds->dict);
555 fh_set_default_handle (NULL);
/* NOTE(review): resetting ds->source to NULL after destroying it is
   not visible in this view; the callback below is told that the new
   source is NULL. */
559 casereader_destroy (ds->source);
561 if ( ds->replace_source) ds->replace_source (NULL);
563 proc_cancel_all_transformations (ds);
566 /* Sets SOURCE as the source for procedure input for the next procedure. */
569 proc_set_active_file (struct dataset *ds,
570 struct casereader *source,
571 struct dictionary *dict)
/* DS must be committed, and DICT must not be the dictionary DS
   already has, because the old dictionary is destroyed below. */
573 assert (ds->proc_state == PROC_COMMITTED);
574 assert (ds->dict != dict);
576 proc_discard_active_file (ds);
/* NOTE(review): the assignment of DICT to ds->dict is not visible in
   this view; the callback below is notified of DICT as the
   replacement. */
578 dict_destroy (ds->dict);
580 if ( ds->replace_dict) ds->replace_dict (dict);
582 proc_set_active_file_data (ds, source);
585 /* Replaces the active file's data by READER without replacing
586 the associated dictionary.
   The previous source is destroyed.  READER may be NULL.  Returns
   true if READER is NULL or reports no error, false otherwise. */
588 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
/* NOTE(review): the assignment of READER to ds->source is not visible
   in this view. */
590 casereader_destroy (ds->source);
592 if (ds->replace_source) ds->replace_source (reader);
/* Reset the case-initialization state and mark the variables of the
   current dictionary as already initialized. */
594 caseinit_clear (ds->caseinit);
595 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
597 return reader == NULL || !casereader_error (reader);
600 /* Returns true if an active file data source is available, false otherwise. */
603 proc_has_active_file (const struct dataset *ds)
/* An active file exists exactly when DS has a source casereader. */
605 return ds->source != NULL;
608 /* Checks whether DS has a corrupted active file. If so,
609 discards it and returns false. If not, returns true without discarding anything. */
612 dataset_end_of_command (struct dataset *ds)
/* There is nothing to check when DS has no source. */
614 if (ds->source != NULL)
/* A source that reports an error causes the whole active file to be
   discarded. */
616 if (casereader_error (ds->source))
618 proc_discard_active_file (ds);
/* Otherwise the successor taint of the source's taint object is
   reset.  The taint is obtained as pointer-to-const, so the cast drops
   const for the reset call.  The assertion then checks that no tainted
   successor remains. */
623 const struct taint *taint = casereader_get_taint (ds->source);
624 taint_reset_successor_taint ((struct taint *) taint);
625 assert (!taint_has_tainted_successor (taint));
/* Callbacks of the case-limit transformation, defined below. */
631 static trns_proc_func case_limit_trns_proc;
632 static trns_free_func case_limit_trns_free;
634 /* Adds a transformation that limits the number of cases that may
635 pass through, if DS->DICT has a case limit.
   The limit is then cleared from the dictionary. */
637 add_case_limit_trns (struct dataset *ds)
639 size_t case_limit = dict_get_case_limit (ds->dict);
/* NOTE(review): the test on CASE_LIMIT that guards the lines below is
   not visible in this view. */
/* The counter is heap-allocated so that it persists for the lifetime
   of the transformation; case_limit_trns_free releases it. */
642 size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
643 *cases_remaining = case_limit;
644 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
/* With the limit now carried by the transformation, it is removed from
   the dictionary. */
646 dict_set_case_limit (ds->dict, 0);
650 /* Limits the maximum number of cases processed to *CASES_REMAINING. */
653 case_limit_trns_proc (void *cases_remaining_,
654 struct ccase *c UNUSED, casenumber case_nr UNUSED)
/* The auxiliary data is the counter allocated by add_case_limit_trns. */
656 size_t *cases_remaining = cases_remaining_;
/* While the budget lasts, each case uses up one unit and passes
   through. */
657 if (*cases_remaining > 0)
659 (*cases_remaining)--;
660 return TRNS_CONTINUE;
/* Once the budget is exhausted, every further case is dropped. */
663 return TRNS_DROP_CASE;
666 /* Frees the data associated with a case limit transformation.
   CASES_REMAINING_ is the counter that add_case_limit_trns allocated
   with xmalloc.
   NOTE(review): the return statement is not visible in this view. */
668 case_limit_trns_free (void *cases_remaining_)
670 size_t *cases_remaining = cases_remaining_;
671 free (cases_remaining);
/* Callback of the FILTER transformation, defined below. */
675 static trns_proc_func filter_trns_proc;
677 /* Adds a temporary transformation to filter data according to
678 the variable specified on FILTER, if any.
   Nothing is added when the dictionary has no filter variable. */
680 add_filter_trns (struct dataset *ds)
682 struct variable *filter_var = dict_get_filter (ds->dict);
683 if (filter_var != NULL)
/* Starting temporary mode first (a no-op if already started) makes the
   filter part of the temporary chain.  No free function is supplied;
   the filter variable is only passed as auxiliary data. */
685 proc_start_temporary_transformations (ds);
686 add_transformation (ds, filter_trns_proc, NULL, filter_var);
690 /* FILTER transformation.
   FILTER_VAR_ is the filter variable.  The case passes
   (TRNS_CONTINUE) when its value for that variable is nonzero and is
   not missing according to var_is_num_missing with MV_ANY; otherwise
   it is dropped (TRNS_DROP_CASE). */
/* NOTE(review): C is marked UNUSED in the parameter list although it
   is read by case_num below; the marker looks stale. */
692 filter_trns_proc (void *filter_var_,
693 struct ccase *c UNUSED, casenumber case_nr UNUSED)
696 struct variable *filter_var = filter_var_;
697 double f = case_num (c, filter_var);
698 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
699 ? TRNS_CONTINUE : TRNS_DROP_CASE);
704 dataset_dict (const struct dataset *ds)
/* Requests that at least N_BEFORE lagged cases be kept for DS.  The
   lag depth is only ever raised, never lowered.  The new depth is
   picked up by proc_open, which sizes the lag deque from n_lag. */
710 dataset_need_lag (struct dataset *ds, int n_before)
712 ds->n_lag = MAX (ds->n_lag, n_before);