Make cases simpler, faster, and easier to understand.
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23
24 #include <data/case.h>
25 #include <data/case-map.h>
26 #include <data/caseinit.h>
27 #include <data/casereader.h>
28 #include <data/casereader-provider.h>
29 #include <data/casewriter.h>
30 #include <data/dictionary.h>
31 #include <data/file-handle-def.h>
32 #include <data/procedure.h>
33 #include <data/transformations.h>
34 #include <data/variable.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
39
40 #include "xalloc.h"
41
42 struct dataset {
43   /* Cases are read from source,
44      their transformation variables are initialized,
45      pass through permanent_trns_chain (which transforms them into
46      the format described by permanent_dict),
47      are written to sink,
48      pass through temporary_trns_chain (which transforms them into
49      the format described by dict),
50      and are finally passed to the procedure. */
51   struct casereader *source;
52   struct caseinit *caseinit;
53   struct trns_chain *permanent_trns_chain;
54   struct dictionary *permanent_dict;
55   struct casewriter *sink;
56   struct trns_chain *temporary_trns_chain;
57   struct dictionary *dict;
58
59   /* Callback which occurs whenever the transformation chain(s) have
60      been modified */
61   transformation_change_callback_func *xform_callback;
62   void *xform_callback_aux;
63
64   /* If true, cases are discarded instead of being written to
65      sink. */
66   bool discard_output;
67
68   /* The transformation chain that the next transformation will be
69      added to. */
70   struct trns_chain *cur_trns_chain;
71
72   /* The case map used to compact a case, if necessary;
73      otherwise a null pointer. */
74   struct case_map *compactor;
75
76   /* Time at which proc was last invoked. */
77   time_t last_proc_invocation;
78
79   /* Cases just before ("lagging") the current one. */
80   int n_lag;                    /* Number of cases to lag. */
81   struct deque lag;             /* Deque of lagged cases. */
82   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
83
84   /* Procedure data. */
85   enum
86     {
87       PROC_COMMITTED,           /* No procedure in progress. */
88       PROC_OPEN,                /* proc_open called, casereader still open. */
89       PROC_CLOSED               /* casereader from proc_open destroyed,
90                                    but proc_commit not yet called. */
91     }
92   proc_state;
93   casenumber cases_written;       /* Cases output so far. */
94   bool ok;                    /* Error status. */
95 }; /* struct dataset */
96
97
98 static void add_case_limit_trns (struct dataset *ds);
99 static void add_filter_trns (struct dataset *ds);
100
101 static void update_last_proc_invocation (struct dataset *ds);
102 \f
103 /* Public functions. */
104
105 /* Returns the last time the data was read. */
106 time_t
107 time_of_last_procedure (struct dataset *ds)
108 {
109   if (ds->last_proc_invocation == 0)
110     update_last_proc_invocation (ds);
111   return ds->last_proc_invocation;
112 }
113 \f
114 /* Regular procedure. */
115
116 /* Executes any pending transformations, if necessary.
117    This is not identical to the EXECUTE command in that it won't
118    always read the source data.  This can be important when the
119    source data is given inline within BEGIN DATA...END FILE. */
120 bool
121 proc_execute (struct dataset *ds)
122 {
123   bool ok;
124
125   if ((ds->temporary_trns_chain == NULL
126        || trns_chain_is_empty (ds->temporary_trns_chain))
127       && trns_chain_is_empty (ds->permanent_trns_chain))
128     {
129       ds->n_lag = 0;
130       ds->discard_output = false;
131       dict_set_case_limit (ds->dict, 0);
132       dict_clear_vectors (ds->dict);
133       return true;
134     }
135
136   ok = casereader_destroy (proc_open (ds));
137   return proc_commit (ds) && ok;
138 }
139
140 static const struct casereader_class proc_casereader_class;
141
142 /* Opens dataset DS for reading cases with proc_read.
143    proc_commit must be called when done. */
144 struct casereader *
145 proc_open (struct dataset *ds)
146 {
147   assert (ds->source != NULL);
148   assert (ds->proc_state == PROC_COMMITTED);
149
150   update_last_proc_invocation (ds);
151
152   caseinit_mark_for_init (ds->caseinit, ds->dict);
153
154   /* Finish up the collection of transformations. */
155   add_case_limit_trns (ds);
156   add_filter_trns (ds);
157   trns_chain_finalize (ds->cur_trns_chain);
158
159   /* Make permanent_dict refer to the dictionary right before
160      data reaches the sink. */
161   if (ds->permanent_dict == NULL)
162     ds->permanent_dict = ds->dict;
163
164   /* Prepare sink. */
165   if (!ds->discard_output)
166     {
167       struct dictionary *pd = ds->permanent_dict;
168       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
169       bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
170       ds->compactor = (should_compact
171                        ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
172                        : NULL);
173       ds->sink = autopaging_writer_create (compacted_value_cnt);
174     }
175   else
176     {
177       ds->compactor = NULL;
178       ds->sink = NULL;
179     }
180
181   /* Allocate memory for lagged cases. */
182   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
183
184   ds->proc_state = PROC_OPEN;
185   ds->cases_written = 0;
186   ds->ok = true;
187
188   /* FIXME: use taint in dataset in place of `ok'? */
189   /* FIXME: for trivial cases we can just return a clone of
190      ds->source? */
191   return casereader_create_sequential (NULL,
192                                        dict_get_next_value_idx (ds->dict),
193                                        CASENUMBER_MAX,
194                                        &proc_casereader_class, ds);
195 }
196
197 /* Returns true if a procedure is in progress, that is, if
198    proc_open has been called but proc_commit has not. */
199 bool
200 proc_is_open (const struct dataset *ds)
201 {
202   return ds->proc_state != PROC_COMMITTED;
203 }
204
205 /* "read" function for procedure casereader. */
206 static struct ccase *
207 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
208 {
209   struct dataset *ds = ds_;
210   enum trns_result retval = TRNS_DROP_CASE;
211   struct ccase *c;
212
213   assert (ds->proc_state == PROC_OPEN);
214   for (; ; case_unref (c))
215     {
216       casenumber case_nr;
217
218       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
219       if (retval == TRNS_ERROR)
220         ds->ok = false;
221       if (!ds->ok)
222         return NULL;
223
224       /* Read a case from source. */
225       c = casereader_read (ds->source);
226       if (c == NULL)
227         return NULL;
228       c = case_unshare_and_resize (c, dict_get_next_value_idx (ds->dict));
229       caseinit_init_vars (ds->caseinit, c);
230
231       /* Execute permanent transformations.  */
232       case_nr = ds->cases_written + 1;
233       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
234                                    &c, case_nr);
235       caseinit_update_left_vars (ds->caseinit, c);
236       if (retval != TRNS_CONTINUE)
237         continue;
238
239       /* Write case to collection of lagged cases. */
240       if (ds->n_lag > 0)
241         {
242           while (deque_count (&ds->lag) >= ds->n_lag)
243             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
244           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
245         }
246
247       /* Write case to replacement active file. */
248       ds->cases_written++;
249       if (ds->sink != NULL)
250         casewriter_write (ds->sink,
251                           case_map_execute (ds->compactor, case_ref (c)));
252
253       /* Execute temporary transformations. */
254       if (ds->temporary_trns_chain != NULL)
255         {
256           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
257                                        &c, ds->cases_written);
258           if (retval != TRNS_CONTINUE)
259             continue;
260         }
261
262       return c;
263     }
264 }
265
266 /* "destroy" function for procedure casereader. */
267 static void
268 proc_casereader_destroy (struct casereader *reader, void *ds_)
269 {
270   struct dataset *ds = ds_;
271   struct ccase *c;
272
273   /* Make sure transformations happen for every input case, in
274      case they have side effects, and ensure that the replacement
275      active file gets all the cases it should. */
276   while ((c = casereader_read (reader)) != NULL)
277     case_unref (c);
278
279   ds->proc_state = PROC_CLOSED;
280   ds->ok = casereader_destroy (ds->source) && ds->ok;
281   ds->source = NULL;
282   proc_set_active_file_data (ds, NULL);
283 }
284
285 /* Must return false if the source casereader, a transformation,
286    or the sink casewriter signaled an error.  (If a temporary
287    transformation signals an error, then the return value is
288    false, but the replacement active file may still be
289    untainted.) */
290 bool
291 proc_commit (struct dataset *ds)
292 {
293   assert (ds->proc_state == PROC_CLOSED);
294   ds->proc_state = PROC_COMMITTED;
295
296   /* Free memory for lagged cases. */
297   while (!deque_is_empty (&ds->lag))
298     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
299   free (ds->lag_cases);
300
301   /* Dictionary from before TEMPORARY becomes permanent. */
302   proc_cancel_temporary_transformations (ds);
303
304   if (!ds->discard_output)
305     {
306       /* Finish compacting. */
307       if (ds->compactor != NULL)
308         {
309           case_map_destroy (ds->compactor);
310           ds->compactor = NULL;
311
312           dict_delete_scratch_vars (ds->dict);
313           dict_compact_values (ds->dict);
314         }
315
316       /* Old data sink becomes new data source. */
317       if (ds->sink != NULL)
318         ds->source = casewriter_make_reader (ds->sink);
319     }
320   else
321     {
322       ds->source = NULL;
323       ds->discard_output = false;
324     }
325   ds->sink = NULL;
326
327   caseinit_clear (ds->caseinit);
328   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
329
330   dict_clear_vectors (ds->dict);
331   ds->permanent_dict = NULL;
332   return proc_cancel_all_transformations (ds) && ds->ok;
333 }
334
335 /* Casereader class for procedure execution. */
336 static const struct casereader_class proc_casereader_class =
337   {
338     proc_casereader_read,
339     proc_casereader_destroy,
340     NULL,
341     NULL,
342   };
343
344 /* Updates last_proc_invocation. */
345 static void
346 update_last_proc_invocation (struct dataset *ds)
347 {
348   ds->last_proc_invocation = time (NULL);
349 }
350 \f
351 /* Returns a pointer to the lagged case from N_BEFORE cases before the
352    current one, or NULL if there haven't been that many cases yet. */
353 const struct ccase *
354 lagged_case (const struct dataset *ds, int n_before)
355 {
356   assert (n_before >= 1);
357   assert (n_before <= ds->n_lag);
358
359   if (n_before <= deque_count (&ds->lag))
360     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
361   else
362     return NULL;
363 }
364 \f
365 /* Returns the current set of permanent transformations,
366    and clears the permanent transformations.
367    For use by INPUT PROGRAM. */
368 struct trns_chain *
369 proc_capture_transformations (struct dataset *ds)
370 {
371   struct trns_chain *chain;
372
373   assert (ds->temporary_trns_chain == NULL);
374   chain = ds->permanent_trns_chain;
375   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
376
377   if ( ds->xform_callback)
378     ds->xform_callback (false, ds->xform_callback_aux);
379
380   return chain;
381 }
382
383 /* Adds a transformation that processes a case with PROC and
384    frees itself with FREE to the current set of transformations.
385    The functions are passed AUX as auxiliary data. */
386 void
387 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
388 {
389   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
390   if ( ds->xform_callback)
391     ds->xform_callback (true, ds->xform_callback_aux);
392 }
393
394 /* Adds a transformation that processes a case with PROC and
395    frees itself with FREE to the current set of transformations.
396    When parsing of the block of transformations is complete,
397    FINALIZE will be called.
398    The functions are passed AUX as auxiliary data. */
399 void
400 add_transformation_with_finalizer (struct dataset *ds,
401                                    trns_finalize_func *finalize,
402                                    trns_proc_func *proc,
403                                    trns_free_func *free, void *aux)
404 {
405   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
406
407   if ( ds->xform_callback)
408     ds->xform_callback (true, ds->xform_callback_aux);
409 }
410
411 /* Returns the index of the next transformation.
412    This value can be returned by a transformation procedure
413    function to indicate a "jump" to that transformation. */
414 size_t
415 next_transformation (const struct dataset *ds)
416 {
417   return trns_chain_next (ds->cur_trns_chain);
418 }
419
420 /* Returns true if the next call to add_transformation() will add
421    a temporary transformation, false if it will add a permanent
422    transformation. */
423 bool
424 proc_in_temporary_transformations (const struct dataset *ds)
425 {
426   return ds->temporary_trns_chain != NULL;
427 }
428
429 /* Marks the start of temporary transformations.
430    Further calls to add_transformation() will add temporary
431    transformations. */
432 void
433 proc_start_temporary_transformations (struct dataset *ds)
434 {
435   if (!proc_in_temporary_transformations (ds))
436     {
437       add_case_limit_trns (ds);
438
439       ds->permanent_dict = dict_clone (ds->dict);
440
441       trns_chain_finalize (ds->permanent_trns_chain);
442       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
443
444       if ( ds->xform_callback)
445         ds->xform_callback (true, ds->xform_callback_aux);
446     }
447 }
448
449 /* Converts all the temporary transformations, if any, to
450    permanent transformations.  Further transformations will be
451    permanent.
452    Returns true if anything changed, false otherwise. */
453 bool
454 proc_make_temporary_transformations_permanent (struct dataset *ds)
455 {
456   if (proc_in_temporary_transformations (ds))
457     {
458       trns_chain_finalize (ds->temporary_trns_chain);
459       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
460       ds->temporary_trns_chain = NULL;
461
462       dict_destroy (ds->permanent_dict);
463       ds->permanent_dict = NULL;
464
465       return true;
466     }
467   else
468     return false;
469 }
470
471 /* Cancels all temporary transformations, if any.  Further
472    transformations will be permanent.
473    Returns true if anything changed, false otherwise. */
474 bool
475 proc_cancel_temporary_transformations (struct dataset *ds)
476 {
477   if (proc_in_temporary_transformations (ds))
478     {
479       dict_destroy (ds->dict);
480       ds->dict = ds->permanent_dict;
481       ds->permanent_dict = NULL;
482
483       trns_chain_destroy (ds->temporary_trns_chain);
484       ds->temporary_trns_chain = NULL;
485
486       if ( ds->xform_callback)
487         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
488                             ds->xform_callback_aux);
489
490       return true;
491     }
492   else
493     return false;
494 }
495
496 /* Cancels all transformations, if any.
497    Returns true if successful, false on I/O error. */
498 bool
499 proc_cancel_all_transformations (struct dataset *ds)
500 {
501   bool ok;
502   assert (ds->proc_state == PROC_COMMITTED);
503   ok = trns_chain_destroy (ds->permanent_trns_chain);
504   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
505   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
506   ds->temporary_trns_chain = NULL;
507   if ( ds->xform_callback)
508     ds->xform_callback (false, ds->xform_callback_aux);
509
510   return ok;
511 }
512 \f
513 /* Initializes procedure handling. */
514 struct dataset *
515 create_dataset (void)
516 {
517   struct dataset *ds = xzalloc (sizeof(*ds));
518   ds->dict = dict_create ();
519   ds->caseinit = caseinit_create ();
520   proc_cancel_all_transformations (ds);
521   return ds;
522 }
523
524
525 void
526 dataset_add_transform_change_callback (struct dataset *ds,
527                                        transformation_change_callback_func *cb,
528                                        void *aux)
529 {
530   ds->xform_callback = cb;
531   ds->xform_callback_aux = aux;
532 }
533
534 /* Finishes up procedure handling. */
535 void
536 destroy_dataset (struct dataset *ds)
537 {
538   proc_discard_active_file (ds);
539   dict_destroy (ds->dict);
540   caseinit_destroy (ds->caseinit);
541   trns_chain_destroy (ds->permanent_trns_chain);
542
543   if ( ds->xform_callback)
544     ds->xform_callback (false, ds->xform_callback_aux);
545   free (ds);
546 }
547
548 /* Causes output from the next procedure to be discarded, instead
549    of being preserved for use as input for the next procedure. */
550 void
551 proc_discard_output (struct dataset *ds)
552 {
553   ds->discard_output = true;
554 }
555
556 /* Discards the active file dictionary, data, and
557    transformations. */
558 void
559 proc_discard_active_file (struct dataset *ds)
560 {
561   assert (ds->proc_state == PROC_COMMITTED);
562
563   dict_clear (ds->dict);
564   fh_set_default_handle (NULL);
565
566   ds->n_lag = 0;
567
568   casereader_destroy (ds->source);
569   ds->source = NULL;
570
571   proc_cancel_all_transformations (ds);
572 }
573
574 /* Sets SOURCE as the source for procedure input for the next
575    procedure. */
576 void
577 proc_set_active_file (struct dataset *ds,
578                       struct casereader *source,
579                       struct dictionary *dict)
580 {
581   assert (ds->proc_state == PROC_COMMITTED);
582   assert (ds->dict != dict);
583
584   proc_discard_active_file (ds);
585
586   dict_destroy (ds->dict);
587   ds->dict = dict;
588
589   proc_set_active_file_data (ds, source);
590 }
591
592 /* Replaces the active file's data by READER without replacing
593    the associated dictionary. */
594 bool
595 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
596 {
597   casereader_destroy (ds->source);
598   ds->source = reader;
599
600   caseinit_clear (ds->caseinit);
601   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
602
603   return reader == NULL || !casereader_error (reader);
604 }
605
606 /* Returns true if an active file data source is available, false
607    otherwise. */
608 bool
609 proc_has_active_file (const struct dataset *ds)
610 {
611   return ds->source != NULL;
612 }
613
614 /* Returns the active file data source from DS, or a null pointer
615    if DS has no data source, and removes it from DS. */
616 struct casereader *
617 proc_extract_active_file_data (struct dataset *ds)
618 {
619   struct casereader *reader = ds->source;
620   ds->source = NULL;
621
622   return reader;
623 }
624
625 /* Checks whether DS has a corrupted active file.  If so,
626    discards it and returns false.  If not, returns true without
627    doing anything. */
628 bool
629 dataset_end_of_command (struct dataset *ds)
630 {
631   if (ds->source != NULL)
632     {
633       if (casereader_error (ds->source))
634         {
635           proc_discard_active_file (ds);
636           return false;
637         }
638       else
639         {
640           const struct taint *taint = casereader_get_taint (ds->source);
641           taint_reset_successor_taint ((struct taint *) taint);
642           assert (!taint_has_tainted_successor (taint));
643         }
644     }
645   return true;
646 }
647 \f
648 static trns_proc_func case_limit_trns_proc;
649 static trns_free_func case_limit_trns_free;
650
651 /* Adds a transformation that limits the number of cases that may
652    pass through, if DS->DICT has a case limit. */
653 static void
654 add_case_limit_trns (struct dataset *ds)
655 {
656   casenumber case_limit = dict_get_case_limit (ds->dict);
657   if (case_limit != 0)
658     {
659       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
660       *cases_remaining = case_limit;
661       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
662                           cases_remaining);
663       dict_set_case_limit (ds->dict, 0);
664     }
665 }
666
667 /* Limits the maximum number of cases processed to
668    *CASES_REMAINING. */
669 static int
670 case_limit_trns_proc (void *cases_remaining_,
671                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
672 {
673   size_t *cases_remaining = cases_remaining_;
674   if (*cases_remaining > 0)
675     {
676       (*cases_remaining)--;
677       return TRNS_CONTINUE;
678     }
679   else
680     return TRNS_DROP_CASE;
681 }
682
683 /* Frees the data associated with a case limit transformation. */
684 static bool
685 case_limit_trns_free (void *cases_remaining_)
686 {
687   size_t *cases_remaining = cases_remaining_;
688   free (cases_remaining);
689   return true;
690 }
691 \f
692 static trns_proc_func filter_trns_proc;
693
694 /* Adds a temporary transformation to filter data according to
695    the variable specified on FILTER, if any. */
696 static void
697 add_filter_trns (struct dataset *ds)
698 {
699   struct variable *filter_var = dict_get_filter (ds->dict);
700   if (filter_var != NULL)
701     {
702       proc_start_temporary_transformations (ds);
703       add_transformation (ds, filter_trns_proc, NULL, filter_var);
704     }
705 }
706
707 /* FILTER transformation. */
708 static int
709 filter_trns_proc (void *filter_var_,
710                   struct ccase **c UNUSED, casenumber case_nr UNUSED)
711
712 {
713   struct variable *filter_var = filter_var_;
714   double f = case_num (*c, filter_var);
715   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
716           ? TRNS_CONTINUE : TRNS_DROP_CASE);
717 }
718
719
720 struct dictionary *
721 dataset_dict (const struct dataset *ds)
722 {
723   return ds->dict;
724 }
725
726 const struct casereader *
727 dataset_source (const struct dataset *ds)
728 {
729   return ds->source;
730 }
731
732 void
733 dataset_need_lag (struct dataset *ds, int n_before)
734 {
735   ds->n_lag = MAX (ds->n_lag, n_before);
736 }