#include "ofpbuf.h"
#include "ofproto.h"
#include "packets.h"
+#include "poll-loop.h"
#include "socket-util.h"
#include "timeval.h"
#include "util.h"
uint32_t netflow_cnt; /* Flow sequence number for NetFlow. */
struct ofpbuf packet; /* NetFlow packet being accumulated. */
long long int active_timeout; /* Timeout for flows that are still active. */
+ long long int next_timeout; /* Next scheduled active timeout. */
long long int reconfig_time; /* When we reconfigured the timeouts. */
};
}
nf_rec->tcp_flags = nf_flow->tcp_flags;
nf_rec->ip_proto = expired->flow.nw_proto;
- nf_rec->ip_tos = expired->flow.nw_tos;
+ nf_rec->ip_tos = expired->flow.nw_tos & IP_DSCP_MASK;
/* NetFlow messages are limited to 30 records. */
if (ntohs(nf_hdr->count) >= 30) {
nf_flow->tcp_flags = 0;
}
-void
+/* Returns true if it's time to send out a round of NetFlow active timeouts,
+ * false otherwise. */
+bool
netflow_run(struct netflow *nf)
{
if (nf->packet.size) {
collectors_send(nf->collectors, nf->packet.data, nf->packet.size);
nf->packet.size = 0;
}
+
+ if (nf->active_timeout && time_msec() >= nf->next_timeout) {
+ nf->next_timeout = time_msec() + 1000;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void
+netflow_wait(struct netflow *nf)
+{
+ if (nf->active_timeout) {
+ poll_timer_wait_until(nf->next_timeout);
+ }
+ if (nf->packet.size) {
+ poll_immediate_wake();
+ }
}
int
nf->active_timeout *= 1000;
if (old_timeout != nf->active_timeout) {
nf->reconfig_time = time_msec();
+ nf->next_timeout = time_msec();
}
return error;
struct netflow *
netflow_create(void)
{
- struct netflow *nf = xmalloc(sizeof *nf);
+ struct netflow *nf = xzalloc(sizeof *nf);
nf->engine_type = 0;
nf->engine_id = 0;
nf->boot_time = time_msec();