#include "ofpbuf.h"
#include "ofproto.h"
#include "packets.h"
+#include "poll-loop.h"
#include "socket-util.h"
#include "timeval.h"
#include "util.h"
uint32_t netflow_cnt; /* Flow sequence number for NetFlow. */
struct ofpbuf packet; /* NetFlow packet being accumulated. */
long long int active_timeout; /* Timeout for flows that are still active. */
+ long long int next_timeout; /* Next scheduled active timeout. */
long long int reconfig_time; /* When we reconfigured the timeouts. */
};
nf_flow->tcp_flags = 0;
}
-void
+/* Returns true if it's time to send out a round of NetFlow active timeouts,
+ * false otherwise. */
+bool
netflow_run(struct netflow *nf)
{
if (nf->packet.size) {
collectors_send(nf->collectors, nf->packet.data, nf->packet.size);
nf->packet.size = 0;
}
+
+ if (nf->active_timeout && time_msec() >= nf->next_timeout) {
+ nf->next_timeout = time_msec() + 1000;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void
+netflow_wait(struct netflow *nf)
+{
+ if (nf->active_timeout) {
+ poll_timer_wait_until(nf->next_timeout);
+ }
+ if (nf->packet.size) {
+ poll_immediate_wake();
+ }
}
int
nf->active_timeout *= 1000;
if (old_timeout != nf->active_timeout) {
nf->reconfig_time = time_msec();
+ nf->next_timeout = time_msec();
}
return error;
struct netflow *
netflow_create(void)
{
- struct netflow *nf = xmalloc(sizeof *nf);
+ struct netflow *nf = xzalloc(sizeof *nf);
nf->engine_type = 0;
nf->engine_id = 0;
nf->boot_time = time_msec();
/* Flow expiration. */
static int expire(struct ofproto_dpif *);
+/* NetFlow. */
+static void send_netflow_active_timeouts(struct ofproto_dpif *);
+
/* Utilities. */
static int send_packet(struct ofproto_dpif *, uint32_t odp_port,
const struct ofpbuf *packet);
}
if (ofproto->netflow) {
- netflow_run(ofproto->netflow);
+ if (netflow_run(ofproto->netflow)) {
+ send_netflow_active_timeouts(ofproto);
+ }
}
if (ofproto->sflow) {
dpif_sflow_run(ofproto->sflow);
HMAP_FOR_EACH (bundle, hmap_node, &ofproto->bundles) {
bundle_wait(bundle);
}
+ if (ofproto->netflow) {
+ netflow_wait(ofproto->netflow);
+ }
mac_learning_wait(ofproto->ml);
stp_wait(ofproto);
if (ofproto->need_revalidate) {
htonll(s.n_hit + ofproto->n_matches));
}
-static int
-set_netflow(struct ofproto *ofproto_,
- const struct netflow_options *netflow_options)
-{
- struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
-
- if (netflow_options) {
- if (!ofproto->netflow) {
- ofproto->netflow = netflow_create();
- }
- return netflow_set_options(ofproto->netflow, netflow_options);
- } else {
- netflow_destroy(ofproto->netflow);
- ofproto->netflow = NULL;
- return 0;
- }
-}
-
static struct ofport *
port_alloc(void)
{
return bucket * BUCKET_WIDTH;
}
-static void
-facet_active_timeout(struct ofproto_dpif *ofproto, struct facet *facet)
-{
- if (ofproto->netflow && !facet_is_controller_flow(facet) &&
- netflow_active_timeout_expired(ofproto->netflow, &facet->nf_flow)) {
- struct ofexpired expired;
-
- if (facet->installed) {
- struct dpif_flow_stats stats;
-
- facet_put__(ofproto, facet, facet->actions, facet->actions_len,
- &stats);
- facet_update_stats(ofproto, facet, &stats);
- }
-
- expired.flow = facet->flow;
- expired.packet_count = facet->packet_count;
- expired.byte_count = facet->byte_count;
- expired.used = facet->used;
- netflow_expire(ofproto->netflow, &facet->nf_flow, &expired);
- }
-}
-
static void
expire_facets(struct ofproto_dpif *ofproto, int dp_max_idle)
{
struct facet *facet, *next_facet;
HMAP_FOR_EACH_SAFE (facet, next_facet, hmap_node, &ofproto->facets) {
- facet_active_timeout(ofproto, facet);
if (facet->used < cutoff) {
facet_remove(ofproto, facet);
}
}
return error;
}
+\f
+/* NetFlow. */
+
+static int
+set_netflow(struct ofproto *ofproto_,
+ const struct netflow_options *netflow_options)
+{
+ struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+
+ if (netflow_options) {
+ if (!ofproto->netflow) {
+ ofproto->netflow = netflow_create();
+ }
+ return netflow_set_options(ofproto->netflow, netflow_options);
+ } else {
+ netflow_destroy(ofproto->netflow);
+ ofproto->netflow = NULL;
+ return 0;
+ }
+}
static void
get_netflow_ids(const struct ofproto *ofproto_,
dpif_get_netflow_ids(ofproto->dpif, engine_type, engine_id);
}
+
+static void
+send_active_timeout(struct ofproto_dpif *ofproto, struct facet *facet)
+{
+ if (!facet_is_controller_flow(facet) &&
+ netflow_active_timeout_expired(ofproto->netflow, &facet->nf_flow)) {
+ struct ofexpired expired;
+
+ if (facet->installed) {
+ struct dpif_flow_stats stats;
+
+ facet_put__(ofproto, facet, facet->actions, facet->actions_len,
+ &stats);
+ facet_update_stats(ofproto, facet, &stats);
+ }
+
+ expired.flow = facet->flow;
+ expired.packet_count = facet->packet_count;
+ expired.byte_count = facet->byte_count;
+ expired.used = facet->used;
+ netflow_expire(ofproto->netflow, &facet->nf_flow, &expired);
+ }
+}
+
+static void
+send_netflow_active_timeouts(struct ofproto_dpif *ofproto)
+{
+ struct facet *facet;
+
+ HMAP_FOR_EACH (facet, hmap_node, &ofproto->facets) {
+ send_active_timeout(ofproto, facet);
+ }
+}
\f
static struct ofproto_dpif *
ofproto_dpif_lookup(const char *name)