added minmax
[pspp-builds.git] / src / data / procedure.c
index 2154f689c8f50d7a3370d8d99a935f2056590897..ed69420c3e464bd71221bd3bff5ee0ad3e727037 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - computes sample statistics.
 /* PSPP - computes sample statistics.
-   Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
+   Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
@@ -26,6 +26,7 @@
 #include <data/case-source.h>
 #include <data/case-sink.h>
 #include <data/case.h>
 #include <data/case-source.h>
 #include <data/case-sink.h>
 #include <data/case.h>
+#include <data/casedeque.h>
 #include <data/casefile.h>
 #include <data/fastfile.h>
 #include <data/dictionary.h>
 #include <data/casefile.h>
 #include <data/fastfile.h>
 #include <data/dictionary.h>
@@ -75,11 +76,9 @@ struct dataset {
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
 
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
 
-  /* Lag queue. */
+  /* Cases just before ("lagging") the current one. */
   int n_lag;                   /* Number of cases to lag. */
   int n_lag;                   /* Number of cases to lag. */
-  int lag_count;               /* Number of cases in lag_queue so far. */
-  int lag_head;                /* Index where next case will be added. */
-  struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+  struct casedeque lagged_cases; /* Lagged cases. */
 
   /* Procedure data. */
   bool is_open;               /* Procedure open? */
 
   /* Procedure data. */
   bool is_open;               /* Procedure open? */
@@ -100,7 +99,6 @@ static bool internal_procedure (struct dataset *ds, case_func *,
 static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (struct dataset *ds);
 static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (struct dataset *ds);
-static void lag_case (struct dataset *ds, const struct ccase *c);
 static void clear_case (const struct dataset *ds, struct ccase *c);
 static bool close_active_file (struct dataset *ds);
 \f
 static void clear_case (const struct dataset *ds, struct ccase *c);
 static bool close_active_file (struct dataset *ds);
 \f
@@ -108,7 +106,7 @@ static bool close_active_file (struct dataset *ds);
 
 /* Returns the last time the data was read. */
 time_t
 
 /* Returns the last time the data was read. */
 time_t
-time_of_last_procedure (struct dataset *ds) 
+time_of_last_procedure (struct dataset *ds)
 {
   if (ds->last_proc_invocation == 0)
     update_last_proc_invocation (ds);
 {
   if (ds->last_proc_invocation == 0)
     update_last_proc_invocation (ds);
@@ -127,10 +125,10 @@ time_of_last_procedure (struct dataset *ds)
       start the next case from step 1.
 
    2. Write case to replacement active file.
       start the next case from step 1.
 
    2. Write case to replacement active file.
-  
+
    3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
    3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
-     
+
    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
@@ -162,7 +160,7 @@ procedure (struct dataset *ds, case_func *cf, void *aux)
 struct multipass_aux_data
   {
     struct casefile *casefile;
 struct multipass_aux_data
   {
     struct casefile *casefile;
-   
+
     bool (*proc_func) (const struct casefile *, void *aux);
     void *aux;
   };
     bool (*proc_func) (const struct casefile *, void *aux);
     void *aux;
   };
@@ -233,8 +231,6 @@ internal_procedure (struct dataset *ds, case_func *proc,
 
   if ( proc_close (ds) && ok )
     {
 
   if ( proc_close (ds) && ok )
     {
-      if ( ds->replace_source )
-       ds->replace_source (ds->proc_source);
 
       return true;
     }
 
       return true;
     }
@@ -295,10 +291,15 @@ proc_read (struct dataset *ds, struct ccase **c)
                                    &ds->trns_case, &case_nr);
       if (retval != TRNS_CONTINUE)
         continue;
                                    &ds->trns_case, &case_nr);
       if (retval != TRNS_CONTINUE)
         continue;
-      /* Write case to LAG queue. */
-      if (ds->n_lag)
-        lag_case (ds, &ds->trns_case);
+
+      /* Write case to collection of lagged cases. */
+      if (ds->n_lag > 0) 
+        {
+          while (casedeque_count (&ds->lagged_cases) >= ds->n_lag)
+            case_destroy (casedeque_pop_back (&ds->lagged_cases));
+          case_clone (casedeque_push_front (&ds->lagged_cases),
+                      &ds->trns_case);
+        }
 
       /* Write case to replacement active file. */
       ds->cases_written++;
 
       /* Write case to replacement active file. */
       ds->cases_written++;
@@ -313,7 +314,7 @@ proc_read (struct dataset *ds, struct ccase **c)
           else
             ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
         }
           else
             ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
         }
+
       /* Execute temporary transformations. */
       if (ds->temporary_trns_chain != NULL)
         {
       /* Execute temporary transformations. */
       if (ds->temporary_trns_chain != NULL)
         {
@@ -347,7 +348,7 @@ proc_close (struct dataset *ds)
         break;
     }
   ds->ok = free_case_source (ds->proc_source) && ds->ok;
         break;
     }
   ds->ok = free_case_source (ds->proc_source) && ds->ok;
-  ds->proc_source = NULL;
+  proc_set_source (ds, NULL);
 
   case_destroy (&ds->sink_case);
   case_destroy (&ds->trns_case);
 
   case_destroy (&ds->sink_case);
   case_destroy (&ds->trns_case);
@@ -418,29 +419,8 @@ open_active_file (struct dataset *ds)
   if (ds->proc_sink->class->open != NULL)
     ds->proc_sink->class->open (ds->proc_sink);
 
   if (ds->proc_sink->class->open != NULL)
     ds->proc_sink->class->open (ds->proc_sink);
 
-  /* Allocate memory for lag queue. */
-  if (ds->n_lag > 0)
-    {
-      int i;
-      ds->lag_count = 0;
-      ds->lag_head = 0;
-      ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
-      for (i = 0; i < ds->n_lag; i++)
-        case_nullify (&ds->lag_queue[i]);
-    }
-}
-
-/* Add C to the lag queue. */
-static void
-lag_case (struct dataset *ds, const struct ccase *c)
-{
-  if (ds->lag_count < ds->n_lag)
-    ds->lag_count++;
-  case_destroy (&ds->lag_queue[ds->lag_head]);
-  case_clone (&ds->lag_queue[ds->lag_head], c);
-  if (++ds->lag_head >= ds->n_lag)
-    ds->lag_head = 0;
+  /* Allocate memory for lagged cases. */
+  casedeque_init (&ds->lagged_cases, ds->n_lag);
 }
 
 /* Clears the variables in C that need to be cleared between
 }
 
 /* Clears the variables in C that need to be cleared between
@@ -450,7 +430,7 @@ clear_case (const struct dataset *ds, struct ccase *c)
 {
   size_t var_cnt = dict_get_var_cnt (ds->dict);
   size_t i;
 {
   size_t var_cnt = dict_get_var_cnt (ds->dict);
   size_t i;
+
   for (i = 0; i < var_cnt; i++)
     {
       struct variable *v = dict_get_var (ds->dict, i);
   for (i = 0; i < var_cnt; i++)
     {
       struct variable *v = dict_get_var (ds->dict, i);
@@ -468,17 +448,11 @@ clear_case (const struct dataset *ds, struct ccase *c)
 static bool
 close_active_file (struct dataset *ds)
 {
 static bool
 close_active_file (struct dataset *ds)
 {
-  /* Free memory for lag queue, and turn off lagging. */
-  if (ds->n_lag > 0)
-    {
-      int i;
-     
-      for (i = 0; i < ds->n_lag; i++)
-       case_destroy (&ds->lag_queue[i]);
-      free (ds->lag_queue);
-      ds->n_lag = 0;
-    }
+  /* Free memory for lagged cases. */
+  while (!casedeque_is_empty (&ds->lagged_cases))
+    case_destroy (casedeque_pop_back (&ds->lagged_cases));
+  casedeque_destroy (&ds->lagged_cases);
+
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
 
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
 
@@ -489,10 +463,10 @@ close_active_file (struct dataset *ds)
       dict_compact_values (ds->dict);
       ds->compactor = NULL;
     }
       dict_compact_values (ds->dict);
       ds->compactor = NULL;
     }
-   
+
   /* Old data sink becomes new data source. */
   if (ds->proc_sink->class->make_source != NULL)
   /* Old data sink becomes new data source. */
   if (ds->proc_sink->class->make_source != NULL)
-    ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
+    proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
   free_case_sink (ds->proc_sink);
   ds->proc_sink = NULL;
 
   free_case_sink (ds->proc_sink);
   ds->proc_sink = NULL;
 
@@ -506,16 +480,11 @@ close_active_file (struct dataset *ds)
 struct ccase *
 lagged_case (const struct dataset *ds, int n_before)
 {
 struct ccase *
 lagged_case (const struct dataset *ds, int n_before)
 {
-  assert (n_before >= 1 );
+  assert (n_before >= 1);
   assert (n_before <= ds->n_lag);
 
   assert (n_before <= ds->n_lag);
 
-  if (n_before <= ds->lag_count)
-    {
-      int index = ds->lag_head - n_before;
-      if (index < 0)
-        index += ds->n_lag;
-      return &ds->lag_queue[index];
-    }
+  if (n_before <= casedeque_count (&ds->lagged_cases))
+    return casedeque_front (&ds->lagged_cases, n_before - 1);
   else
     return NULL;
 }
   else
     return NULL;
 }
@@ -554,7 +523,7 @@ static bool split_procedure_end_func (void *, const struct dataset *);
    If SPLIT FILE is not in effect, then there is one break group
    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
    will be called once.
    If SPLIT FILE is not in effect, then there is one break group
    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
    will be called once.
-  
+
    Returns true if successful, false if an I/O error occurred. */
 bool
 procedure_with_splits (struct dataset *ds,
    Returns true if successful, false if an I/O error occurred. */
 bool
 procedure_with_splits (struct dataset *ds,
@@ -707,7 +676,7 @@ static bool
 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
 {
   bool ok;
 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
 {
   bool ok;
+
   assert (aux->casefile != NULL);
   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
   casefile_destroy (aux->casefile);
   assert (aux->casefile != NULL);
   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
   casefile_destroy (aux->casefile);
@@ -725,12 +694,9 @@ discard_variables (struct dataset *ds)
   fh_set_default_handle (NULL);
 
   ds->n_lag = 0;
   fh_set_default_handle (NULL);
 
   ds->n_lag = 0;
-  free_case_source (ds->proc_source);
-  ds->proc_source = NULL;
-  if ( ds->replace_source )
-    ds->replace_source (ds->proc_source);
 
 
+  free_case_source (ds->proc_source);
+  proc_set_source (ds, NULL);
 
   proc_cancel_all_transformations (ds);
 }
 
   proc_cancel_all_transformations (ds);
 }
@@ -742,7 +708,7 @@ struct trns_chain *
 proc_capture_transformations (struct dataset *ds)
 {
   struct trns_chain *chain;
 proc_capture_transformations (struct dataset *ds)
 {
   struct trns_chain *chain;
+
   assert (ds->temporary_trns_chain == NULL);
   chain = ds->permanent_trns_chain;
   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
   assert (ds->temporary_trns_chain == NULL);
   chain = ds->permanent_trns_chain;
   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
@@ -801,6 +767,7 @@ proc_start_temporary_transformations (struct dataset *ds)
       add_case_limit_trns (ds);
 
       ds->permanent_dict = dict_clone (ds->dict);
       add_case_limit_trns (ds);
 
       ds->permanent_dict = dict_clone (ds->dict);
+
       trns_chain_finalize (ds->permanent_trns_chain);
       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
     }
       trns_chain_finalize (ds->permanent_trns_chain);
       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
     }
@@ -836,8 +803,7 @@ proc_cancel_temporary_transformations (struct dataset *ds)
 {
   if (proc_in_temporary_transformations (ds))
     {
 {
   if (proc_in_temporary_transformations (ds))
     {
-      dict_destroy (ds->dict);
-      ds->dict = ds->permanent_dict;
+      dataset_set_dict (ds, ds->permanent_dict);
       ds->permanent_dict = NULL;
 
       trns_chain_destroy (ds->temporary_trns_chain);
       ds->permanent_dict = NULL;
 
       trns_chain_destroy (ds->temporary_trns_chain);
@@ -902,8 +868,10 @@ proc_set_sink (struct dataset *ds, struct case_sink *sink)
 void
 proc_set_source (struct dataset *ds, struct case_source *source)
 {
 void
 proc_set_source (struct dataset *ds, struct case_source *source)
 {
-  assert (ds->proc_source == NULL);
   ds->proc_source = source;
   ds->proc_source = source;
+
+  if ( ds->replace_source )
+    ds->replace_source (ds->proc_source);
 }
 
 /* Returns true if a source for the next procedure has been
 }
 
 /* Returns true if a source for the next procedure has been
@@ -931,7 +899,7 @@ proc_capture_output (struct dataset *ds)
   assert (!proc_in_temporary_transformations (ds));
 
   casefile = storage_source_decapsulate (ds->proc_source);
   assert (!proc_in_temporary_transformations (ds));
 
   casefile = storage_source_decapsulate (ds->proc_source);
-  ds->proc_source = NULL;
+  proc_set_source (ds, NULL);
 
   return casefile;
 }
 
   return casefile;
 }
@@ -1015,29 +983,28 @@ dataset_dict (const struct dataset *ds)
 }
 
 
 }
 
 
+/* Set or replace dataset DS's dictionary with DICT.
+   The old dictionary is destroyed */
 void
 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
 {
 void
 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
 {
+  struct dictionary *old_dict = ds->dict;
+
   dict_copy_callbacks (dict, ds->dict);
   ds->dict = dict;
 
   if ( ds->replace_dict )
     ds->replace_dict (dict);
   dict_copy_callbacks (dict, ds->dict);
   ds->dict = dict;
 
   if ( ds->replace_dict )
     ds->replace_dict (dict);
-}
 
 
-int
-dataset_n_lag (const struct dataset *ds)
-{
-  return ds->n_lag;
+  dict_destroy (old_dict);
 }
 
 }
 
-void
-dataset_set_n_lag (struct dataset *ds, int n_lag)
+void 
+dataset_need_lag (struct dataset *ds, int n_before)
 {
 {
-  ds->n_lag = n_lag;
+  ds->n_lag = MAX (ds->n_lag, n_before);
 }
 
 }
 
-
 struct casefile_factory *
 dataset_get_casefile_factory (const struct dataset *ds)
 {
 struct casefile_factory *
 dataset_get_casefile_factory (const struct dataset *ds)
 {