#include "netdev.h"
#include "ofp-print.h"
#include "ofpbuf.h"
+#include "poll-loop.h"
#include "process.h"
#include "rconn.h"
#include "socket-util.h"
+#include "stats.h"
#include "svec.h"
+#include "timeval.h"
#include "util.h"
#include "vconn.h"
+#include "xtoxll.h"
#define THIS_MODULE VLM_bridge
#include "vlog.h"
/* Flow tracking. */
struct ft *ft;
struct tag_set revalidate_set;
+
+ /* Flow statistics gathering. */
+ struct stats_mgr *stats_mgr;
+ struct stats_request *flow_rq; /* Current flow statistics request. */
+ time_t next_stats_request;
};
/* List of all bridges. */
static int bridge_fetch_dp_ifaces(struct bridge *, struct svec *iface_names);
static void bridge_process_msg(struct bridge *, struct ofpbuf *);
-static void revalidate_flow(struct bridge *br, struct ft_flow *f);
+static void revalidate_flow(struct bridge *, struct ft_flow *);
+
+static void flowstats_run(struct bridge *);
+static void flowstats_wait(struct bridge *);
static void port_create(struct bridge *, const char *name);
static void port_reconfigure(struct port *);
if (br->ml) {
mac_learning_wait(br->ml);
}
+ stats_mgr_wait(br->stats_mgr);
+ flowstats_wait(br);
}
}
\f
br->ft = ft_create();
+ br->stats_mgr = stats_mgr_create(br->rconn);
+
/* Create kernel datapath. */
for (;;) {
/* Pick a datapath index.
}
br->sc_state = SC_RUNNING;
br->sent_config = false;
+ br->next_stats_request = time_now();
return;
error:
port_destroy(br->ports[i]);
}
ft_destroy(br->ft);
+ stats_request_destroy(br->flow_rq);
+ stats_mgr_destroy(br->stats_mgr);
mac_learning_destroy(br->ml);
free(br->ports);
free(br);
return retval;
}
+static void
+send_set_config(struct bridge *br)
+{
+ struct ofp_switch_config *osc;
+ struct ofpbuf *msg;
+ int retval;
+
+ osc = make_openflow(sizeof *osc, OFPT_SET_CONFIG, &msg);
+ osc->flags = htons(OFPC_SEND_FLOW_EXP | OFPC_FRAG_NORMAL);
+ osc->miss_send_len = htons(OFP_DEFAULT_MISS_SEND_LEN);
+ retval = rconn_send(br->rconn, msg, &br->txqlen);
+ if (retval) {
+ ofpbuf_delete(msg);
+ }
+}
+
static void
bridge_run_one(struct bridge *br)
{
int iteration;
rconn_run(br->rconn);
+
if (rconn_is_connected(br->rconn) && !br->sent_config) {
- struct ofp_switch_config *osc;
- struct ofpbuf *msg;
- int retval;
-
- osc = make_openflow(sizeof *osc, OFPT_SET_CONFIG, &msg);
- osc->flags = htons(OFPC_SEND_FLOW_EXP | OFPC_FRAG_NORMAL);
- osc->miss_send_len = htons(OFP_DEFAULT_MISS_SEND_LEN);
- retval = rconn_send(br->rconn, msg, &br->txqlen);
- if (retval) {
- ofpbuf_delete(msg);
- } else {
- br->sent_config = true;
- }
+ send_set_config(br);
+ br->sent_config = true;
}
tag_set_init(&br->revalidate_set);
+
+ /* Now do the things that may want to revalidate flows. */
for (iteration = 0; iteration < 50 && !bridge_is_backlogged(br);
iteration++) {
struct ofpbuf *msg = rconn_recv(br->rconn);
if (br->ml) {
mac_learning_run(br->ml, &br->revalidate_set);
}
+ flowstats_run(br);
+
+ /* Now revalidate any flows that need it. */
if (!tag_set_is_empty(&br->revalidate_set)) {
struct ft_flow *f, *next;
}
}
}
+
+ /* Start or restart secchan if necessary. */
run_secchan(br);
}
static packet_handler_func process_echo_request;
static packet_handler_func process_packet_in;
static packet_handler_func process_flow_expired;
+static packet_handler_func process_stats_reply;
+static packet_handler_func process_error_msg;
+
+static void flow_from_match(struct flow *, const struct ofp_match *);
static void
bridge_process_msg(struct bridge *br, struct ofpbuf *msg)
OFPT_FLOW_EXPIRED,
process_flow_expired
},
+ {
+ OFPT_STATS_REPLY,
+ process_stats_reply
+ },
+ {
+ OFPT_ERROR,
+ process_error_msg
+ },
};
const size_t n_processors = ARRAY_SIZE(processors);
const struct processor *p;
struct ft_flow *f;
bool queue;
- f = ft_lookup(br->ft, flow);
+ f = ft_lookup(br->ft, flow, flow_hash(flow, 0));
if (f) {
if (!ftd_equal(dsts, n_dsts, f->dsts, f->n_dsts)) {
ftf_set_dsts(f, dsts, n_dsts);
actions_len);
put_actions(dsts, n_dsts, ntohs(flow->dl_vlan), fbuf);
queue_tx(br, fbuf);
+
+ /* The add_flow message will reset the byte counters. */
+ f->last_byte_count = f->byte_count = 0;
}
} else {
- struct ft_flow *f = ft_lookup(br->ft, flow);
+ struct ft_flow *f = ft_lookup(br->ft, flow, flow_hash(flow, 0));
if (f) {
/* XXX delete flow from ft, queue delete-flow openflow message */
}
queue_tx(br, make_add_flow(flow, pkt->buffer_id,
br->flow_idle_time, 0));
- f = ft_lookup(br->ft, flow);
+ f = ft_lookup(br->ft, flow, flow_hash(flow, 0));
if (f) {
ftf_set_dsts(f, NULL, 0);
f->tags = tags;
if (opi->reason == OFPR_NO_MATCH) {
/* Delete any existing flow from the flow table. It must not really be
* there (otherwise we wouldn't be getting a packet-in). */
- struct ft_flow *f = ft_lookup(br->ft, &flow);
+ struct ft_flow *f = ft_lookup(br->ft, &flow, flow_hash(&flow, 0));
if (f) {
ft_remove(br->ft, f);
ftf_destroy(f);
/* We don't use flows with wildcards, so there's nothing to do. */
return;
}
- flow.nw_src = ofe->match.nw_src;
- flow.nw_dst = ofe->match.nw_dst;
- flow.in_port = ofe->match.in_port;
- flow.dl_vlan = ofe->match.dl_vlan;
- flow.dl_type = ofe->match.dl_type;
- flow.tp_src = ofe->match.tp_src;
- flow.tp_dst = ofe->match.tp_dst;
- memcpy(flow.dl_src, ofe->match.dl_src, ETH_ADDR_LEN);
- memcpy(flow.dl_dst, ofe->match.dl_dst, ETH_ADDR_LEN);
- flow.nw_proto = ofe->match.nw_proto;
- flow.reserved = 0;
-
- f = ft_lookup(br->ft, &flow);
+ flow_from_match(&flow, &ofe->match);
+
+ f = ft_lookup(br->ft, &flow, flow_hash(&flow, 0));
if (f) {
+ /* Update statistics. */
+ f->last_byte_count = f->byte_count;
+ f->byte_count = ntohll(ofe->byte_count);
+
ft_remove(br->ft, f);
ftf_destroy(f);
} else if (VLOG_IS_DBG_ENABLED()) {
VLOG_DBG_RL(&rl, "received flow expiration for flow not in table");
}
}
+
+static void
+process_stats_reply(struct bridge *br, void *osr)
+{
+ stats_mgr_receive_stats_reply(br->stats_mgr, osr);
+}
+
+static void
+process_error_msg(struct bridge *br, void *oem)
+{
+ stats_mgr_receive_error_msg(br->stats_mgr, oem);
+}
+
+static void
+flow_from_match(struct flow *flow, const struct ofp_match *match)
+{
+ flow->nw_src = match->nw_src;
+ flow->nw_dst = match->nw_dst;
+ flow->in_port = match->in_port;
+ flow->dl_vlan = match->dl_vlan;
+ flow->dl_type = match->dl_type;
+ flow->tp_src = match->tp_src;
+ flow->tp_dst = match->tp_dst;
+ memcpy(flow->dl_src, match->dl_src, ETH_ADDR_LEN);
+ memcpy(flow->dl_dst, match->dl_dst, ETH_ADDR_LEN);
+ flow->nw_proto = match->nw_proto;
+ flow->reserved = 0;
+}
+\f
+/* Flow statistics collection. */
+
+static void
+request_flow_stats(struct bridge *br)
+{
+ struct ofp_flow_stats_request *ofsr;
+ struct ofp_stats_request *osr;
+ struct ofpbuf *msg;
+
+ osr = make_openflow(sizeof *osr + sizeof *ofsr, OFPT_STATS_REQUEST, &msg);
+ osr->type = htons(OFPST_FLOW);
+ osr->flags = htons(0);
+ ofsr = (struct ofp_flow_stats_request *) osr->body;
+ ofsr->match.wildcards = htonl(OFPFW_ALL);
+ ofsr->table_id = 0xff;
+ ofsr->out_port = htons(OFPP_NONE);
+
+ stats_request_destroy(br->flow_rq);
+ br->flow_rq = stats_request_create(br->stats_mgr, msg);
+ br->next_stats_request = time_now() + 10;
+}
+
+static void
+flowstats_process(struct bridge *br)
+{
+ /* This ft_dst will never appear as a valid flow's action. Thus, we can
+ * set it as a flow's action to force revalidation to update that flow. */
+ static const struct ft_dst invalid_dst = {0xffff, 0xffff};
+
+ tag_type flowstats_tag;
+ struct ofpbuf *msg;
+ struct ft *new_ft;
+ size_t n_tagged;
+
+ new_ft = ft_create();
+ flowstats_tag = tag_create_random();
+ n_tagged = 0;
+ while (stats_request_get_reply(br->flow_rq, &msg)) {
+ struct ofp_stats_reply *osr = msg->data;
+ const struct ofp_flow_stats *fs;
+ struct flow_stats_iterator i;
+
+ for (fs = flow_stats_first(&i, osr); fs; fs = flow_stats_next(&i)) {
+ size_t n_actions = ((ntohs(fs->length)
+ - offsetof(struct ofp_flow_stats, actions))
+ / sizeof(struct ofp_action_header));
+ struct ft_flow *f;
+ struct flow flow;
+ size_t hash;
+
+ if (fs->match.wildcards != htonl(0)) {
+ /* XXX delete flow */
+ continue;
+ }
+ flow_from_match(&flow, &fs->match);
+ hash = flow_hash(&flow, 0);
+
+ f = ft_lookup(br->ft, &flow, hash);
+ if (f) {
+ /* Move from bt->ft to new_ft. */
+ ft_remove(br->ft, f);
+ hmap_insert_fast(&new_ft->flows, &f->node, f->node.hash);
+
+ /* Update statistics. */
+ f->last_byte_count = f->byte_count;
+ f->byte_count = ntohll(fs->byte_count);
+
+ /* Force the flow to be revalidated and replaced if its actions
+ * aren't what we expect. */
+ if (!ftd_equal_actions(f->dsts, f->n_dsts,
+ fs->actions, n_actions,
+ ntohs(flow.dl_vlan))) {
+ ftf_set_dsts(f, &invalid_dst, 1);
+ f->tags |= flowstats_tag;
+ n_tagged++;
+ }
+ } else {
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+ f = ft_lookup(new_ft, &flow, hash);
+ if (!f) {
+ VLOG_WARN_RL(&rl, "unexpected flow in flow table");
+ f = ftf_create(&flow, &invalid_dst, 1, flowstats_tag);
+ hmap_insert_fast(&new_ft->flows, &f->node, f->node.hash);
+ f->tags |= flowstats_tag;
+ n_tagged++;
+ f->last_byte_count = f->byte_count = fs->byte_count;
+ } else {
+ VLOG_WARN_RL(&rl, "duplicate flow in flow stats reply");
+ }
+ }
+ }
+ ofpbuf_delete(msg);
+ }
+ hmap_expand(&new_ft->flows);
+ hmap_swap(&br->ft->flows, &new_ft->flows);
+ ft_destroy(new_ft);
+ if (n_tagged) {
+ tag_set_add(&br->revalidate_set, flowstats_tag);
+ }
+}
+
+static void
+flowstats_run(struct bridge *br)
+{
+ stats_mgr_run(br->stats_mgr);
+ if (br->flow_rq) {
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+ switch (stats_request_get_status(br->flow_rq)) {
+ case EAGAIN:
+ return;
+
+ case 0:
+ flowstats_process(br);
+ break;
+
+ default:
+ VLOG_WARN_RL(&rl, "flow stats request returned error: %s",
+ strerror(stats_request_get_status(br->flow_rq)));
+ br->next_stats_request = time_now() + 1;
+ break;
+ }
+ stats_request_destroy(br->flow_rq);
+ br->flow_rq = NULL;
+ } else {
+ if (rconn_is_connected(br->rconn)
+ && time_now() >= br->next_stats_request) {
+ request_flow_stats(br);
+ br->next_stats_request = time_now() + 60;
+ }
+ }
+}
+
+static void
+flowstats_wait(struct bridge *br)
+{
+ if (!br->flow_rq && rconn_is_connected(br->rconn)) {
+ poll_timer_wait((br->next_stats_request - time_now()) * 1000);
+ }
+}
\f
/* Port functions. */