it works!!
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct casewriter *sink;
72   struct trns_chain temporary_trns_chain;
73   bool temporary;
74   struct dictionary *dict;
75
76   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77   struct trns_chain *stack;
78   size_t n_stack;
79   size_t allocated_stack;
80
81   /* If true, cases are discarded instead of being written to
82      sink. */
83   bool discard_output;
84
85   /* The case map used to compact a case, if necessary;
86      otherwise a null pointer. */
87   struct case_map *compactor;
88
89   /* Time at which proc was last invoked. */
90   time_t last_proc_invocation;
91
92   /* Cases just before ("lagging") the current one. */
93   int n_lag;                    /* Number of cases to lag. */
94   struct deque lag;             /* Deque of lagged cases. */
95   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
96
97   /* Procedure data. */
98   enum
99     {
100       PROC_COMMITTED,           /* No procedure in progress. */
101       PROC_OPEN,                /* proc_open called, casereader still open. */
102       PROC_CLOSED               /* casereader from proc_open destroyed,
103                                    but proc_commit not yet called. */
104     }
105   proc_state;
106   casenumber cases_written;     /* Cases output so far. */
107   bool ok;                      /* Error status. */
108   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
109
110   const struct dataset_callbacks *callbacks;
111   void *cb_data;
112
113   /* Uniquely distinguishes datasets. */
114   unsigned int seqno;
115 };
116
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
119                                                bool non_empty);
120
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
125
126 static void update_last_proc_invocation (struct dataset *ds);
127
128 static void
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
130 {
131   struct dataset *ds = ds_;
132   dataset_changed__ (ds);
133 }
134 \f
135 static void
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
137 {
138   static unsigned int seqno;
139
140   dict_set_change_callback (ds->dict, dict_callback, ds);
141   proc_cancel_all_transformations (ds);
142   dataset_set_session (ds, session);
143   ds->seqno = ++seqno;
144 }
145
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
147    SESSION already contains a dataset named NAME, it is deleted and replaced.
148    The dataset initially has an empty dictionary and no data source. */
149 struct dataset *
150 dataset_create (struct session *session, const char *name)
151 {
152   struct dataset *ds = XMALLOC (struct dataset);
153   *ds = (struct dataset) {
154     .name = xstrdup (name),
155     .display = DATASET_FRONT,
156     .dict = dict_create (get_default_encoding ()),
157     .caseinit = caseinit_create (),
158   };
159   dataset_create_finish__ (ds, session);
160
161   return ds;
162 }
163
164 /* Creates and returns a new dataset that has the same data and dictionary as
165    OLD named NAME, adds it to the same session as OLD, and returns the new
166    dataset.  If SESSION already contains a dataset named NAME, it is deleted
167    and replaced.
168
169    OLD must not have any active transformations or temporary state and must
170    not be in the middle of a procedure.
171
172    Callbacks are not cloned. */
173 struct dataset *
174 dataset_clone (struct dataset *old, const char *name)
175 {
176   struct dataset *new;
177
178   assert (old->proc_state == PROC_COMMITTED);
179   assert (!old->permanent_trns_chain.n);
180   assert (old->permanent_dict == NULL);
181   assert (old->sink == NULL);
182   assert (!old->temporary);
183   assert (!old->temporary_trns_chain.n);
184   assert (!old->n_stack);
185
186   new = xzalloc (sizeof *new);
187   new->name = xstrdup (name);
188   new->display = DATASET_FRONT;
189   new->source = casereader_clone (old->source);
190   new->dict = dict_clone (old->dict);
191   new->caseinit = caseinit_clone (old->caseinit);
192   new->last_proc_invocation = old->last_proc_invocation;
193   new->ok = old->ok;
194
195   dataset_create_finish__ (new, old->session);
196
197   return new;
198 }
199
200 /* Destroys DS. */
201 void
202 dataset_destroy (struct dataset *ds)
203 {
204   if (ds != NULL)
205     {
206       dataset_set_session (ds, NULL);
207       dataset_clear (ds);
208       dict_unref (ds->dict);
209       dict_unref (ds->permanent_dict);
210       caseinit_destroy (ds->caseinit);
211       trns_chain_uninit (&ds->permanent_trns_chain);
212       for (size_t i = 0; i < ds->n_stack; i++)
213         trns_chain_uninit (&ds->stack[i]);
214       free (ds->stack);
215       dataset_transformations_changed__ (ds, false);
216       free (ds->name);
217       free (ds);
218     }
219 }
220
221 /* Discards the active dataset's dictionary, data, and transformations. */
222 void
223 dataset_clear (struct dataset *ds)
224 {
225   assert (ds->proc_state == PROC_COMMITTED);
226
227   dict_clear (ds->dict);
228   fh_set_default_handle (NULL);
229
230   ds->n_lag = 0;
231
232   casereader_destroy (ds->source);
233   ds->source = NULL;
234
235   proc_cancel_all_transformations (ds);
236 }
237
238 const char *
239 dataset_name (const struct dataset *ds)
240 {
241   return ds->name;
242 }
243
244 void
245 dataset_set_name (struct dataset *ds, const char *name)
246 {
247   struct session *session = ds->session;
248   bool active = false;
249
250   if (session != NULL)
251     {
252       active = session_active_dataset (session) == ds;
253       if (active)
254         session_set_active_dataset (session, NULL);
255       dataset_set_session (ds, NULL);
256     }
257
258   free (ds->name);
259   ds->name = xstrdup (name);
260
261   if (session != NULL)
262     {
263       dataset_set_session (ds, session);
264       if (active)
265         session_set_active_dataset (session, ds);
266     }
267 }
268
269 struct session *
270 dataset_session (const struct dataset *ds)
271 {
272   return ds->session;
273 }
274
275 void
276 dataset_set_session (struct dataset *ds, struct session *session)
277 {
278   if (session != ds->session)
279     {
280       if (ds->session != NULL)
281         session_remove_dataset (ds->session, ds);
282       if (session != NULL)
283         session_add_dataset (session, ds);
284     }
285 }
286
287 /* Returns the dictionary within DS.  This is always nonnull, although it
288    might not contain any variables. */
289 struct dictionary *
290 dataset_dict (const struct dataset *ds)
291 {
292   return ds->dict;
293 }
294
295 /* Replaces DS's dictionary by DICT, discarding any source and
296    transformations. */
297 void
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
299 {
300   assert (ds->proc_state == PROC_COMMITTED);
301   assert (ds->dict != dict);
302
303   dataset_clear (ds);
304
305   dict_unref (ds->dict);
306   ds->dict = dict;
307   dict_set_change_callback (ds->dict, dict_callback, ds);
308 }
309
310 /* Returns the casereader that will be read when a procedure is executed on
311    DS.  This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
314 {
315   return ds->source;
316 }
317
318 /* Returns true if DS has a data source, false otherwise. */
319 bool
320 dataset_has_source (const struct dataset *ds)
321 {
322   return dataset_source (ds) != NULL;
323 }
324
325 /* Replaces the active dataset's data by READER.  READER's cases must have an
326    appropriate format for DS's dictionary. */
327 bool
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
329 {
330   casereader_destroy (ds->source);
331   ds->source = reader;
332
333   caseinit_clear (ds->caseinit);
334   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
335
336   return reader == NULL || !casereader_error (reader);
337 }
338
339 /* Returns the data source from DS and removes it from DS.  Returns a null
340    pointer if DS has no data source. */
341 struct casereader *
342 dataset_steal_source (struct dataset *ds)
343 {
344   struct casereader *reader = ds->source;
345   ds->source = NULL;
346
347   return reader;
348 }
349
350 bool
351 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
352 {
353   dict_delete_vars (ds->dict, vars, n);
354   ds->source = case_map_create_input_translator (
355     case_map_to_compact_dict (ds->dict, 0), ds->source);
356   dict_compact_values (ds->dict);
357   caseinit_clear (ds->caseinit);
358   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
359   return true;
360 }
361
362 /* Returns a number unique to DS.  It can be used to distinguish one dataset
363    from any other within a given program run, even datasets that do not exist
364    at the same time. */
365 unsigned int
366 dataset_seqno (const struct dataset *ds)
367 {
368   return ds->seqno;
369 }
370
371 void
372 dataset_set_callbacks (struct dataset *ds,
373                        const struct dataset_callbacks *callbacks,
374                        void *cb_data)
375 {
376   ds->callbacks = callbacks;
377   ds->cb_data = cb_data;
378 }
379
380 enum dataset_display
381 dataset_get_display (const struct dataset *ds)
382 {
383   return ds->display;
384 }
385
386 void
387 dataset_set_display (struct dataset *ds, enum dataset_display display)
388 {
389   ds->display = display;
390 }
391 \f
392 /* Returns the last time the data was read. */
393 time_t
394 time_of_last_procedure (struct dataset *ds)
395 {
396   if (!ds)
397     return time (NULL);
398   if (ds->last_proc_invocation == 0)
399     update_last_proc_invocation (ds);
400   return ds->last_proc_invocation;
401 }
402 \f
403 /* Regular procedure. */
404
405 /* Executes any pending transformations, if necessary.
406    This is not identical to the EXECUTE command in that it won't
407    always read the source data.  This can be important when the
408    source data is given inline within BEGIN DATA...END FILE. */
409 bool
410 proc_execute (struct dataset *ds)
411 {
412   bool ok;
413
414   if ((!ds->temporary || !ds->temporary_trns_chain.n)
415       && !ds->permanent_trns_chain.n)
416     {
417       ds->n_lag = 0;
418       ds->discard_output = false;
419       dict_set_case_limit (ds->dict, 0);
420       dict_clear_vectors (ds->dict);
421       return true;
422     }
423
424   ok = casereader_destroy (proc_open (ds));
425   return proc_commit (ds) && ok;
426 }
427
428 static const struct casereader_class proc_casereader_class;
429
430 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
431    cases filtered out with FILTER BY will not be included in the casereader
432    (which is usually desirable).  If FILTER is false, all cases will be
433    included regardless of FILTER BY settings.
434
435    proc_commit must be called when done. */
436 struct casereader *
437 proc_open_filtering (struct dataset *ds, bool filter)
438 {
439   struct casereader *reader;
440
441   assert (ds->n_stack == 0);
442   assert (ds->source != NULL);
443   assert (ds->proc_state == PROC_COMMITTED);
444
445   update_last_proc_invocation (ds);
446
447   caseinit_mark_for_init (ds->caseinit, ds->dict);
448
449   /* Finish up the collection of transformations. */
450   add_case_limit_trns (ds);
451   if (filter)
452     add_filter_trns (ds);
453   if (!proc_in_temporary_transformations (ds))
454     add_measurement_level_trns (ds, ds->dict);
455
456   /* Make permanent_dict refer to the dictionary right before
457      data reaches the sink. */
458   if (ds->permanent_dict == NULL)
459     ds->permanent_dict = ds->dict;
460
461   /* Prepare sink. */
462   if (!ds->discard_output)
463     {
464       struct dictionary *pd = ds->permanent_dict;
465       size_t compacted_n_values = dict_count_values (pd, DC_SCRATCH);
466       if (compacted_n_values < dict_get_next_value_idx (pd))
467         {
468           struct caseproto *compacted_proto;
469           compacted_proto = dict_get_compacted_proto (pd, DC_SCRATCH);
470           ds->compactor = case_map_to_compact_dict (pd, DC_SCRATCH);
471           ds->sink = autopaging_writer_create (compacted_proto);
472           caseproto_unref (compacted_proto);
473         }
474       else
475         {
476           ds->compactor = NULL;
477           ds->sink = autopaging_writer_create (dict_get_proto (pd));
478         }
479     }
480   else
481     {
482       ds->compactor = NULL;
483       ds->sink = NULL;
484     }
485
486   /* Allocate memory for lagged cases. */
487   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
488
489   ds->proc_state = PROC_OPEN;
490   ds->cases_written = 0;
491   ds->ok = true;
492
493   /* FIXME: use taint in dataset in place of `ok'? */
494   /* FIXME: for trivial cases we can just return a clone of
495      ds->source? */
496
497   /* Create casereader and insert a shim on top.  The shim allows us to
498      arbitrarily extend the casereader's lifetime, by slurping the cases into
499      the shim's buffer in proc_commit().  That is especially useful when output
500      table_items are generated directly from the procedure casereader (e.g. by
501      the LIST procedure) when we are using an output driver that keeps a
502      reference to the output items passed to it (e.g. the GUI output driver in
503      PSPPIRE). */
504   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
505                                          CASENUMBER_MAX,
506                                          &proc_casereader_class, ds);
507   ds->shim = casereader_shim_insert (reader);
508   return reader;
509 }
510
511 /* Opens dataset DS for reading cases with proc_read.
512    proc_commit must be called when done. */
513 struct casereader *
514 proc_open (struct dataset *ds)
515 {
516   return proc_open_filtering (ds, true);
517 }
518
519 /* Returns true if a procedure is in progress, that is, if
520    proc_open has been called but proc_commit has not. */
521 bool
522 proc_is_open (const struct dataset *ds)
523 {
524   return ds->proc_state != PROC_COMMITTED;
525 }
526
527 /* "read" function for procedure casereader. */
528 static struct ccase *
529 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
530 {
531   struct dataset *ds = ds_;
532   enum trns_result retval = TRNS_DROP_CASE;
533   struct ccase *c;
534
535   assert (ds->proc_state == PROC_OPEN);
536   for (; ; case_unref (c))
537     {
538       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
539       if (retval == TRNS_ERROR)
540         ds->ok = false;
541       if (!ds->ok)
542         return NULL;
543
544       /* Read a case from source. */
545       c = casereader_read (ds->source);
546       if (c == NULL)
547         return NULL;
548       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
549       caseinit_init_vars (ds->caseinit, c);
550
551       /* Execute permanent transformations.  */
552       casenumber case_nr = ds->cases_written + 1;
553       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
554       caseinit_update_left_vars (ds->caseinit, c);
555       if (retval != TRNS_CONTINUE)
556         continue;
557
558       /* Write case to collection of lagged cases. */
559       if (ds->n_lag > 0)
560         {
561           while (deque_count (&ds->lag) >= ds->n_lag)
562             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
563           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
564         }
565
566       /* Write case to replacement dataset. */
567       ds->cases_written++;
568       if (ds->sink != NULL)
569         casewriter_write (ds->sink,
570                           case_map_execute (ds->compactor, case_ref (c)));
571
572       /* Execute temporary transformations. */
573       if (ds->temporary_trns_chain.n)
574         {
575           retval = trns_chain_execute (&ds->temporary_trns_chain,
576                                        ds->cases_written, &c);
577           if (retval != TRNS_CONTINUE)
578             continue;
579         }
580
581       return c;
582     }
583 }
584
585 /* "destroy" function for procedure casereader. */
586 static void
587 proc_casereader_destroy (struct casereader *reader, void *ds_)
588 {
589   struct dataset *ds = ds_;
590   struct ccase *c;
591
592   /* We are always the subreader for a casereader_buffer, so if we're being
593      destroyed then it's because the casereader_buffer has read all the cases
594      that it ever will. */
595   ds->shim = NULL;
596
597   /* Make sure transformations happen for every input case, in
598      case they have side effects, and ensure that the replacement
599      active dataset gets all the cases it should. */
600   while ((c = casereader_read (reader)) != NULL)
601     case_unref (c);
602
603   ds->proc_state = PROC_CLOSED;
604   ds->ok = casereader_destroy (ds->source) && ds->ok;
605   ds->source = NULL;
606   dataset_set_source (ds, NULL);
607 }
608
609 /* Must return false if the source casereader, a transformation,
610    or the sink casewriter signaled an error.  (If a temporary
611    transformation signals an error, then the return value is
612    false, but the replacement active dataset may still be
613    untainted.) */
614 bool
615 proc_commit (struct dataset *ds)
616 {
617   if (ds->shim != NULL)
618     casereader_shim_slurp (ds->shim);
619
620   assert (ds->proc_state == PROC_CLOSED);
621   ds->proc_state = PROC_COMMITTED;
622
623   dataset_changed__ (ds);
624
625   /* Free memory for lagged cases. */
626   while (!deque_is_empty (&ds->lag))
627     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
628   free (ds->lag_cases);
629
630   /* Dictionary from before TEMPORARY becomes permanent. */
631   proc_cancel_temporary_transformations (ds);
632   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
633
634   if (!ds->discard_output)
635     {
636       /* Finish compacting. */
637       if (ds->compactor != NULL)
638         {
639           case_map_destroy (ds->compactor);
640           ds->compactor = NULL;
641
642           dict_delete_scratch_vars (ds->dict);
643           dict_compact_values (ds->dict);
644         }
645
646       /* Old data sink becomes new data source. */
647       if (ds->sink != NULL)
648         ds->source = casewriter_make_reader (ds->sink);
649     }
650   else
651     {
652       ds->source = NULL;
653       ds->discard_output = false;
654     }
655   ds->sink = NULL;
656
657   caseinit_clear (ds->caseinit);
658   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
659
660   dict_clear_vectors (ds->dict);
661   ds->permanent_dict = NULL;
662   return ok;
663 }
664
665 /* Casereader class for procedure execution. */
666 static const struct casereader_class proc_casereader_class =
667   {
668     proc_casereader_read,
669     proc_casereader_destroy,
670     NULL,
671     NULL,
672   };
673
674 /* Updates last_proc_invocation. */
675 static void
676 update_last_proc_invocation (struct dataset *ds)
677 {
678   ds->last_proc_invocation = time (NULL);
679 }
680 \f
681 /* Returns a pointer to the lagged case from N_BEFORE cases before the
682    current one, or NULL if there haven't been that many cases yet. */
683 const struct ccase *
684 lagged_case (const struct dataset *ds, int n_before)
685 {
686   assert (n_before >= 1);
687   assert (n_before <= ds->n_lag);
688
689   if (n_before <= deque_count (&ds->lag))
690     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
691   else
692     return NULL;
693 }
694 \f
695 /* Adds TRNS to the current set of transformations. */
696 void
697 add_transformation (struct dataset *ds,
698                     const struct trns_class *class, void *aux)
699 {
700   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
701                               : ds->temporary ? &ds->temporary_trns_chain
702                               : &ds->permanent_trns_chain);
703   struct transformation t = { .class = class, .aux = aux };
704   trns_chain_append (chain, &t);
705   dataset_transformations_changed__ (ds, true);
706 }
707
708 /* Returns true if the next call to add_transformation() will add
709    a temporary transformation, false if it will add a permanent
710    transformation. */
711 bool
712 proc_in_temporary_transformations (const struct dataset *ds)
713 {
714   return ds->temporary;
715 }
716
717 /* Marks the start of temporary transformations.
718    Further calls to add_transformation() will add temporary
719    transformations. */
720 void
721 proc_start_temporary_transformations (struct dataset *ds)
722 {
723   assert (!ds->n_stack);
724   if (!proc_in_temporary_transformations (ds))
725     {
726       add_case_limit_trns (ds);
727
728       ds->permanent_dict = dict_clone (ds->dict);
729       add_measurement_level_trns (ds, ds->permanent_dict);
730
731       ds->temporary = true;
732       dataset_transformations_changed__ (ds, true);
733     }
734 }
735
736 /* Converts all the temporary transformations, if any, to permanent
737    transformations.  Further transformations will be permanent.
738
739    The FILTER command is implemented as a temporary transformation, so a
740    procedure that uses this function should usually use proc_open_filtering()
741    with FILTER false, instead of plain proc_open().
742
743    Returns true if anything changed, false otherwise. */
744 bool
745 proc_make_temporary_transformations_permanent (struct dataset *ds)
746 {
747   if (proc_in_temporary_transformations (ds))
748     {
749       cancel_measurement_level_trns (&ds->permanent_trns_chain);
750       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
751
752       ds->temporary = false;
753
754       dict_unref (ds->permanent_dict);
755       ds->permanent_dict = NULL;
756
757       return true;
758     }
759   else
760     return false;
761 }
762
763 /* Cancels all temporary transformations, if any.  Further
764    transformations will be permanent.
765    Returns true if anything changed, false otherwise. */
766 bool
767 proc_cancel_temporary_transformations (struct dataset *ds)
768 {
769   if (proc_in_temporary_transformations (ds))
770     {
771       trns_chain_clear (&ds->temporary_trns_chain);
772
773       dict_unref (ds->dict);
774       ds->dict = ds->permanent_dict;
775       ds->permanent_dict = NULL;
776
777       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
778       return true;
779     }
780   else
781     return false;
782 }
783
784 /* Cancels all transformations, if any.
785    Returns true if successful, false on I/O error. */
786 bool
787 proc_cancel_all_transformations (struct dataset *ds)
788 {
789   bool ok;
790   assert (ds->proc_state == PROC_COMMITTED);
791   ok = trns_chain_clear (&ds->permanent_trns_chain);
792   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
793   ds->temporary = false;
794   for (size_t i = 0; i < ds->n_stack; i++)
795     ok = trns_chain_uninit (&ds->stack[i]) && ok;
796   ds->n_stack = 0;
797   dataset_transformations_changed__ (ds, false);
798
799   return ok;
800 }
801
802 void
803 proc_push_transformations (struct dataset *ds)
804 {
805   if (ds->n_stack >= ds->allocated_stack)
806     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
807                             sizeof *ds->stack);
808   trns_chain_init (&ds->stack[ds->n_stack++]);
809 }
810
811 void
812 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
813 {
814   assert (ds->n_stack > 0);
815   *chain = ds->stack[--ds->n_stack];
816 }
817
818 bool
819 proc_has_transformations (const struct dataset *ds)
820 {
821   return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
822 }
823
824 static enum trns_result
825 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
826 {
827   struct variable *var = var_;
828
829   *cc = case_unshare (*cc);
830   *case_num_rw (*cc, var) = case_num;
831
832   return TRNS_CONTINUE;
833 }
834
835 /* Add a variable which we can sort by to get back the original order. */
836 struct variable *
837 add_permanent_ordering_transformation (struct dataset *ds)
838 {
839   struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
840   struct variable *order_var
841     = (proc_in_temporary_transformations (ds)
842        ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
843        : temp_var);
844
845   static const struct trns_class trns_class = {
846     .name = "ordering",
847     .execute = store_case_num
848   };
849   const struct transformation t = { .class = &trns_class, .aux = order_var };
850   trns_chain_append (&ds->permanent_trns_chain, &t);
851
852   return temp_var;
853 }
854 \f
855 /* Causes output from the next procedure to be discarded, instead
856    of being preserved for use as input for the next procedure. */
857 void
858 proc_discard_output (struct dataset *ds)
859 {
860   ds->discard_output = true;
861 }
862
863
864 /* Checks whether DS has a corrupted active dataset.  If so,
865    discards it and returns false.  If not, returns true without
866    doing anything. */
867 bool
868 dataset_end_of_command (struct dataset *ds)
869 {
870   if (ds->source != NULL)
871     {
872       if (casereader_error (ds->source))
873         {
874           dataset_clear (ds);
875           return false;
876         }
877       else
878         {
879           const struct taint *taint = casereader_get_taint (ds->source);
880           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
881           assert (!taint_has_tainted_successor (taint));
882         }
883     }
884   return true;
885 }
886 \f
887 /* Limits the maximum number of cases processed to
888    *CASES_REMAINING. */
889 static enum trns_result
890 case_limit_trns_proc (void *cases_remaining_,
891                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
892 {
893   size_t *cases_remaining = cases_remaining_;
894   if (*cases_remaining > 0)
895     {
896       (*cases_remaining)--;
897       return TRNS_CONTINUE;
898     }
899   else
900     return TRNS_DROP_CASE;
901 }
902
903 /* Frees the data associated with a case limit transformation. */
904 static bool
905 case_limit_trns_free (void *cases_remaining_)
906 {
907   size_t *cases_remaining = cases_remaining_;
908   free (cases_remaining);
909   return true;
910 }
911
912 /* Adds a transformation that limits the number of cases that may
913    pass through, if DS->DICT has a case limit. */
914 static void
915 add_case_limit_trns (struct dataset *ds)
916 {
917   casenumber case_limit = dict_get_case_limit (ds->dict);
918   if (case_limit != 0)
919     {
920       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
921       *cases_remaining = case_limit;
922
923       static const struct trns_class trns_class = {
924         .name = "case limit",
925         .execute = case_limit_trns_proc,
926         .destroy = case_limit_trns_free,
927       };
928       add_transformation (ds, &trns_class, cases_remaining);
929
930       dict_set_case_limit (ds->dict, 0);
931     }
932 }
933
934 \f
935 /* FILTER transformation. */
936 static enum trns_result
937 filter_trns_proc (void *filter_var_,
938                   struct ccase **c, casenumber case_nr UNUSED)
939
940 {
941   struct variable *filter_var = filter_var_;
942   double f = case_num (*c, filter_var);
943   return (f != 0.0 && !var_is_num_missing (filter_var, f)
944           ? TRNS_CONTINUE : TRNS_DROP_CASE);
945 }
946
947 /* Adds a temporary transformation to filter data according to
948    the variable specified on FILTER, if any. */
949 static void
950 add_filter_trns (struct dataset *ds)
951 {
952   struct variable *filter_var = dict_get_filter (ds->dict);
953   if (filter_var != NULL)
954     {
955       proc_start_temporary_transformations (ds);
956
957       static const struct trns_class trns_class = {
958         .name = "FILTER",
959         .execute = filter_trns_proc,
960       };
961       add_transformation (ds, &trns_class, filter_var);
962     }
963 }
964
965 void
966 dataset_need_lag (struct dataset *ds, int n_before)
967 {
968   ds->n_lag = MAX (ds->n_lag, n_before);
969 }
970 \f
971 /* Measurement guesser, for guessing a measurement level from formats and
972    data. */
973
974 struct mg_value
975   {
976     struct hmap_node hmap_node;
977     double value;
978   };
979
980 struct mg_var
981   {
982     struct variable *var;
983     struct hmap *values;
984   };
985
986 static void
987 mg_var_uninit (struct mg_var *mgv)
988 {
989   struct mg_value *mgvalue, *next;
990   HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
991                       mgv->values)
992     {
993       hmap_delete (mgv->values, &mgvalue->hmap_node);
994       free (mgvalue);
995     }
996   hmap_destroy (mgv->values);
997   free (mgv->values);
998 }
999
1000 static enum measure
1001 mg_var_interpret (const struct mg_var *mgv)
1002 {
1003   size_t n = hmap_count (mgv->values);
1004   if (!n)
1005     {
1006       /* All missing (or no data). */
1007       return MEASURE_NOMINAL;
1008     }
1009
1010   const struct mg_value *mgvalue;
1011   HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1012                  mgv->values)
1013     if (mgvalue->value < 10)
1014       return MEASURE_NOMINAL;
1015   return MEASURE_SCALE;
1016 }
1017
1018 static enum measure
1019 mg_var_add_value (struct mg_var *mgv, double value)
1020 {
1021   if (var_is_num_missing (mgv->var, value))
1022     return MEASURE_UNKNOWN;
1023   else if (value < 0 || value != floor (value))
1024     return MEASURE_SCALE;
1025
1026   size_t hash = hash_double (value, 0);
1027   struct mg_value *mgvalue;
1028   HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1029                            hash, mgv->values)
1030     if (mgvalue->value == value)
1031       return MEASURE_UNKNOWN;
1032
1033   mgvalue = xmalloc (sizeof *mgvalue);
1034   mgvalue->value = value;
1035   hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1036   if (hmap_count (mgv->values) >= settings_get_scalemin ())
1037     return MEASURE_SCALE;
1038
1039   return MEASURE_UNKNOWN;
1040 }
1041
1042 struct measure_guesser
1043   {
1044     struct mg_var *vars;
1045     size_t n_vars;
1046   };
1047
1048 static struct measure_guesser *
1049 measure_guesser_create__ (struct dictionary *dict)
1050 {
1051   struct mg_var *mgvs = NULL;
1052   size_t n_mgvs = 0;
1053   size_t allocated_mgvs = 0;
1054
1055   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1056     {
1057       struct variable *var = dict_get_var (dict, i);
1058       if (var_get_measure (var) != MEASURE_UNKNOWN)
1059         continue;
1060
1061       const struct fmt_spec *f = var_get_print_format (var);
1062       enum measure m = var_default_measure_for_format (f->type);
1063       if (m != MEASURE_UNKNOWN)
1064         {
1065           var_set_measure (var, m);
1066           continue;
1067         }
1068
1069       if (n_mgvs >= allocated_mgvs)
1070         mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1071
1072       struct mg_var *mgv = &mgvs[n_mgvs++];
1073       *mgv = (struct mg_var) {
1074         .var = var,
1075         .values = xmalloc (sizeof *mgv->values),
1076       };
1077       hmap_init (mgv->values);
1078     }
1079   if (!n_mgvs)
1080     return NULL;
1081
1082   struct measure_guesser *mg = xmalloc (sizeof *mg);
1083   *mg = (struct measure_guesser) {
1084     .vars = mgvs,
1085     .n_vars = n_mgvs,
1086   };
1087   return mg;
1088 }
1089
1090 /* Scans through DS's dictionary for variables that have an unknown measurement
1091    level.  For those, if the measurement level can be guessed based on the
1092    variable's type and format, sets a default.  If that's enough, returns NULL.
1093    If any remain whose levels are unknown and can't be guessed that way,
1094    creates and returns a structure that the caller should pass to
1095    measure_guesser_add_case() or measure_guesser_run() for guessing a
1096    measurement level based on the data.  */
1097 struct measure_guesser *
1098 measure_guesser_create (struct dataset *ds)
1099 {
1100   return measure_guesser_create__ (dataset_dict (ds));
1101 }
1102
1103 /* Adds data from case C to MG. */
1104 static void
1105 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1106 {
1107   for (size_t i = 0; i < mg->n_vars; )
1108     {
1109       struct mg_var *mgv = &mg->vars[i];
1110       double value = case_num (c, mgv->var);
1111       enum measure m = mg_var_add_value (mgv, value);
1112       if (m != MEASURE_UNKNOWN)
1113         {
1114           var_set_measure (mgv->var, m);
1115
1116           mg_var_uninit (mgv);
1117           *mgv = mg->vars[--mg->n_vars];
1118         }
1119       else
1120         i++;
1121     }
1122 }
1123
1124 /* Destroys MG. */
1125 void
1126 measure_guesser_destroy (struct measure_guesser *mg)
1127 {
1128   if (!mg)
1129     return;
1130
1131   for (size_t i = 0; i < mg->n_vars; i++)
1132     {
1133       struct mg_var *mgv = &mg->vars[i];
1134       var_set_measure (mgv->var, mg_var_interpret (mgv));
1135       mg_var_uninit (mgv);
1136     }
1137   free (mg->vars);
1138   free (mg);
1139 }
1140
1141 /* Adds final measurement levels based on MG, after all the cases have been
1142    added. */
1143 static void
1144 measure_guesser_commit (struct measure_guesser *mg)
1145 {
1146   for (size_t i = 0; i < mg->n_vars; i++)
1147     {
1148       struct mg_var *mgv = &mg->vars[i];
1149       var_set_measure (mgv->var, mg_var_interpret (mgv));
1150     }
1151 }
1152
1153 /* Passes the cases in READER through MG and uses the data in the cases to set
1154    measurement levels for the variables where they were still unknown. */
1155 void
1156 measure_guesser_run (struct measure_guesser *mg,
1157                      const struct casereader *reader)
1158 {
1159   struct casereader *r = casereader_clone (reader);
1160   while (mg->n_vars > 0)
1161     {
1162       struct ccase *c = casereader_read (r);
1163       if (!c)
1164         break;
1165       measure_guesser_add_case (mg, c);
1166       case_unref (c);
1167     }
1168   casereader_destroy (r);
1169
1170   measure_guesser_commit (mg);
1171 }
1172 \f
1173 /* A transformation for guessing measurement levels. */
1174
1175 static enum trns_result
1176 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1177 {
1178   struct measure_guesser *mg = mg_;
1179   measure_guesser_add_case (mg, *c);
1180   return TRNS_CONTINUE;
1181 }
1182
1183 static bool
1184 mg_trns_free (void *mg_)
1185 {
1186   struct measure_guesser *mg = mg_;
1187   measure_guesser_commit (mg);
1188   measure_guesser_destroy (mg);
1189   return true;
1190 }
1191
1192 static const struct trns_class mg_trns_class = {
1193   .name = "add measurement level",
1194   .execute = mg_trns_proc,
1195   .destroy = mg_trns_free,
1196 };
1197
1198 static void
1199 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1200 {
1201   struct measure_guesser *mg = measure_guesser_create__ (dict);
1202   if (mg)
1203     add_transformation (ds, &mg_trns_class, mg);
1204 }
1205
1206 static void
1207 cancel_measurement_level_trns (struct trns_chain *chain)
1208 {
1209   if (!chain->n)
1210     return;
1211
1212   struct transformation *trns = &chain->xforms[chain->n - 1];
1213   if (trns->class != &mg_trns_class)
1214     return;
1215
1216   struct measure_guesser *mg = trns->aux;
1217   measure_guesser_destroy (mg);
1218   chain->n--;
1219 }
1220 \f
1221 static void
1222 dataset_changed__ (struct dataset *ds)
1223 {
1224   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1225     ds->callbacks->changed (ds->cb_data);
1226 }
1227
1228 static void
1229 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1230 {
1231   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1232     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1233 }
1234 \f
1235 /* Private interface for use by session code. */
1236
1237 void
1238 dataset_set_session__ (struct dataset *ds, struct session *session)
1239 {
1240   ds->session = session;
1241 }