#include <data/transformations.h>
#include <data/variable.h>
#include <libpspp/alloc.h>
+#include <libpspp/deque.h>
#include <libpspp/misc.h>
#include <libpspp/str.h>
/* Time at which proc was last invoked. */
time_t last_proc_invocation;
- /* Lag queue. */
+ /* Cases just before ("lagging") the current one. */
int n_lag; /* Number of cases to lag. */
- int lag_count; /* Number of cases in lag_queue so far. */
- int lag_head; /* Index where next case will be added. */
- struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+ struct deque lag; /* Deque of lagged cases. */
+ struct ccase *lag_cases; /* Lagged cases managed by deque. */
/* Procedure data. */
bool is_open; /* Procedure open? */
static void update_last_proc_invocation (struct dataset *ds);
static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (struct dataset *ds);
-static void lag_case (struct dataset *ds, const struct ccase *c);
static void clear_case (const struct dataset *ds, struct ccase *c);
static bool close_active_file (struct dataset *ds);
\f
if (retval != TRNS_CONTINUE)
continue;
- /* Write case to LAG queue. */
- if (ds->n_lag)
- lag_case (ds, &ds->trns_case);
+ /* Write case to collection of lagged cases. */
+ if (ds->n_lag > 0)
+ {
+ while (deque_count (&ds->lag) >= ds->n_lag)
+ case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
+ case_clone (&ds->lag_cases[deque_push_front (&ds->lag)],
+ &ds->trns_case);
+ }
/* Write case to replacement active file. */
ds->cases_written++;
if (ds->proc_sink->class->open != NULL)
ds->proc_sink->class->open (ds->proc_sink);
- /* Allocate memory for lag queue. */
- if (ds->n_lag > 0)
- {
- int i;
-
- ds->lag_count = 0;
- ds->lag_head = 0;
- ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
- for (i = 0; i < ds->n_lag; i++)
- case_nullify (&ds->lag_queue[i]);
- }
-}
-
-/* Add C to the lag queue. */
-static void
-lag_case (struct dataset *ds, const struct ccase *c)
-{
- if (ds->lag_count < ds->n_lag)
- ds->lag_count++;
- case_destroy (&ds->lag_queue[ds->lag_head]);
- case_clone (&ds->lag_queue[ds->lag_head], c);
- if (++ds->lag_head >= ds->n_lag)
- ds->lag_head = 0;
+ /* Allocate memory for lagged cases. */
+ ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
}
/* Clears the variables in C that need to be cleared between
static bool
close_active_file (struct dataset *ds)
{
- /* Free memory for lag queue, and turn off lagging. */
- if (ds->n_lag > 0)
- {
- int i;
-
- for (i = 0; i < ds->n_lag; i++)
- case_destroy (&ds->lag_queue[i]);
- free (ds->lag_queue);
- ds->n_lag = 0;
- }
+ /* Free memory for lagged cases. */
+ while (!deque_is_empty (&ds->lag))
+ case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
+ free (ds->lag_cases);
/* Dictionary from before TEMPORARY becomes permanent. */
proc_cancel_temporary_transformations (ds);
struct ccase *
lagged_case (const struct dataset *ds, int n_before)
{
- assert (n_before >= 1 );
+ assert (n_before >= 1);
assert (n_before <= ds->n_lag);
- if (n_before <= ds->lag_count)
- {
- int index = ds->lag_head - n_before;
- if (index < 0)
- index += ds->n_lag;
- return &ds->lag_queue[index];
- }
+ if (n_before <= deque_count (&ds->lag))
+ return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
else
return NULL;
}
dict_destroy (old_dict);
}
-int
-dataset_n_lag (const struct dataset *ds)
+void
+dataset_need_lag (struct dataset *ds, int n_before)
{
- return ds->n_lag;
+ ds->n_lag = MAX (ds->n_lag, n_before);
}
-void
-dataset_set_n_lag (struct dataset *ds, int n_lag)
-{
- ds->n_lag = n_lag;
-}
-
-
struct casefile_factory *
dataset_get_casefile_factory (const struct dataset *ds)
{