6ade4b24c997be69acc68b7e07994a2ca0014fbf
[pspp] / src / data / dataset.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/hash-functions.h"
40 #include "libpspp/hmap.h"
41 #include "libpspp/misc.h"
42 #include "libpspp/str.h"
43 #include "libpspp/taint.h"
44 #include "libpspp/i18n.h"
45
46 #include "gl/minmax.h"
47 #include "gl/xalloc.h"
48
49 struct dataset {
50   /* A dataset is usually part of a session.  Within a session its name must
51      unique.  The name must either be a valid PSPP identifier or the empty
52      string.  (It must be unique within the session even if it is the empty
53      string; that is, there may only be a single dataset within a session with
54      the empty string as its name.) */
55   struct session *session;
56   char *name;
57   enum dataset_display display;
58
59   /* Cases are read from source,
60      their transformation variables are initialized,
61      pass through permanent_trns_chain (which transforms them into
62      the format described by permanent_dict),
63      are written to sink,
64      pass through temporary_trns_chain (which transforms them into
65      the format described by dict),
66      and are finally passed to the procedure. */
67   struct casereader *source;
68   struct caseinit *caseinit;
69   struct trns_chain permanent_trns_chain;
70   struct dictionary *permanent_dict;
71   struct casewriter *sink;
72   struct trns_chain temporary_trns_chain;
73   bool temporary;
74   struct dictionary *dict;
75
76   /* Stack of transformation chains for DO IF and LOOP and INPUT PROGRAM. */
77   struct trns_chain *stack;
78   size_t n_stack;
79   size_t allocated_stack;
80
81   /* If true, cases are discarded instead of being written to
82      sink. */
83   bool discard_output;
84
85   /* The case map used to compact a case, if necessary;
86      otherwise a null pointer. */
87   struct case_map *compactor;
88
89   /* Time at which proc was last invoked. */
90   time_t last_proc_invocation;
91
92   /* Cases just before ("lagging") the current one. */
93   int n_lag;                    /* Number of cases to lag. */
94   struct deque lag;             /* Deque of lagged cases. */
95   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
96
97   /* Procedure data. */
98   enum
99     {
100       PROC_COMMITTED,           /* No procedure in progress. */
101       PROC_OPEN,                /* proc_open called, casereader still open. */
102       PROC_CLOSED               /* casereader from proc_open destroyed,
103                                    but proc_commit not yet called. */
104     }
105   proc_state;
106   casenumber cases_written;     /* Cases output so far. */
107   bool ok;                      /* Error status. */
108   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
109
110   const struct dataset_callbacks *callbacks;
111   void *cb_data;
112
113   /* Uniquely distinguishes datasets. */
114   unsigned int seqno;
115 };
116
117 static void dataset_changed__ (struct dataset *);
118 static void dataset_transformations_changed__ (struct dataset *,
119                                                bool non_empty);
120
121 static void add_measurement_level_trns (struct dataset *, struct dictionary *);
122 static void cancel_measurement_level_trns (struct trns_chain *);
123 static void add_case_limit_trns (struct dataset *ds);
124 static void add_filter_trns (struct dataset *ds);
125
126 static void update_last_proc_invocation (struct dataset *ds);
127
128 static void
129 dict_callback (struct dictionary *d UNUSED, void *ds_)
130 {
131   struct dataset *ds = ds_;
132   dataset_changed__ (ds);
133 }
134 \f
135 static void
136 dataset_create_finish__ (struct dataset *ds, struct session *session)
137 {
138   static unsigned int seqno;
139
140   dict_set_change_callback (ds->dict, dict_callback, ds);
141   proc_cancel_all_transformations (ds);
142   dataset_set_session (ds, session);
143   ds->seqno = ++seqno;
144 }
145
146 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
147    SESSION already contains a dataset named NAME, it is deleted and replaced.
148    The dataset initially has an empty dictionary and no data source. */
149 struct dataset *
150 dataset_create (struct session *session, const char *name)
151 {
152   struct dataset *ds = XMALLOC (struct dataset);
153   *ds = (struct dataset) {
154     .name = xstrdup (name),
155     .display = DATASET_FRONT,
156     .dict = dict_create (get_default_encoding ()),
157     .caseinit = caseinit_create (),
158   };
159   dataset_create_finish__ (ds, session);
160
161   return ds;
162 }
163
164 /* Creates and returns a new dataset that has the same data and dictionary as
165    OLD named NAME, adds it to the same session as OLD, and returns the new
166    dataset.  If SESSION already contains a dataset named NAME, it is deleted
167    and replaced.
168
169    OLD must not have any active transformations or temporary state and must
170    not be in the middle of a procedure.
171
172    Callbacks are not cloned. */
173 struct dataset *
174 dataset_clone (struct dataset *old, const char *name)
175 {
176   struct dataset *new;
177
178   assert (old->proc_state == PROC_COMMITTED);
179   assert (!old->permanent_trns_chain.n);
180   assert (old->permanent_dict == NULL);
181   assert (old->sink == NULL);
182   assert (!old->temporary);
183   assert (!old->temporary_trns_chain.n);
184   assert (!old->n_stack);
185
186   new = xzalloc (sizeof *new);
187   new->name = xstrdup (name);
188   new->display = DATASET_FRONT;
189   new->source = casereader_clone (old->source);
190   new->dict = dict_clone (old->dict);
191   new->caseinit = caseinit_clone (old->caseinit);
192   new->last_proc_invocation = old->last_proc_invocation;
193   new->ok = old->ok;
194
195   dataset_create_finish__ (new, old->session);
196
197   return new;
198 }
199
200 /* Destroys DS. */
201 void
202 dataset_destroy (struct dataset *ds)
203 {
204   if (ds != NULL)
205     {
206       dataset_set_session (ds, NULL);
207       dataset_clear (ds);
208       dict_unref (ds->dict);
209       dict_unref (ds->permanent_dict);
210       caseinit_destroy (ds->caseinit);
211       trns_chain_uninit (&ds->permanent_trns_chain);
212       for (size_t i = 0; i < ds->n_stack; i++)
213         trns_chain_uninit (&ds->stack[i]);
214       free (ds->stack);
215       dataset_transformations_changed__ (ds, false);
216       free (ds->name);
217       free (ds);
218     }
219 }
220
221 /* Discards the active dataset's dictionary, data, and transformations. */
222 void
223 dataset_clear (struct dataset *ds)
224 {
225   assert (ds->proc_state == PROC_COMMITTED);
226
227   dict_clear (ds->dict);
228   fh_set_default_handle (NULL);
229
230   ds->n_lag = 0;
231
232   casereader_destroy (ds->source);
233   ds->source = NULL;
234
235   proc_cancel_all_transformations (ds);
236 }
237
238 const char *
239 dataset_name (const struct dataset *ds)
240 {
241   return ds->name;
242 }
243
244 void
245 dataset_set_name (struct dataset *ds, const char *name)
246 {
247   struct session *session = ds->session;
248   bool active = false;
249
250   if (session != NULL)
251     {
252       active = session_active_dataset (session) == ds;
253       if (active)
254         session_set_active_dataset (session, NULL);
255       dataset_set_session (ds, NULL);
256     }
257
258   free (ds->name);
259   ds->name = xstrdup (name);
260
261   if (session != NULL)
262     {
263       dataset_set_session (ds, session);
264       if (active)
265         session_set_active_dataset (session, ds);
266     }
267 }
268
269 struct session *
270 dataset_session (const struct dataset *ds)
271 {
272   return ds->session;
273 }
274
275 void
276 dataset_set_session (struct dataset *ds, struct session *session)
277 {
278   if (session != ds->session)
279     {
280       if (ds->session != NULL)
281         session_remove_dataset (ds->session, ds);
282       if (session != NULL)
283         session_add_dataset (session, ds);
284     }
285 }
286
287 /* Returns the dictionary within DS.  This is always nonnull, although it
288    might not contain any variables. */
289 struct dictionary *
290 dataset_dict (const struct dataset *ds)
291 {
292   return ds->dict;
293 }
294
295 /* Replaces DS's dictionary by DICT, discarding any source and
296    transformations. */
297 void
298 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
299 {
300   assert (ds->proc_state == PROC_COMMITTED);
301   assert (ds->dict != dict);
302
303   dataset_clear (ds);
304
305   dict_unref (ds->dict);
306   ds->dict = dict;
307   dict_set_change_callback (ds->dict, dict_callback, ds);
308 }
309
310 /* Returns the casereader that will be read when a procedure is executed on
311    DS.  This can be NULL if none has been set up yet. */
312 const struct casereader *
313 dataset_source (const struct dataset *ds)
314 {
315   return ds->source;
316 }
317
318 /* Returns true if DS has a data source, false otherwise. */
319 bool
320 dataset_has_source (const struct dataset *ds)
321 {
322   return dataset_source (ds) != NULL;
323 }
324
325 /* Replaces the active dataset's data by READER.  READER's cases must have an
326    appropriate format for DS's dictionary. */
327 bool
328 dataset_set_source (struct dataset *ds, struct casereader *reader)
329 {
330   casereader_destroy (ds->source);
331   ds->source = reader;
332
333   caseinit_clear (ds->caseinit);
334   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
335
336   return reader == NULL || !casereader_error (reader);
337 }
338
339 /* Returns the data source from DS and removes it from DS.  Returns a null
340    pointer if DS has no data source. */
341 struct casereader *
342 dataset_steal_source (struct dataset *ds)
343 {
344   struct casereader *reader = ds->source;
345   ds->source = NULL;
346
347   return reader;
348 }
349
350 void
351 dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
352 {
353   assert (!proc_in_temporary_transformations (ds));
354   assert (!proc_has_transformations (ds));
355   assert (n < dict_get_n_vars (ds->dict));
356
357   dict_delete_vars (ds->dict, vars, n);
358   ds->source = case_map_create_input_translator (
359     case_map_to_compact_dict (ds->dict, 0), ds->source);
360   dict_compact_values (ds->dict);
361   caseinit_clear (ds->caseinit);
362   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
363 }
364
365 /* Returns a number unique to DS.  It can be used to distinguish one dataset
366    from any other within a given program run, even datasets that do not exist
367    at the same time. */
368 unsigned int
369 dataset_seqno (const struct dataset *ds)
370 {
371   return ds->seqno;
372 }
373
374 void
375 dataset_set_callbacks (struct dataset *ds,
376                        const struct dataset_callbacks *callbacks,
377                        void *cb_data)
378 {
379   ds->callbacks = callbacks;
380   ds->cb_data = cb_data;
381 }
382
383 enum dataset_display
384 dataset_get_display (const struct dataset *ds)
385 {
386   return ds->display;
387 }
388
389 void
390 dataset_set_display (struct dataset *ds, enum dataset_display display)
391 {
392   ds->display = display;
393 }
394 \f
395 /* Returns the last time the data was read. */
396 time_t
397 time_of_last_procedure (struct dataset *ds)
398 {
399   if (!ds)
400     return time (NULL);
401   if (ds->last_proc_invocation == 0)
402     update_last_proc_invocation (ds);
403   return ds->last_proc_invocation;
404 }
405 \f
406 /* Regular procedure. */
407
408 /* Executes any pending transformations, if necessary.
409    This is not identical to the EXECUTE command in that it won't
410    always read the source data.  This can be important when the
411    source data is given inline within BEGIN DATA...END FILE. */
412 bool
413 proc_execute (struct dataset *ds)
414 {
415   bool ok;
416
417   if ((!ds->temporary || !ds->temporary_trns_chain.n)
418       && !ds->permanent_trns_chain.n)
419     {
420       ds->n_lag = 0;
421       ds->discard_output = false;
422       dict_set_case_limit (ds->dict, 0);
423       dict_clear_vectors (ds->dict);
424       return true;
425     }
426
427   ok = casereader_destroy (proc_open (ds));
428   return proc_commit (ds) && ok;
429 }
430
431 static const struct casereader_class proc_casereader_class;
432
433 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
434    cases filtered out with FILTER BY will not be included in the casereader
435    (which is usually desirable).  If FILTER is false, all cases will be
436    included regardless of FILTER BY settings.
437
438    proc_commit must be called when done. */
439 struct casereader *
440 proc_open_filtering (struct dataset *ds, bool filter)
441 {
442   struct casereader *reader;
443
444   assert (ds->n_stack == 0);
445   assert (ds->source != NULL);
446   assert (ds->proc_state == PROC_COMMITTED);
447
448   update_last_proc_invocation (ds);
449
450   caseinit_mark_for_init (ds->caseinit, ds->dict);
451   ds->source = caseinit_translate_casereader_to_init_vars (
452     ds->caseinit, dict_get_proto (ds->dict), ds->source);
453
454   /* Finish up the collection of transformations. */
455   add_case_limit_trns (ds);
456   if (filter)
457     add_filter_trns (ds);
458   if (!proc_in_temporary_transformations (ds))
459     add_measurement_level_trns (ds, ds->dict);
460
461   /* Make permanent_dict refer to the dictionary right before
462      data reaches the sink. */
463   if (ds->permanent_dict == NULL)
464     ds->permanent_dict = ds->dict;
465
466   /* Prepare sink. */
467   if (!ds->discard_output)
468     {
469       struct dictionary *pd = ds->permanent_dict;
470       size_t compacted_n_values = dict_count_values (pd, DC_SCRATCH);
471       if (compacted_n_values < dict_get_next_value_idx (pd))
472         {
473           struct caseproto *compacted_proto;
474           compacted_proto = dict_get_compacted_proto (pd, DC_SCRATCH);
475           ds->compactor = case_map_to_compact_dict (pd, DC_SCRATCH);
476           ds->sink = autopaging_writer_create (compacted_proto);
477           caseproto_unref (compacted_proto);
478         }
479       else
480         {
481           ds->compactor = NULL;
482           ds->sink = autopaging_writer_create (dict_get_proto (pd));
483         }
484     }
485   else
486     {
487       ds->compactor = NULL;
488       ds->sink = NULL;
489     }
490
491   /* Allocate memory for lagged cases. */
492   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
493
494   ds->proc_state = PROC_OPEN;
495   ds->cases_written = 0;
496   ds->ok = true;
497
498   /* FIXME: use taint in dataset in place of `ok'? */
499   /* FIXME: for trivial cases we can just return a clone of
500      ds->source? */
501
502   /* Create casereader and insert a shim on top.  The shim allows us to
503      arbitrarily extend the casereader's lifetime, by slurping the cases into
504      the shim's buffer in proc_commit().  That is especially useful when output
505      table_items are generated directly from the procedure casereader (e.g. by
506      the LIST procedure) when we are using an output driver that keeps a
507      reference to the output items passed to it (e.g. the GUI output driver in
508      PSPPIRE). */
509   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
510                                          CASENUMBER_MAX,
511                                          &proc_casereader_class, ds);
512   ds->shim = casereader_shim_insert (reader);
513   return reader;
514 }
515
516 /* Opens dataset DS for reading cases with proc_read.
517    proc_commit must be called when done. */
518 struct casereader *
519 proc_open (struct dataset *ds)
520 {
521   return proc_open_filtering (ds, true);
522 }
523
524 /* Returns true if a procedure is in progress, that is, if
525    proc_open has been called but proc_commit has not. */
526 bool
527 proc_is_open (const struct dataset *ds)
528 {
529   return ds->proc_state != PROC_COMMITTED;
530 }
531
532 /* "read" function for procedure casereader. */
533 static struct ccase *
534 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
535 {
536   struct dataset *ds = ds_;
537   enum trns_result retval = TRNS_DROP_CASE;
538   struct ccase *c;
539
540   assert (ds->proc_state == PROC_OPEN);
541   for (; ; case_unref (c))
542     {
543       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
544       if (retval == TRNS_ERROR)
545         ds->ok = false;
546       if (!ds->ok)
547         return NULL;
548
549       /* Read a case from source. */
550       c = casereader_read (ds->source);
551       if (c == NULL)
552         return NULL;
553       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
554       caseinit_restore_left_vars (ds->caseinit, c);
555
556       /* Execute permanent transformations.  */
557       casenumber case_nr = ds->cases_written + 1;
558       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
559       caseinit_save_left_vars (ds->caseinit, c);
560       if (retval != TRNS_CONTINUE)
561         continue;
562
563       /* Write case to collection of lagged cases. */
564       if (ds->n_lag > 0)
565         {
566           while (deque_count (&ds->lag) >= ds->n_lag)
567             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
568           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
569         }
570
571       /* Write case to replacement dataset. */
572       ds->cases_written++;
573       if (ds->sink != NULL)
574         casewriter_write (ds->sink,
575                           case_map_execute (ds->compactor, case_ref (c)));
576
577       /* Execute temporary transformations. */
578       if (ds->temporary_trns_chain.n)
579         {
580           retval = trns_chain_execute (&ds->temporary_trns_chain,
581                                        ds->cases_written, &c);
582           if (retval != TRNS_CONTINUE)
583             continue;
584         }
585
586       return c;
587     }
588 }
589
590 /* "destroy" function for procedure casereader. */
591 static void
592 proc_casereader_destroy (struct casereader *reader, void *ds_)
593 {
594   struct dataset *ds = ds_;
595   struct ccase *c;
596
597   /* We are always the subreader for a casereader_buffer, so if we're being
598      destroyed then it's because the casereader_buffer has read all the cases
599      that it ever will. */
600   ds->shim = NULL;
601
602   /* Make sure transformations happen for every input case, in
603      case they have side effects, and ensure that the replacement
604      active dataset gets all the cases it should. */
605   while ((c = casereader_read (reader)) != NULL)
606     case_unref (c);
607
608   ds->proc_state = PROC_CLOSED;
609   ds->ok = casereader_destroy (ds->source) && ds->ok;
610   ds->source = NULL;
611   dataset_set_source (ds, NULL);
612 }
613
614 /* Must return false if the source casereader, a transformation,
615    or the sink casewriter signaled an error.  (If a temporary
616    transformation signals an error, then the return value is
617    false, but the replacement active dataset may still be
618    untainted.) */
619 bool
620 proc_commit (struct dataset *ds)
621 {
622   if (ds->shim != NULL)
623     casereader_shim_slurp (ds->shim);
624
625   assert (ds->proc_state == PROC_CLOSED);
626   ds->proc_state = PROC_COMMITTED;
627
628   dataset_changed__ (ds);
629
630   /* Free memory for lagged cases. */
631   while (!deque_is_empty (&ds->lag))
632     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
633   free (ds->lag_cases);
634
635   /* Dictionary from before TEMPORARY becomes permanent. */
636   proc_cancel_temporary_transformations (ds);
637   bool ok = proc_cancel_all_transformations (ds) && ds->ok;
638
639   if (!ds->discard_output)
640     {
641       /* Finish compacting. */
642       if (ds->compactor != NULL)
643         {
644           case_map_destroy (ds->compactor);
645           ds->compactor = NULL;
646
647           dict_delete_scratch_vars (ds->dict);
648           dict_compact_values (ds->dict);
649         }
650
651       /* Old data sink becomes new data source. */
652       if (ds->sink != NULL)
653         ds->source = casewriter_make_reader (ds->sink);
654     }
655   else
656     {
657       ds->source = NULL;
658       ds->discard_output = false;
659     }
660   ds->sink = NULL;
661
662   caseinit_clear (ds->caseinit);
663   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
664
665   dict_clear_vectors (ds->dict);
666   ds->permanent_dict = NULL;
667   return ok;
668 }
669
670 /* Casereader class for procedure execution. */
671 static const struct casereader_class proc_casereader_class =
672   {
673     proc_casereader_read,
674     proc_casereader_destroy,
675     NULL,
676     NULL,
677   };
678
679 /* Updates last_proc_invocation. */
680 static void
681 update_last_proc_invocation (struct dataset *ds)
682 {
683   ds->last_proc_invocation = time (NULL);
684 }
685 \f
686 /* Returns a pointer to the lagged case from N_BEFORE cases before the
687    current one, or NULL if there haven't been that many cases yet. */
688 const struct ccase *
689 lagged_case (const struct dataset *ds, int n_before)
690 {
691   assert (n_before >= 1);
692   assert (n_before <= ds->n_lag);
693
694   if (n_before <= deque_count (&ds->lag))
695     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
696   else
697     return NULL;
698 }
699 \f
700 /* Adds TRNS to the current set of transformations. */
701 void
702 add_transformation (struct dataset *ds,
703                     const struct trns_class *class, void *aux)
704 {
705   struct trns_chain *chain = (ds->n_stack > 0 ? &ds->stack[ds->n_stack - 1]
706                               : ds->temporary ? &ds->temporary_trns_chain
707                               : &ds->permanent_trns_chain);
708   struct transformation t = { .class = class, .aux = aux };
709   trns_chain_append (chain, &t);
710   dataset_transformations_changed__ (ds, true);
711 }
712
713 /* Returns true if the next call to add_transformation() will add
714    a temporary transformation, false if it will add a permanent
715    transformation. */
716 bool
717 proc_in_temporary_transformations (const struct dataset *ds)
718 {
719   return ds->temporary;
720 }
721
722 /* Marks the start of temporary transformations.
723    Further calls to add_transformation() will add temporary
724    transformations. */
725 void
726 proc_start_temporary_transformations (struct dataset *ds)
727 {
728   assert (!ds->n_stack);
729   if (!proc_in_temporary_transformations (ds))
730     {
731       add_case_limit_trns (ds);
732
733       ds->permanent_dict = dict_clone (ds->dict);
734       add_measurement_level_trns (ds, ds->permanent_dict);
735
736       ds->temporary = true;
737       dataset_transformations_changed__ (ds, true);
738     }
739 }
740
741 /* Converts all the temporary transformations, if any, to permanent
742    transformations.  Further transformations will be permanent.
743
744    The FILTER command is implemented as a temporary transformation, so a
745    procedure that uses this function should usually use proc_open_filtering()
746    with FILTER false, instead of plain proc_open().
747
748    Returns true if anything changed, false otherwise. */
749 bool
750 proc_make_temporary_transformations_permanent (struct dataset *ds)
751 {
752   if (proc_in_temporary_transformations (ds))
753     {
754       cancel_measurement_level_trns (&ds->permanent_trns_chain);
755       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
756
757       ds->temporary = false;
758
759       dict_unref (ds->permanent_dict);
760       ds->permanent_dict = NULL;
761
762       return true;
763     }
764   else
765     return false;
766 }
767
768 /* Cancels all temporary transformations, if any.  Further
769    transformations will be permanent.
770    Returns true if anything changed, false otherwise. */
771 bool
772 proc_cancel_temporary_transformations (struct dataset *ds)
773 {
774   if (proc_in_temporary_transformations (ds))
775     {
776       trns_chain_clear (&ds->temporary_trns_chain);
777
778       dict_unref (ds->dict);
779       ds->dict = ds->permanent_dict;
780       ds->permanent_dict = NULL;
781
782       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
783       return true;
784     }
785   else
786     return false;
787 }
788
789 /* Cancels all transformations, if any.
790    Returns true if successful, false on I/O error. */
791 bool
792 proc_cancel_all_transformations (struct dataset *ds)
793 {
794   bool ok;
795   assert (ds->proc_state == PROC_COMMITTED);
796   ok = trns_chain_clear (&ds->permanent_trns_chain);
797   ok = trns_chain_clear (&ds->temporary_trns_chain) && ok;
798   ds->temporary = false;
799   for (size_t i = 0; i < ds->n_stack; i++)
800     ok = trns_chain_uninit (&ds->stack[i]) && ok;
801   ds->n_stack = 0;
802   dataset_transformations_changed__ (ds, false);
803
804   return ok;
805 }
806
807 void
808 proc_push_transformations (struct dataset *ds)
809 {
810   if (ds->n_stack >= ds->allocated_stack)
811     ds->stack = x2nrealloc (ds->stack, &ds->allocated_stack,
812                             sizeof *ds->stack);
813   trns_chain_init (&ds->stack[ds->n_stack++]);
814 }
815
816 void
817 proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
818 {
819   assert (ds->n_stack > 0);
820   *chain = ds->stack[--ds->n_stack];
821 }
822
823 bool
824 proc_has_transformations (const struct dataset *ds)
825 {
826   return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
827 }
828
829 static enum trns_result
830 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
831 {
832   struct variable *var = var_;
833
834   *cc = case_unshare (*cc);
835   *case_num_rw (*cc, var) = case_num;
836
837   return TRNS_CONTINUE;
838 }
839
840 /* Add a variable which we can sort by to get back the original order. */
841 struct variable *
842 add_permanent_ordering_transformation (struct dataset *ds)
843 {
844   struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
845   struct variable *order_var
846     = (proc_in_temporary_transformations (ds)
847        ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
848        : temp_var);
849
850   static const struct trns_class trns_class = {
851     .name = "ordering",
852     .execute = store_case_num
853   };
854   const struct transformation t = { .class = &trns_class, .aux = order_var };
855   trns_chain_append (&ds->permanent_trns_chain, &t);
856
857   return temp_var;
858 }
859 \f
860 /* Causes output from the next procedure to be discarded, instead
861    of being preserved for use as input for the next procedure. */
862 void
863 proc_discard_output (struct dataset *ds)
864 {
865   ds->discard_output = true;
866 }
867
868
869 /* Checks whether DS has a corrupted active dataset.  If so,
870    discards it and returns false.  If not, returns true without
871    doing anything. */
872 bool
873 dataset_end_of_command (struct dataset *ds)
874 {
875   if (ds->source != NULL)
876     {
877       if (casereader_error (ds->source))
878         {
879           dataset_clear (ds);
880           return false;
881         }
882       else
883         {
884           const struct taint *taint = casereader_get_taint (ds->source);
885           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
886           assert (!taint_has_tainted_successor (taint));
887         }
888     }
889   return true;
890 }
891 \f
892 /* Limits the maximum number of cases processed to
893    *CASES_REMAINING. */
894 static enum trns_result
895 case_limit_trns_proc (void *cases_remaining_,
896                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
897 {
898   size_t *cases_remaining = cases_remaining_;
899   if (*cases_remaining > 0)
900     {
901       (*cases_remaining)--;
902       return TRNS_CONTINUE;
903     }
904   else
905     return TRNS_DROP_CASE;
906 }
907
908 /* Frees the data associated with a case limit transformation. */
909 static bool
910 case_limit_trns_free (void *cases_remaining_)
911 {
912   size_t *cases_remaining = cases_remaining_;
913   free (cases_remaining);
914   return true;
915 }
916
917 /* Adds a transformation that limits the number of cases that may
918    pass through, if DS->DICT has a case limit. */
919 static void
920 add_case_limit_trns (struct dataset *ds)
921 {
922   casenumber case_limit = dict_get_case_limit (ds->dict);
923   if (case_limit != 0)
924     {
925       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
926       *cases_remaining = case_limit;
927
928       static const struct trns_class trns_class = {
929         .name = "case limit",
930         .execute = case_limit_trns_proc,
931         .destroy = case_limit_trns_free,
932       };
933       add_transformation (ds, &trns_class, cases_remaining);
934
935       dict_set_case_limit (ds->dict, 0);
936     }
937 }
938
939 \f
940 /* FILTER transformation. */
941 static enum trns_result
942 filter_trns_proc (void *filter_var_,
943                   struct ccase **c, casenumber case_nr UNUSED)
944
945 {
946   struct variable *filter_var = filter_var_;
947   double f = case_num (*c, filter_var);
948   return (f != 0.0 && !var_is_num_missing (filter_var, f)
949           ? TRNS_CONTINUE : TRNS_DROP_CASE);
950 }
951
952 /* Adds a temporary transformation to filter data according to
953    the variable specified on FILTER, if any. */
954 static void
955 add_filter_trns (struct dataset *ds)
956 {
957   struct variable *filter_var = dict_get_filter (ds->dict);
958   if (filter_var != NULL)
959     {
960       proc_start_temporary_transformations (ds);
961
962       static const struct trns_class trns_class = {
963         .name = "FILTER",
964         .execute = filter_trns_proc,
965       };
966       add_transformation (ds, &trns_class, filter_var);
967     }
968 }
969
970 void
971 dataset_need_lag (struct dataset *ds, int n_before)
972 {
973   ds->n_lag = MAX (ds->n_lag, n_before);
974 }
975 \f
976 /* Measurement guesser, for guessing a measurement level from formats and
977    data. */
978
979 struct mg_value
980   {
981     struct hmap_node hmap_node;
982     double value;
983   };
984
985 struct mg_var
986   {
987     struct variable *var;
988     struct hmap *values;
989   };
990
991 static void
992 mg_var_uninit (struct mg_var *mgv)
993 {
994   struct mg_value *mgvalue, *next;
995   HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
996                       mgv->values)
997     {
998       hmap_delete (mgv->values, &mgvalue->hmap_node);
999       free (mgvalue);
1000     }
1001   hmap_destroy (mgv->values);
1002   free (mgv->values);
1003 }
1004
1005 static enum measure
1006 mg_var_interpret (const struct mg_var *mgv)
1007 {
1008   size_t n = hmap_count (mgv->values);
1009   if (!n)
1010     {
1011       /* All missing (or no data). */
1012       return MEASURE_NOMINAL;
1013     }
1014
1015   const struct mg_value *mgvalue;
1016   HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
1017                  mgv->values)
1018     if (mgvalue->value < 10)
1019       return MEASURE_NOMINAL;
1020   return MEASURE_SCALE;
1021 }
1022
1023 static enum measure
1024 mg_var_add_value (struct mg_var *mgv, double value)
1025 {
1026   if (var_is_num_missing (mgv->var, value))
1027     return MEASURE_UNKNOWN;
1028   else if (value < 0 || value != floor (value))
1029     return MEASURE_SCALE;
1030
1031   size_t hash = hash_double (value, 0);
1032   struct mg_value *mgvalue;
1033   HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
1034                            hash, mgv->values)
1035     if (mgvalue->value == value)
1036       return MEASURE_UNKNOWN;
1037
1038   mgvalue = xmalloc (sizeof *mgvalue);
1039   mgvalue->value = value;
1040   hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
1041   if (hmap_count (mgv->values) >= settings_get_scalemin ())
1042     return MEASURE_SCALE;
1043
1044   return MEASURE_UNKNOWN;
1045 }
1046
1047 struct measure_guesser
1048   {
1049     struct mg_var *vars;
1050     size_t n_vars;
1051   };
1052
1053 static struct measure_guesser *
1054 measure_guesser_create__ (struct dictionary *dict)
1055 {
1056   struct mg_var *mgvs = NULL;
1057   size_t n_mgvs = 0;
1058   size_t allocated_mgvs = 0;
1059
1060   for (size_t i = 0; i < dict_get_n_vars (dict); i++)
1061     {
1062       struct variable *var = dict_get_var (dict, i);
1063       if (var_get_measure (var) != MEASURE_UNKNOWN)
1064         continue;
1065
1066       struct fmt_spec f = var_get_print_format (var);
1067       enum measure m = var_default_measure_for_format (f.type);
1068       if (m != MEASURE_UNKNOWN)
1069         {
1070           var_set_measure (var, m);
1071           continue;
1072         }
1073
1074       if (n_mgvs >= allocated_mgvs)
1075         mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
1076
1077       struct mg_var *mgv = &mgvs[n_mgvs++];
1078       *mgv = (struct mg_var) {
1079         .var = var,
1080         .values = xmalloc (sizeof *mgv->values),
1081       };
1082       hmap_init (mgv->values);
1083     }
1084   if (!n_mgvs)
1085     return NULL;
1086
1087   struct measure_guesser *mg = xmalloc (sizeof *mg);
1088   *mg = (struct measure_guesser) {
1089     .vars = mgvs,
1090     .n_vars = n_mgvs,
1091   };
1092   return mg;
1093 }
1094
1095 /* Scans through DS's dictionary for variables that have an unknown measurement
1096    level.  For those, if the measurement level can be guessed based on the
1097    variable's type and format, sets a default.  If that's enough, returns NULL.
1098    If any remain whose levels are unknown and can't be guessed that way,
1099    creates and returns a structure that the caller should pass to
1100    measure_guesser_add_case() or measure_guesser_run() for guessing a
1101    measurement level based on the data.  */
1102 struct measure_guesser *
1103 measure_guesser_create (struct dataset *ds)
1104 {
1105   return measure_guesser_create__ (dataset_dict (ds));
1106 }
1107
1108 /* Adds data from case C to MG. */
1109 static void
1110 measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
1111 {
1112   for (size_t i = 0; i < mg->n_vars; )
1113     {
1114       struct mg_var *mgv = &mg->vars[i];
1115       double value = case_num (c, mgv->var);
1116       enum measure m = mg_var_add_value (mgv, value);
1117       if (m != MEASURE_UNKNOWN)
1118         {
1119           var_set_measure (mgv->var, m);
1120
1121           mg_var_uninit (mgv);
1122           *mgv = mg->vars[--mg->n_vars];
1123         }
1124       else
1125         i++;
1126     }
1127 }
1128
1129 /* Destroys MG. */
1130 void
1131 measure_guesser_destroy (struct measure_guesser *mg)
1132 {
1133   if (!mg)
1134     return;
1135
1136   for (size_t i = 0; i < mg->n_vars; i++)
1137     {
1138       struct mg_var *mgv = &mg->vars[i];
1139       var_set_measure (mgv->var, mg_var_interpret (mgv));
1140       mg_var_uninit (mgv);
1141     }
1142   free (mg->vars);
1143   free (mg);
1144 }
1145
1146 /* Adds final measurement levels based on MG, after all the cases have been
1147    added. */
1148 static void
1149 measure_guesser_commit (struct measure_guesser *mg)
1150 {
1151   for (size_t i = 0; i < mg->n_vars; i++)
1152     {
1153       struct mg_var *mgv = &mg->vars[i];
1154       var_set_measure (mgv->var, mg_var_interpret (mgv));
1155     }
1156 }
1157
1158 /* Passes the cases in READER through MG and uses the data in the cases to set
1159    measurement levels for the variables where they were still unknown. */
1160 void
1161 measure_guesser_run (struct measure_guesser *mg,
1162                      const struct casereader *reader)
1163 {
1164   struct casereader *r = casereader_clone (reader);
1165   while (mg->n_vars > 0)
1166     {
1167       struct ccase *c = casereader_read (r);
1168       if (!c)
1169         break;
1170       measure_guesser_add_case (mg, c);
1171       case_unref (c);
1172     }
1173   casereader_destroy (r);
1174
1175   measure_guesser_commit (mg);
1176 }
1177 \f
1178 /* A transformation for guessing measurement levels. */
1179
1180 static enum trns_result
1181 mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
1182 {
1183   struct measure_guesser *mg = mg_;
1184   measure_guesser_add_case (mg, *c);
1185   return TRNS_CONTINUE;
1186 }
1187
1188 static bool
1189 mg_trns_free (void *mg_)
1190 {
1191   struct measure_guesser *mg = mg_;
1192   measure_guesser_commit (mg);
1193   measure_guesser_destroy (mg);
1194   return true;
1195 }
1196
1197 static const struct trns_class mg_trns_class = {
1198   .name = "add measurement level",
1199   .execute = mg_trns_proc,
1200   .destroy = mg_trns_free,
1201 };
1202
1203 static void
1204 add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
1205 {
1206   struct measure_guesser *mg = measure_guesser_create__ (dict);
1207   if (mg)
1208     add_transformation (ds, &mg_trns_class, mg);
1209 }
1210
1211 static void
1212 cancel_measurement_level_trns (struct trns_chain *chain)
1213 {
1214   if (!chain->n)
1215     return;
1216
1217   struct transformation *trns = &chain->xforms[chain->n - 1];
1218   if (trns->class != &mg_trns_class)
1219     return;
1220
1221   struct measure_guesser *mg = trns->aux;
1222   measure_guesser_destroy (mg);
1223   chain->n--;
1224 }
1225 \f
1226 static void
1227 dataset_changed__ (struct dataset *ds)
1228 {
1229   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
1230     ds->callbacks->changed (ds->cb_data);
1231 }
1232
1233 static void
1234 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
1235 {
1236   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
1237     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
1238 }
1239 \f
1240 /* Private interface for use by session code. */
1241
1242 void
1243 dataset_set_session__ (struct dataset *ds, struct session *session)
1244 {
1245   ds->session = session;
1246 }