added minmax
[pspp-builds.git] / src / data / procedure.c
index 2154f689c8f50d7a3370d8d99a935f2056590897..ed69420c3e464bd71221bd3bff5ee0ad3e727037 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - computes sample statistics.
-   Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
+   Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
@@ -26,6 +26,7 @@
 #include <data/case-source.h>
 #include <data/case-sink.h>
 #include <data/case.h>
+#include <data/casedeque.h>
 #include <data/casefile.h>
 #include <data/fastfile.h>
 #include <data/dictionary.h>
@@ -75,11 +76,9 @@ struct dataset {
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
 
-  /* Lag queue. */
+  /* Cases just before ("lagging") the current one. */
   int n_lag;                   /* Number of cases to lag. */
-  int lag_count;               /* Number of cases in lag_queue so far. */
-  int lag_head;                /* Index where next case will be added. */
-  struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+  struct casedeque lagged_cases; /* Lagged cases. */
 
   /* Procedure data. */
   bool is_open;               /* Procedure open? */
@@ -100,7 +99,6 @@ static bool internal_procedure (struct dataset *ds, case_func *,
 static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (struct dataset *ds);
-static void lag_case (struct dataset *ds, const struct ccase *c);
 static void clear_case (const struct dataset *ds, struct ccase *c);
 static bool close_active_file (struct dataset *ds);
 \f
@@ -108,7 +106,7 @@ static bool close_active_file (struct dataset *ds);
 
 /* Returns the last time the data was read. */
 time_t
-time_of_last_procedure (struct dataset *ds) 
+time_of_last_procedure (struct dataset *ds)
 {
   if (ds->last_proc_invocation == 0)
     update_last_proc_invocation (ds);
@@ -127,10 +125,10 @@ time_of_last_procedure (struct dataset *ds)
       start the next case from step 1.
 
    2. Write case to replacement active file.
-  
+
    3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
-     
+
    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
@@ -162,7 +160,7 @@ procedure (struct dataset *ds, case_func *cf, void *aux)
 struct multipass_aux_data
   {
     struct casefile *casefile;
-   
+
     bool (*proc_func) (const struct casefile *, void *aux);
     void *aux;
   };
@@ -233,8 +231,6 @@ internal_procedure (struct dataset *ds, case_func *proc,
 
   if ( proc_close (ds) && ok )
     {
-      if ( ds->replace_source )
-       ds->replace_source (ds->proc_source);
 
       return true;
     }
@@ -295,10 +291,15 @@ proc_read (struct dataset *ds, struct ccase **c)
                                    &ds->trns_case, &case_nr);
       if (retval != TRNS_CONTINUE)
         continue;
-      /* Write case to LAG queue. */
-      if (ds->n_lag)
-        lag_case (ds, &ds->trns_case);
+
+      /* Write case to collection of lagged cases. */
+      if (ds->n_lag > 0) 
+        {
+          while (casedeque_count (&ds->lagged_cases) >= ds->n_lag)
+            case_destroy (casedeque_pop_back (&ds->lagged_cases));
+          case_clone (casedeque_push_front (&ds->lagged_cases),
+                      &ds->trns_case);
+        }
 
       /* Write case to replacement active file. */
       ds->cases_written++;
@@ -313,7 +314,7 @@ proc_read (struct dataset *ds, struct ccase **c)
           else
             ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
         }
+
       /* Execute temporary transformations. */
       if (ds->temporary_trns_chain != NULL)
         {
@@ -347,7 +348,7 @@ proc_close (struct dataset *ds)
         break;
     }
   ds->ok = free_case_source (ds->proc_source) && ds->ok;
-  ds->proc_source = NULL;
+  proc_set_source (ds, NULL);
 
   case_destroy (&ds->sink_case);
   case_destroy (&ds->trns_case);
@@ -418,29 +419,8 @@ open_active_file (struct dataset *ds)
   if (ds->proc_sink->class->open != NULL)
     ds->proc_sink->class->open (ds->proc_sink);
 
-  /* Allocate memory for lag queue. */
-  if (ds->n_lag > 0)
-    {
-      int i;
-      ds->lag_count = 0;
-      ds->lag_head = 0;
-      ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
-      for (i = 0; i < ds->n_lag; i++)
-        case_nullify (&ds->lag_queue[i]);
-    }
-}
-
-/* Add C to the lag queue. */
-static void
-lag_case (struct dataset *ds, const struct ccase *c)
-{
-  if (ds->lag_count < ds->n_lag)
-    ds->lag_count++;
-  case_destroy (&ds->lag_queue[ds->lag_head]);
-  case_clone (&ds->lag_queue[ds->lag_head], c);
-  if (++ds->lag_head >= ds->n_lag)
-    ds->lag_head = 0;
+  /* Allocate memory for lagged cases. */
+  casedeque_init (&ds->lagged_cases, ds->n_lag);
 }
 
 /* Clears the variables in C that need to be cleared between
@@ -450,7 +430,7 @@ clear_case (const struct dataset *ds, struct ccase *c)
 {
   size_t var_cnt = dict_get_var_cnt (ds->dict);
   size_t i;
+
   for (i = 0; i < var_cnt; i++)
     {
       struct variable *v = dict_get_var (ds->dict, i);
@@ -468,17 +448,11 @@ clear_case (const struct dataset *ds, struct ccase *c)
 static bool
 close_active_file (struct dataset *ds)
 {
-  /* Free memory for lag queue, and turn off lagging. */
-  if (ds->n_lag > 0)
-    {
-      int i;
-     
-      for (i = 0; i < ds->n_lag; i++)
-       case_destroy (&ds->lag_queue[i]);
-      free (ds->lag_queue);
-      ds->n_lag = 0;
-    }
+  /* Free memory for lagged cases. */
+  while (!casedeque_is_empty (&ds->lagged_cases))
+    case_destroy (casedeque_pop_back (&ds->lagged_cases));
+  casedeque_destroy (&ds->lagged_cases);
+
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
 
@@ -489,10 +463,10 @@ close_active_file (struct dataset *ds)
       dict_compact_values (ds->dict);
       ds->compactor = NULL;
     }
-   
+
   /* Old data sink becomes new data source. */
   if (ds->proc_sink->class->make_source != NULL)
-    ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
+    proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
   free_case_sink (ds->proc_sink);
   ds->proc_sink = NULL;
 
@@ -506,16 +480,11 @@ close_active_file (struct dataset *ds)
 struct ccase *
 lagged_case (const struct dataset *ds, int n_before)
 {
-  assert (n_before >= 1 );
+  assert (n_before >= 1);
   assert (n_before <= ds->n_lag);
 
-  if (n_before <= ds->lag_count)
-    {
-      int index = ds->lag_head - n_before;
-      if (index < 0)
-        index += ds->n_lag;
-      return &ds->lag_queue[index];
-    }
+  if (n_before <= casedeque_count (&ds->lagged_cases))
+    return casedeque_front (&ds->lagged_cases, n_before - 1);
   else
     return NULL;
 }
@@ -554,7 +523,7 @@ static bool split_procedure_end_func (void *, const struct dataset *);
    If SPLIT FILE is not in effect, then there is one break group
    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
    will be called once.
-  
+
    Returns true if successful, false if an I/O error occurred. */
 bool
 procedure_with_splits (struct dataset *ds,
@@ -707,7 +676,7 @@ static bool
 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
 {
   bool ok;
+
   assert (aux->casefile != NULL);
   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
   casefile_destroy (aux->casefile);
@@ -725,12 +694,9 @@ discard_variables (struct dataset *ds)
   fh_set_default_handle (NULL);
 
   ds->n_lag = 0;
-  free_case_source (ds->proc_source);
-  ds->proc_source = NULL;
-  if ( ds->replace_source )
-    ds->replace_source (ds->proc_source);
 
+  free_case_source (ds->proc_source);
+  proc_set_source (ds, NULL);
 
   proc_cancel_all_transformations (ds);
 }
@@ -742,7 +708,7 @@ struct trns_chain *
 proc_capture_transformations (struct dataset *ds)
 {
   struct trns_chain *chain;
+
   assert (ds->temporary_trns_chain == NULL);
   chain = ds->permanent_trns_chain;
   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
@@ -801,6 +767,7 @@ proc_start_temporary_transformations (struct dataset *ds)
       add_case_limit_trns (ds);
 
       ds->permanent_dict = dict_clone (ds->dict);
+
       trns_chain_finalize (ds->permanent_trns_chain);
       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
     }
@@ -836,8 +803,7 @@ proc_cancel_temporary_transformations (struct dataset *ds)
 {
   if (proc_in_temporary_transformations (ds))
     {
-      dict_destroy (ds->dict);
-      ds->dict = ds->permanent_dict;
+      dataset_set_dict (ds, ds->permanent_dict);
       ds->permanent_dict = NULL;
 
       trns_chain_destroy (ds->temporary_trns_chain);
@@ -902,8 +868,10 @@ proc_set_sink (struct dataset *ds, struct case_sink *sink)
 void
 proc_set_source (struct dataset *ds, struct case_source *source)
 {
-  assert (ds->proc_source == NULL);
   ds->proc_source = source;
+
+  if ( ds->replace_source )
+    ds->replace_source (ds->proc_source);
 }
 
 /* Returns true if a source for the next procedure has been
@@ -931,7 +899,7 @@ proc_capture_output (struct dataset *ds)
   assert (!proc_in_temporary_transformations (ds));
 
   casefile = storage_source_decapsulate (ds->proc_source);
-  ds->proc_source = NULL;
+  proc_set_source (ds, NULL);
 
   return casefile;
 }
@@ -1015,29 +983,28 @@ dataset_dict (const struct dataset *ds)
 }
 
 
+/* Set or replace dataset DS's dictionary with DICT.
+   The old dictionary is destroyed */
 void
 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
 {
+  struct dictionary *old_dict = ds->dict;
+
   dict_copy_callbacks (dict, ds->dict);
   ds->dict = dict;
 
   if ( ds->replace_dict )
     ds->replace_dict (dict);
-}
 
-int
-dataset_n_lag (const struct dataset *ds)
-{
-  return ds->n_lag;
+  dict_destroy (old_dict);
 }
 
-void
-dataset_set_n_lag (struct dataset *ds, int n_lag)
+void 
+dataset_need_lag (struct dataset *ds, int n_before)
 {
-  ds->n_lag = n_lag;
+  ds->n_lag = MAX (ds->n_lag, n_before);
 }
 
-
 struct casefile_factory *
 dataset_get_casefile_factory (const struct dataset *ds)
 {