Fixed bug reporting the significance of paired value t-test.
[pspp-builds.git] / src / data / procedure.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23
24 #include <data/case.h>
25 #include <data/case-map.h>
26 #include <data/caseinit.h>
27 #include <data/casereader.h>
28 #include <data/casereader-provider.h>
29 #include <data/casewriter.h>
30 #include <data/dictionary.h>
31 #include <data/file-handle-def.h>
32 #include <data/procedure.h>
33 #include <data/transformations.h>
34 #include <data/variable.h>
35 #include <libpspp/deque.h>
36 #include <libpspp/misc.h>
37 #include <libpspp/str.h>
38 #include <libpspp/taint.h>
39
40 #include "xalloc.h"
41
42 struct dataset {
43   /* Cases are read from source,
44      their transformation variables are initialized,
45      pass through permanent_trns_chain (which transforms them into
46      the format described by permanent_dict),
47      are written to sink,
48      pass through temporary_trns_chain (which transforms them into
49      the format described by dict),
50      and are finally passed to the procedure. */
51   struct casereader *source;
52   struct caseinit *caseinit;
53   struct trns_chain *permanent_trns_chain;
54   struct dictionary *permanent_dict;
55   struct casewriter *sink;
56   struct trns_chain *temporary_trns_chain;
57   struct dictionary *dict;
58
59   /* Callback which occurs whenever the transformation chain(s) have
60      been modified */
61   transformation_change_callback_func *xform_callback;
62   void *xform_callback_aux;
63
64   /* If true, cases are discarded instead of being written to
65      sink. */
66   bool discard_output;
67
68   /* The transformation chain that the next transformation will be
69      added to. */
70   struct trns_chain *cur_trns_chain;
71
72   /* The case map used to compact a case, if necessary;
73      otherwise a null pointer. */
74   struct case_map *compactor;
75
76   /* Time at which proc was last invoked. */
77   time_t last_proc_invocation;
78
79   /* Cases just before ("lagging") the current one. */
80   int n_lag;                    /* Number of cases to lag. */
81   struct deque lag;             /* Deque of lagged cases. */
82   struct ccase *lag_cases;      /* Lagged cases managed by deque. */
83
84   /* Procedure data. */
85   enum
86     {
87       PROC_COMMITTED,           /* No procedure in progress. */
88       PROC_OPEN,                /* proc_open called, casereader still open. */
89       PROC_CLOSED               /* casereader from proc_open destroyed,
90                                    but proc_commit not yet called. */
91     }
92   proc_state;
93   casenumber cases_written;       /* Cases output so far. */
94   bool ok;                    /* Error status. */
95 }; /* struct dataset */
96
97
98 static void add_case_limit_trns (struct dataset *ds);
99 static void add_filter_trns (struct dataset *ds);
100
101 static void update_last_proc_invocation (struct dataset *ds);
102 \f
103 /* Public functions. */
104
105 /* Returns the last time the data was read. */
106 time_t
107 time_of_last_procedure (struct dataset *ds)
108 {
109   if (ds->last_proc_invocation == 0)
110     update_last_proc_invocation (ds);
111   return ds->last_proc_invocation;
112 }
113 \f
114 /* Regular procedure. */
115
116 /* Executes any pending transformations, if necessary.
117    This is not identical to the EXECUTE command in that it won't
118    always read the source data.  This can be important when the
119    source data is given inline within BEGIN DATA...END FILE. */
120 bool
121 proc_execute (struct dataset *ds)
122 {
123   bool ok;
124
125   if ((ds->temporary_trns_chain == NULL
126        || trns_chain_is_empty (ds->temporary_trns_chain))
127       && trns_chain_is_empty (ds->permanent_trns_chain))
128     {
129       ds->n_lag = 0;
130       ds->discard_output = false;
131       dict_set_case_limit (ds->dict, 0);
132       dict_clear_vectors (ds->dict);
133       return true;
134     }
135
136   ok = casereader_destroy (proc_open (ds));
137   return proc_commit (ds) && ok;
138 }
139
140 static const struct casereader_class proc_casereader_class;
141
142 /* Opens dataset DS for reading cases with proc_read.
143    proc_commit must be called when done. */
144 struct casereader *
145 proc_open (struct dataset *ds)
146 {
147   assert (ds->source != NULL);
148   assert (ds->proc_state == PROC_COMMITTED);
149
150   update_last_proc_invocation (ds);
151
152   caseinit_mark_for_init (ds->caseinit, ds->dict);
153
154   /* Finish up the collection of transformations. */
155   add_case_limit_trns (ds);
156   add_filter_trns (ds);
157   trns_chain_finalize (ds->cur_trns_chain);
158
159   /* Make permanent_dict refer to the dictionary right before
160      data reaches the sink. */
161   if (ds->permanent_dict == NULL)
162     ds->permanent_dict = ds->dict;
163
164   /* Prepare sink. */
165   if (!ds->discard_output)
166     {
167       struct dictionary *pd = ds->permanent_dict;
168       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
169       bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
170       ds->compactor = (should_compact
171                        ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
172                        : NULL);
173       ds->sink = autopaging_writer_create (compacted_value_cnt);
174     }
175   else
176     {
177       ds->compactor = NULL;
178       ds->sink = NULL;
179     }
180
181   /* Allocate memory for lagged cases. */
182   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
183
184   ds->proc_state = PROC_OPEN;
185   ds->cases_written = 0;
186   ds->ok = true;
187
188   /* FIXME: use taint in dataset in place of `ok'? */
189   /* FIXME: for trivial cases we can just return a clone of
190      ds->source? */
191   return casereader_create_sequential (NULL,
192                                        dict_get_next_value_idx (ds->dict),
193                                        CASENUMBER_MAX,
194                                        &proc_casereader_class, ds);
195 }
196
197 /* Returns true if a procedure is in progress, that is, if
198    proc_open has been called but proc_commit has not. */
199 bool
200 proc_is_open (const struct dataset *ds)
201 {
202   return ds->proc_state != PROC_COMMITTED;
203 }
204
205 /* "read" function for procedure casereader. */
206 static bool
207 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
208                       struct ccase *c)
209 {
210   struct dataset *ds = ds_;
211   enum trns_result retval = TRNS_DROP_CASE;
212
213   assert (ds->proc_state == PROC_OPEN);
214   for (;;)
215     {
216       casenumber case_nr;
217
218       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
219       if (retval == TRNS_ERROR)
220         ds->ok = false;
221       if (!ds->ok)
222         return false;
223
224       /* Read a case from source. */
225       if (!casereader_read (ds->source, c))
226         return false;
227       case_resize (c, dict_get_next_value_idx (ds->dict));
228       caseinit_init_vars (ds->caseinit, c);
229
230       /* Execute permanent transformations.  */
231       case_nr = ds->cases_written + 1;
232       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
233                                    c, case_nr);
234       caseinit_update_left_vars (ds->caseinit, c);
235       if (retval != TRNS_CONTINUE)
236         {
237           case_destroy (c);
238           continue;
239         }
240
241       /* Write case to collection of lagged cases. */
242       if (ds->n_lag > 0)
243         {
244           while (deque_count (&ds->lag) >= ds->n_lag)
245             case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
246           case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
247         }
248
249       /* Write case to replacement active file. */
250       ds->cases_written++;
251       if (ds->sink != NULL)
252         {
253           struct ccase tmp;
254           if (ds->compactor != NULL)
255             case_map_execute (ds->compactor, c, &tmp);
256           else
257             case_clone (&tmp, c);
258           casewriter_write (ds->sink, &tmp);
259         }
260
261       /* Execute temporary transformations. */
262       if (ds->temporary_trns_chain != NULL)
263         {
264           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
265                                        c, ds->cases_written);
266           if (retval != TRNS_CONTINUE)
267             {
268               case_destroy (c);
269               continue;
270             }
271         }
272
273       return true;
274     }
275 }
276
277 /* "destroy" function for procedure casereader. */
278 static void
279 proc_casereader_destroy (struct casereader *reader, void *ds_)
280 {
281   struct dataset *ds = ds_;
282   struct ccase c;
283
284   /* Make sure transformations happen for every input case, in
285      case they have side effects, and ensure that the replacement
286      active file gets all the cases it should. */
287   while (casereader_read (reader, &c))
288     case_destroy (&c);
289
290   ds->proc_state = PROC_CLOSED;
291   ds->ok = casereader_destroy (ds->source) && ds->ok;
292   ds->source = NULL;
293   proc_set_active_file_data (ds, NULL);
294 }
295
296 /* Must return false if the source casereader, a transformation,
297    or the sink casewriter signaled an error.  (If a temporary
298    transformation signals an error, then the return value is
299    false, but the replacement active file may still be
300    untainted.) */
301 bool
302 proc_commit (struct dataset *ds)
303 {
304   assert (ds->proc_state == PROC_CLOSED);
305   ds->proc_state = PROC_COMMITTED;
306
307   /* Free memory for lagged cases. */
308   while (!deque_is_empty (&ds->lag))
309     case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
310   free (ds->lag_cases);
311
312   /* Dictionary from before TEMPORARY becomes permanent. */
313   proc_cancel_temporary_transformations (ds);
314
315   if (!ds->discard_output)
316     {
317       /* Finish compacting. */
318       if (ds->compactor != NULL)
319         {
320           case_map_destroy (ds->compactor);
321           ds->compactor = NULL;
322
323           dict_delete_scratch_vars (ds->dict);
324           dict_compact_values (ds->dict);
325         }
326
327       /* Old data sink becomes new data source. */
328       if (ds->sink != NULL)
329         ds->source = casewriter_make_reader (ds->sink);
330     }
331   else
332     {
333       ds->source = NULL;
334       ds->discard_output = false;
335     }
336   ds->sink = NULL;
337
338   caseinit_clear (ds->caseinit);
339   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
340
341   dict_clear_vectors (ds->dict);
342   ds->permanent_dict = NULL;
343   return proc_cancel_all_transformations (ds) && ds->ok;
344 }
345
346 /* Casereader class for procedure execution. */
347 static const struct casereader_class proc_casereader_class =
348   {
349     proc_casereader_read,
350     proc_casereader_destroy,
351     NULL,
352     NULL,
353   };
354
355 /* Updates last_proc_invocation. */
356 static void
357 update_last_proc_invocation (struct dataset *ds)
358 {
359   ds->last_proc_invocation = time (NULL);
360 }
361 \f
362 /* Returns a pointer to the lagged case from N_BEFORE cases before the
363    current one, or NULL if there haven't been that many cases yet. */
364 struct ccase *
365 lagged_case (const struct dataset *ds, int n_before)
366 {
367   assert (n_before >= 1);
368   assert (n_before <= ds->n_lag);
369
370   if (n_before <= deque_count (&ds->lag))
371     return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
372   else
373     return NULL;
374 }
375 \f
376 /* Returns the current set of permanent transformations,
377    and clears the permanent transformations.
378    For use by INPUT PROGRAM. */
379 struct trns_chain *
380 proc_capture_transformations (struct dataset *ds)
381 {
382   struct trns_chain *chain;
383
384   assert (ds->temporary_trns_chain == NULL);
385   chain = ds->permanent_trns_chain;
386   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
387
388   if ( ds->xform_callback)
389     ds->xform_callback (false, ds->xform_callback_aux);
390
391   return chain;
392 }
393
394 /* Adds a transformation that processes a case with PROC and
395    frees itself with FREE to the current set of transformations.
396    The functions are passed AUX as auxiliary data. */
397 void
398 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
399 {
400   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
401   if ( ds->xform_callback)
402     ds->xform_callback (true, ds->xform_callback_aux);
403 }
404
405 /* Adds a transformation that processes a case with PROC and
406    frees itself with FREE to the current set of transformations.
407    When parsing of the block of transformations is complete,
408    FINALIZE will be called.
409    The functions are passed AUX as auxiliary data. */
410 void
411 add_transformation_with_finalizer (struct dataset *ds,
412                                    trns_finalize_func *finalize,
413                                    trns_proc_func *proc,
414                                    trns_free_func *free, void *aux)
415 {
416   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
417
418   if ( ds->xform_callback)
419     ds->xform_callback (true, ds->xform_callback_aux);
420 }
421
422 /* Returns the index of the next transformation.
423    This value can be returned by a transformation procedure
424    function to indicate a "jump" to that transformation. */
425 size_t
426 next_transformation (const struct dataset *ds)
427 {
428   return trns_chain_next (ds->cur_trns_chain);
429 }
430
431 /* Returns true if the next call to add_transformation() will add
432    a temporary transformation, false if it will add a permanent
433    transformation. */
434 bool
435 proc_in_temporary_transformations (const struct dataset *ds)
436 {
437   return ds->temporary_trns_chain != NULL;
438 }
439
440 /* Marks the start of temporary transformations.
441    Further calls to add_transformation() will add temporary
442    transformations. */
443 void
444 proc_start_temporary_transformations (struct dataset *ds)
445 {
446   if (!proc_in_temporary_transformations (ds))
447     {
448       add_case_limit_trns (ds);
449
450       ds->permanent_dict = dict_clone (ds->dict);
451
452       trns_chain_finalize (ds->permanent_trns_chain);
453       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
454
455       if ( ds->xform_callback)
456         ds->xform_callback (true, ds->xform_callback_aux);
457     }
458 }
459
460 /* Converts all the temporary transformations, if any, to
461    permanent transformations.  Further transformations will be
462    permanent.
463    Returns true if anything changed, false otherwise. */
464 bool
465 proc_make_temporary_transformations_permanent (struct dataset *ds)
466 {
467   if (proc_in_temporary_transformations (ds))
468     {
469       trns_chain_finalize (ds->temporary_trns_chain);
470       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
471       ds->temporary_trns_chain = NULL;
472
473       dict_destroy (ds->permanent_dict);
474       ds->permanent_dict = NULL;
475
476       return true;
477     }
478   else
479     return false;
480 }
481
482 /* Cancels all temporary transformations, if any.  Further
483    transformations will be permanent.
484    Returns true if anything changed, false otherwise. */
485 bool
486 proc_cancel_temporary_transformations (struct dataset *ds)
487 {
488   if (proc_in_temporary_transformations (ds))
489     {
490       dict_destroy (ds->dict);
491       ds->dict = ds->permanent_dict;
492       ds->permanent_dict = NULL;
493
494       trns_chain_destroy (ds->temporary_trns_chain);
495       ds->temporary_trns_chain = NULL;
496
497       if ( ds->xform_callback)
498         ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
499                             ds->xform_callback_aux);
500
501       return true;
502     }
503   else
504     return false;
505 }
506
507 /* Cancels all transformations, if any.
508    Returns true if successful, false on I/O error. */
509 bool
510 proc_cancel_all_transformations (struct dataset *ds)
511 {
512   bool ok;
513   assert (ds->proc_state == PROC_COMMITTED);
514   ok = trns_chain_destroy (ds->permanent_trns_chain);
515   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
516   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
517   ds->temporary_trns_chain = NULL;
518   if ( ds->xform_callback)
519     ds->xform_callback (false, ds->xform_callback_aux);
520
521   return ok;
522 }
523 \f
524 /* Initializes procedure handling. */
525 struct dataset *
526 create_dataset (void)
527 {
528   struct dataset *ds = xzalloc (sizeof(*ds));
529   ds->dict = dict_create ();
530   ds->caseinit = caseinit_create ();
531   proc_cancel_all_transformations (ds);
532   return ds;
533 }
534
535
536 void
537 dataset_add_transform_change_callback (struct dataset *ds,
538                                        transformation_change_callback_func *cb,
539                                        void *aux)
540 {
541   ds->xform_callback = cb;
542   ds->xform_callback_aux = aux;
543 }
544
545 /* Finishes up procedure handling. */
546 void
547 destroy_dataset (struct dataset *ds)
548 {
549   proc_discard_active_file (ds);
550   dict_destroy (ds->dict);
551   caseinit_destroy (ds->caseinit);
552   trns_chain_destroy (ds->permanent_trns_chain);
553
554   if ( ds->xform_callback)
555     ds->xform_callback (false, ds->xform_callback_aux);
556   free (ds);
557 }
558
559 /* Causes output from the next procedure to be discarded, instead
560    of being preserved for use as input for the next procedure. */
561 void
562 proc_discard_output (struct dataset *ds)
563 {
564   ds->discard_output = true;
565 }
566
567 /* Discards the active file dictionary, data, and
568    transformations. */
569 void
570 proc_discard_active_file (struct dataset *ds)
571 {
572   assert (ds->proc_state == PROC_COMMITTED);
573
574   dict_clear (ds->dict);
575   fh_set_default_handle (NULL);
576
577   ds->n_lag = 0;
578
579   casereader_destroy (ds->source);
580   ds->source = NULL;
581
582   proc_cancel_all_transformations (ds);
583 }
584
585 /* Sets SOURCE as the source for procedure input for the next
586    procedure. */
587 void
588 proc_set_active_file (struct dataset *ds,
589                       struct casereader *source,
590                       struct dictionary *dict)
591 {
592   assert (ds->proc_state == PROC_COMMITTED);
593   assert (ds->dict != dict);
594
595   proc_discard_active_file (ds);
596
597   dict_destroy (ds->dict);
598   ds->dict = dict;
599
600   proc_set_active_file_data (ds, source);
601 }
602
603 /* Replaces the active file's data by READER without replacing
604    the associated dictionary. */
605 bool
606 proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
607 {
608   casereader_destroy (ds->source);
609   ds->source = reader;
610
611   caseinit_clear (ds->caseinit);
612   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
613
614   return reader == NULL || !casereader_error (reader);
615 }
616
617 /* Returns true if an active file data source is available, false
618    otherwise. */
619 bool
620 proc_has_active_file (const struct dataset *ds)
621 {
622   return ds->source != NULL;
623 }
624
625 /* Returns the active file data source from DS, or a null pointer
626    if DS has no data source, and removes it from DS. */
627 struct casereader *
628 proc_extract_active_file_data (struct dataset *ds)
629 {
630   struct casereader *reader = ds->source;
631   ds->source = NULL;
632
633   return reader;
634 }
635
636 /* Checks whether DS has a corrupted active file.  If so,
637    discards it and returns false.  If not, returns true without
638    doing anything. */
639 bool
640 dataset_end_of_command (struct dataset *ds)
641 {
642   if (ds->source != NULL)
643     {
644       if (casereader_error (ds->source))
645         {
646           proc_discard_active_file (ds);
647           return false;
648         }
649       else
650         {
651           const struct taint *taint = casereader_get_taint (ds->source);
652           taint_reset_successor_taint ((struct taint *) taint);
653           assert (!taint_has_tainted_successor (taint));
654         }
655     }
656   return true;
657 }
658 \f
659 static trns_proc_func case_limit_trns_proc;
660 static trns_free_func case_limit_trns_free;
661
662 /* Adds a transformation that limits the number of cases that may
663    pass through, if DS->DICT has a case limit. */
664 static void
665 add_case_limit_trns (struct dataset *ds)
666 {
667   casenumber case_limit = dict_get_case_limit (ds->dict);
668   if (case_limit != 0)
669     {
670       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
671       *cases_remaining = case_limit;
672       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
673                           cases_remaining);
674       dict_set_case_limit (ds->dict, 0);
675     }
676 }
677
678 /* Limits the maximum number of cases processed to
679    *CASES_REMAINING. */
680 static int
681 case_limit_trns_proc (void *cases_remaining_,
682                       struct ccase *c UNUSED, casenumber case_nr UNUSED)
683 {
684   size_t *cases_remaining = cases_remaining_;
685   if (*cases_remaining > 0)
686     {
687       (*cases_remaining)--;
688       return TRNS_CONTINUE;
689     }
690   else
691     return TRNS_DROP_CASE;
692 }
693
694 /* Frees the data associated with a case limit transformation. */
695 static bool
696 case_limit_trns_free (void *cases_remaining_)
697 {
698   size_t *cases_remaining = cases_remaining_;
699   free (cases_remaining);
700   return true;
701 }
702 \f
703 static trns_proc_func filter_trns_proc;
704
705 /* Adds a temporary transformation to filter data according to
706    the variable specified on FILTER, if any. */
707 static void
708 add_filter_trns (struct dataset *ds)
709 {
710   struct variable *filter_var = dict_get_filter (ds->dict);
711   if (filter_var != NULL)
712     {
713       proc_start_temporary_transformations (ds);
714       add_transformation (ds, filter_trns_proc, NULL, filter_var);
715     }
716 }
717
718 /* FILTER transformation. */
719 static int
720 filter_trns_proc (void *filter_var_,
721                   struct ccase *c UNUSED, casenumber case_nr UNUSED)
722
723 {
724   struct variable *filter_var = filter_var_;
725   double f = case_num (c, filter_var);
726   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
727           ? TRNS_CONTINUE : TRNS_DROP_CASE);
728 }
729
730
731 struct dictionary *
732 dataset_dict (const struct dataset *ds)
733 {
734   return ds->dict;
735 }
736
737 const struct casereader *
738 dataset_source (const struct dataset *ds)
739 {
740   return ds->source;
741 }
742
743 void
744 dataset_need_lag (struct dataset *ds, int n_before)
745 {
746   ds->n_lag = MAX (ds->n_lag, n_before);
747 }