Change "union value" to dynamically allocate long strings.
[pspp-builds.git] / src / data / procedure.c
index f280b6828a314b20a7e6a3c912fcab9f29da23d5..b762214d10e20a33d6bbdf89ac52253a67589453 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
+   Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
 #include <data/procedure.h>
 #include <data/transformations.h>
 #include <data/variable.h>
-#include <libpspp/alloc.h>
 #include <libpspp/deque.h>
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
 #include <libpspp/taint.h>
+#include <libpspp/i18n.h>
 
+#include "minmax.h"
+#include "xalloc.h"
 
 struct dataset {
   /* Cases are read from source,
@@ -79,7 +81,7 @@ struct dataset {
   /* Cases just before ("lagging") the current one. */
   int n_lag;                   /* Number of cases to lag. */
   struct deque lag;             /* Deque of lagged cases. */
-  struct ccase *lag_cases;      /* Lagged cases managed by deque. */
+  struct ccase **lag_cases;     /* Lagged cases managed by deque. */
 
   /* Procedure data. */
   enum
@@ -92,6 +94,10 @@ struct dataset {
   proc_state;
   casenumber cases_written;       /* Cases output so far. */
   bool ok;                    /* Error status. */
+
+  void (*callback) (void *); /* Callback for when the dataset changes */
+  void *cb_data;
+
 }; /* struct dataset */
 
 
@@ -99,9 +105,24 @@ static void add_case_limit_trns (struct dataset *ds);
 static void add_filter_trns (struct dataset *ds);
 
 static void update_last_proc_invocation (struct dataset *ds);
+
+static void
+dataset_set_unsaved (const struct dataset *ds)
+{
+  if (ds->callback) ds->callback (ds->cb_data);
+}
+
 \f
 /* Public functions. */
 
+void
+dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
+{
+  ds->callback = cb;
+  ds->cb_data = cb_data;
+}
+
+
 /* Returns the last time the data was read. */
 time_t
 time_of_last_procedure (struct dataset *ds)
@@ -137,7 +158,7 @@ proc_execute (struct dataset *ds)
   return proc_commit (ds) && ok;
 }
 
-static struct casereader_class proc_casereader_class;
+static const struct casereader_class proc_casereader_class;
 
 /* Opens dataset DS for reading cases with proc_read.
    proc_commit must be called when done. */
@@ -166,11 +187,19 @@ proc_open (struct dataset *ds)
     {
       struct dictionary *pd = ds->permanent_dict;
       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
-      bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
-      ds->compactor = (should_compact
-                       ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
-                       : NULL);
-      ds->sink = autopaging_writer_create (compacted_value_cnt);
+      if (compacted_value_cnt < dict_get_next_value_idx (pd))
+        {
+          struct caseproto *compacted_proto;
+          compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
+          ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
+          ds->sink = autopaging_writer_create (compacted_proto);
+          caseproto_unref (compacted_proto);
+        }
+      else
+        {
+          ds->compactor = NULL;
+          ds->sink = autopaging_writer_create (dict_get_proto (pd));
+        }
     }
   else
     {
@@ -188,8 +217,7 @@ proc_open (struct dataset *ds)
   /* FIXME: use taint in dataset in place of `ok'? */
   /* FIXME: for trivial cases we can just return a clone of
      ds->source? */
-  return casereader_create_sequential (NULL,
-                                       dict_get_next_value_idx (ds->dict),
+  return casereader_create_sequential (NULL, dict_get_proto (ds->dict),
                                        CASENUMBER_MAX,
                                        &proc_casereader_class, ds);
 }
@@ -203,15 +231,15 @@ proc_is_open (const struct dataset *ds)
 }
 
 /* "read" function for procedure casereader. */
-static bool
-proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
-                      struct ccase *c)
+static struct ccase *
+proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
 {
   struct dataset *ds = ds_;
   enum trns_result retval = TRNS_DROP_CASE;
+  struct ccase *c;
 
   assert (ds->proc_state == PROC_OPEN);
-  for (;;)
+  for (; ; case_unref (c))
     {
       casenumber case_nr;
 
@@ -219,58 +247,47 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
       if (retval == TRNS_ERROR)
         ds->ok = false;
       if (!ds->ok)
-        return false;
+        return NULL;
 
       /* Read a case from source. */
-      if (!casereader_read (ds->source, c))
-        return false;
-      case_resize (c, dict_get_next_value_idx (ds->dict));
+      c = casereader_read (ds->source);
+      if (c == NULL)
+        return NULL;
+      c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
       caseinit_init_vars (ds->caseinit, c);
 
       /* Execute permanent transformations.  */
       case_nr = ds->cases_written + 1;
       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
-                                   c, case_nr);
+                                   &c, case_nr);
       caseinit_update_left_vars (ds->caseinit, c);
       if (retval != TRNS_CONTINUE)
-        {
-          case_destroy (c);
-          continue;
-        }
+        continue;
 
       /* Write case to collection of lagged cases. */
       if (ds->n_lag > 0)
         {
           while (deque_count (&ds->lag) >= ds->n_lag)
-            case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
-          case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
+            case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
+          ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
         }
 
       /* Write case to replacement active file. */
       ds->cases_written++;
       if (ds->sink != NULL)
-        {
-          struct ccase tmp;
-          if (ds->compactor != NULL)
-            case_map_execute (ds->compactor, c, &tmp);
-          else
-            case_clone (&tmp, c);
-          casewriter_write (ds->sink, &tmp);
-        }
+        casewriter_write (ds->sink,
+                          case_map_execute (ds->compactor, case_ref (c)));
 
       /* Execute temporary transformations. */
       if (ds->temporary_trns_chain != NULL)
         {
           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
-                                       c, ds->cases_written);
+                                       &c, ds->cases_written);
           if (retval != TRNS_CONTINUE)
-            {
-              case_destroy (c);
-              continue;
-            }
+            continue;
         }
 
-      return true;
+      return c;
     }
 }
 
@@ -279,13 +296,13 @@ static void
 proc_casereader_destroy (struct casereader *reader, void *ds_)
 {
   struct dataset *ds = ds_;
-  struct ccase c;
+  struct ccase *c;
 
   /* Make sure transformations happen for every input case, in
      case they have side effects, and ensure that the replacement
      active file gets all the cases it should. */
-  while (casereader_read (reader, &c))
-    case_destroy (&c);
+  while ((c = casereader_read (reader)) != NULL)
+    case_unref (c);
 
   ds->proc_state = PROC_CLOSED;
   ds->ok = casereader_destroy (ds->source) && ds->ok;
@@ -304,9 +321,11 @@ proc_commit (struct dataset *ds)
   assert (ds->proc_state == PROC_CLOSED);
   ds->proc_state = PROC_COMMITTED;
 
+  dataset_set_unsaved (ds);
+
   /* Free memory for lagged cases. */
   while (!deque_is_empty (&ds->lag))
-    case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
+    case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
   free (ds->lag_cases);
 
   /* Dictionary from before TEMPORARY becomes permanent. */
@@ -344,7 +363,7 @@ proc_commit (struct dataset *ds)
 }
 
 /* Casereader class for procedure execution. */
-static struct casereader_class proc_casereader_class =
+static const struct casereader_class proc_casereader_class =
   {
     proc_casereader_read,
     proc_casereader_destroy,
@@ -361,14 +380,14 @@ update_last_proc_invocation (struct dataset *ds)
 \f
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
    current one, or NULL if there haven't been that many cases yet. */
-struct ccase *
+const struct ccase *
 lagged_case (const struct dataset *ds, int n_before)
 {
   assert (n_before >= 1);
   assert (n_before <= ds->n_lag);
 
   if (n_before <= deque_count (&ds->lag))
-    return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
+    return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
   else
     return NULL;
 }
@@ -521,12 +540,25 @@ proc_cancel_all_transformations (struct dataset *ds)
   return ok;
 }
 \f
+
+static void
+dict_callback (struct dictionary *d UNUSED, void *ds_)
+{
+  struct dataset *ds = ds_;
+  dataset_set_unsaved (ds);
+}
+
 /* Initializes procedure handling. */
 struct dataset *
 create_dataset (void)
 {
   struct dataset *ds = xzalloc (sizeof(*ds));
   ds->dict = dict_create ();
+
+  dict_set_change_callback (ds->dict, dict_callback, ds);
+
+  dict_set_encoding (ds->dict, get_default_encoding ());
+
   ds->caseinit = caseinit_create ();
   proc_cancel_all_transformations (ds);
   return ds;
@@ -596,6 +628,7 @@ proc_set_active_file (struct dataset *ds,
 
   dict_destroy (ds->dict);
   ds->dict = dict;
+  dict_set_change_callback (ds->dict, dict_callback, ds);
 
   proc_set_active_file_data (ds, source);
 }
@@ -664,10 +697,10 @@ static trns_free_func case_limit_trns_free;
 static void
 add_case_limit_trns (struct dataset *ds)
 {
-  size_t case_limit = dict_get_case_limit (ds->dict);
+  casenumber case_limit = dict_get_case_limit (ds->dict);
   if (case_limit != 0)
     {
-      size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
+      casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
       *cases_remaining = case_limit;
       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
                           cases_remaining);
@@ -679,7 +712,7 @@ add_case_limit_trns (struct dataset *ds)
    *CASES_REMAINING. */
 static int
 case_limit_trns_proc (void *cases_remaining_,
-                      struct ccase *c UNUSED, casenumber case_nr UNUSED)
+                      struct ccase **c UNUSED, casenumber case_nr UNUSED)
 {
   size_t *cases_remaining = cases_remaining_;
   if (*cases_remaining > 0)
@@ -718,11 +751,11 @@ add_filter_trns (struct dataset *ds)
 /* FILTER transformation. */
 static int
 filter_trns_proc (void *filter_var_,
-                  struct ccase *c UNUSED, casenumber case_nr UNUSED)
+                  struct ccase **c UNUSED, casenumber case_nr UNUSED)
 
 {
   struct variable *filter_var = filter_var_;
-  double f = case_num (c, filter_var);
+  double f = case_num (*c, filter_var);
   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }