Add a deque, implemented as a circular queue, to libpspp.
[pspp-builds.git] / src / data / procedure.c
index b6f988aabbaf46b2707940aca2e646504ced5ffa..ed69420c3e464bd71221bd3bff5ee0ad3e727037 100644 (file)
@@ -26,6 +26,7 @@
 #include <data/case-source.h>
 #include <data/case-sink.h>
 #include <data/case.h>
+#include <data/casedeque.h>
 #include <data/casefile.h>
 #include <data/fastfile.h>
 #include <data/dictionary.h>
@@ -75,11 +76,9 @@ struct dataset {
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
 
-  /* Lag queue. */
+  /* Cases just before ("lagging") the current one. */
   int n_lag;                   /* Number of cases to lag. */
-  int lag_count;               /* Number of cases in lag_queue so far. */
-  int lag_head;                /* Index where next case will be added. */
-  struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+  struct casedeque lagged_cases; /* Lagged cases. */
 
   /* Procedure data. */
   bool is_open;               /* Procedure open? */
@@ -100,7 +99,6 @@ static bool internal_procedure (struct dataset *ds, case_func *,
 static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (struct dataset *ds);
-static void lag_case (struct dataset *ds, const struct ccase *c);
 static void clear_case (const struct dataset *ds, struct ccase *c);
 static bool close_active_file (struct dataset *ds);
 \f
@@ -294,9 +292,14 @@ proc_read (struct dataset *ds, struct ccase **c)
       if (retval != TRNS_CONTINUE)
         continue;
 
-      /* Write case to LAG queue. */
-      if (ds->n_lag)
-        lag_case (ds, &ds->trns_case);
+      /* Write case to collection of lagged cases. */
+      if (ds->n_lag > 0) 
+        {
+          while (casedeque_count (&ds->lagged_cases) >= ds->n_lag)
+            case_destroy (casedeque_pop_back (&ds->lagged_cases));
+          case_clone (casedeque_push_front (&ds->lagged_cases),
+                      &ds->trns_case);
+        }
 
       /* Write case to replacement active file. */
       ds->cases_written++;
@@ -416,29 +419,8 @@ open_active_file (struct dataset *ds)
   if (ds->proc_sink->class->open != NULL)
     ds->proc_sink->class->open (ds->proc_sink);
 
-  /* Allocate memory for lag queue. */
-  if (ds->n_lag > 0)
-    {
-      int i;
-
-      ds->lag_count = 0;
-      ds->lag_head = 0;
-      ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
-      for (i = 0; i < ds->n_lag; i++)
-        case_nullify (&ds->lag_queue[i]);
-    }
-}
-
-/* Add C to the lag queue. */
-static void
-lag_case (struct dataset *ds, const struct ccase *c)
-{
-  if (ds->lag_count < ds->n_lag)
-    ds->lag_count++;
-  case_destroy (&ds->lag_queue[ds->lag_head]);
-  case_clone (&ds->lag_queue[ds->lag_head], c);
-  if (++ds->lag_head >= ds->n_lag)
-    ds->lag_head = 0;
+  /* Allocate memory for lagged cases. */
+  casedeque_init (&ds->lagged_cases, ds->n_lag);
 }
 
 /* Clears the variables in C that need to be cleared between
@@ -466,16 +448,10 @@ clear_case (const struct dataset *ds, struct ccase *c)
 static bool
 close_active_file (struct dataset *ds)
 {
-  /* Free memory for lag queue, and turn off lagging. */
-  if (ds->n_lag > 0)
-    {
-      int i;
-
-      for (i = 0; i < ds->n_lag; i++)
-       case_destroy (&ds->lag_queue[i]);
-      free (ds->lag_queue);
-      ds->n_lag = 0;
-    }
+  /* Free memory for lagged cases. */
+  while (!casedeque_is_empty (&ds->lagged_cases))
+    case_destroy (casedeque_pop_back (&ds->lagged_cases));
+  casedeque_destroy (&ds->lagged_cases);
 
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
@@ -504,16 +480,11 @@ close_active_file (struct dataset *ds)
 struct ccase *
 lagged_case (const struct dataset *ds, int n_before)
 {
-  assert (n_before >= 1 );
+  assert (n_before >= 1);
   assert (n_before <= ds->n_lag);
 
-  if (n_before <= ds->lag_count)
-    {
-      int index = ds->lag_head - n_before;
-      if (index < 0)
-        index += ds->n_lag;
-      return &ds->lag_queue[index];
-    }
+  if (n_before <= casedeque_count (&ds->lagged_cases))
+    return casedeque_front (&ds->lagged_cases, n_before - 1);
   else
     return NULL;
 }
@@ -1028,19 +999,12 @@ dataset_set_dict (struct dataset *ds, struct dictionary *dict)
   dict_destroy (old_dict);
 }
 
-int
-dataset_n_lag (const struct dataset *ds)
+void 
+dataset_need_lag (struct dataset *ds, int n_before)
 {
-  return ds->n_lag;
+  ds->n_lag = MAX (ds->n_lag, n_before);
 }
 
-void
-dataset_set_n_lag (struct dataset *ds, int n_lag)
-{
-  ds->n_lag = n_lag;
-}
-
-
 struct casefile_factory *
 dataset_get_casefile_factory (const struct dataset *ds)
 {