#include "ofp-print.h"
#include "ofpbuf.h"
#include "poll-loop.h"
+#include "port-array.h"
#include "process.h"
#include "rconn.h"
#include "socket-util.h"
char *name; /* Host network device name. */
int dp_ifidx; /* Index within kernel datapath. */
+
+ tag_type tag; /* Tag associated with this interface. */
+ bool enabled; /* May be chosen for flows? */
+ long long delay_expires; /* Time after which 'enabled' may change. */
};
#define BOND_MASK 0xff
/* Bonding info. */
struct bond_entry *bond_hash; /* An array of (BOND_MASK + 1) elements. */
- int active_iface;
+ int active_iface; /* Ifidx on which bcasts accepted, or -1. */
+ tag_type active_iface_tag; /* Tag for bcast flows. */
+ tag_type no_ifaces_tag; /* Tag for flows when all ifaces disabled. */
+ int updelay, downdelay; /* Delay before iface goes up/down, in ms. */
};
#define DP_MAX_PORTS 255
int txqlen; /* # of messages queued to send on 'rconn'. */
struct mac_learning *ml; /* MAC learning table, or null not to learn. */
int flow_idle_time; /* Idle time for flows we set up. */
- bool sent_config; /* Successfully sent config request? */
+ bool sent_config_request; /* Successfully sent config request? */
+ bool sent_features_request; /* Successfully sent features request? */
/* Secure channel. */
enum {
/* Kernel datapath information. */
int dp_idx; /* Kernel datapath index. */
- struct iface *ifaces[DP_MAX_PORTS]; /* Index by kernel datapath port no. */
+ struct port_array ifaces; /* Indexed by kernel datapath port number. */
/* Bridge ports. */
struct port **ports;
static void flowstats_run(struct bridge *);
static void flowstats_wait(struct bridge *);
+static void bond_run(struct bridge *);
+static void bond_wait(struct bridge *);
+
static void port_create(struct bridge *, const char *name);
static void port_reconfigure(struct port *);
static void port_destroy(struct port *);
+static struct port *port_from_dp_ifidx(const struct bridge *,
+ uint16_t dp_ifidx);
static void iface_create(struct port *, const char *name);
static void iface_destroy(struct iface *);
static struct iface *iface_lookup(const struct bridge *, const char *name);
+static struct iface *iface_from_dp_ifidx(const struct bridge *,
+ uint16_t dp_ifidx);
\f
/* Public functions. */
}
stats_mgr_wait(br->stats_mgr);
flowstats_wait(br);
+ bond_wait(br);
}
}
\f
br->txqlen = 0;
br->ml = mac_learning_create();
br->flow_idle_time = 5;
- br->sent_config = false;
+ br->sent_config_request = false;
+ br->sent_features_request = false;
br->sc_state = SC_UNSTARTED;
br->sc_retries = 0;
br->secchan = NULL;
br->rconn = rconn_create(30, 1);
+ port_array_init(&br->ifaces);
+
br->ft = ft_create();
br->stats_mgr = stats_mgr_create(br->rconn);
goto error;
}
br->sc_state = SC_RUNNING;
- br->sent_config = false;
+ br->sent_config_request = false;
+ br->sent_features_request = false;
br->next_stats_request = time_now();
return;
stats_request_destroy(br->flow_rq);
stats_mgr_destroy(br->stats_mgr);
mac_learning_destroy(br->ml);
+ port_array_destroy(&br->ifaces);
free(br->ports);
free(br);
}
}
static void
-send_set_config(struct bridge *br)
+send_set_config_request(struct bridge *br)
{
struct ofp_switch_config *osc;
struct ofpbuf *msg;
- int retval;
osc = make_openflow(sizeof *osc, OFPT_SET_CONFIG, &msg);
osc->flags = htons(OFPC_SEND_FLOW_EXP | OFPC_FRAG_NORMAL);
osc->miss_send_len = htons(OFP_DEFAULT_MISS_SEND_LEN);
- retval = rconn_send(br->rconn, msg, &br->txqlen);
- if (retval) {
- ofpbuf_delete(msg);
+ if (!rconn_send_with_limit(br->rconn, msg, &br->txqlen, INT_MAX)) {
+ br->sent_config_request = true;
+ }
+}
+
+static void
+send_features_request(struct bridge *br)
+{
+ struct ofp_header *oh;
+ struct ofpbuf *msg;
+
+ oh = make_openflow(sizeof *oh, OFPT_FEATURES_REQUEST, &msg);
+ if (!rconn_send_with_limit(br->rconn, msg, &br->txqlen, INT_MAX)) {
+ br->sent_features_request = true;
}
}
rconn_run(br->rconn);
- if (rconn_is_connected(br->rconn) && !br->sent_config) {
- send_set_config(br);
- br->sent_config = true;
+ if (rconn_is_connected(br->rconn)) {
+ if (!br->sent_config_request) {
+ send_set_config_request(br);
+ }
+ if (!br->sent_features_request) {
+ send_features_request(br);
+ }
}
tag_set_init(&br->revalidate_set);
mac_learning_run(br->ml, &br->revalidate_set);
}
flowstats_run(br);
+ bond_run(br);
/* Now revalidate any flows that need it. */
if (!tag_set_is_empty(&br->revalidate_set)) {
iface->dp_ifidx = -1;
}
}
- for (i = 0; i < ARRAY_SIZE(br->ifaces); i++) {
- br->ifaces[i] = NULL;
- }
+ port_array_clear(&br->ifaces);
/* Open connection to datapath. */
vconn_name = xasprintf("nl:%d", br->dp_idx);
int port_no = ntohs(opp->port_no);
struct iface *iface;
- /* Ignore special ports in general and OFPP_LOCAL in particular. */
sanitize_opp(opp);
- if (port_no >= DP_MAX_PORTS) {
- if (port_no < OFPP_MAX) {
- VLOG_WARN("datapath nl:%d reports having port %d (%s), which "
- "exceeds vswitchd maximum supported port number %d",
- br->dp_idx, port_no, opp->name,
- DP_MAX_PORTS - 1);
- }
- continue;
- }
iface = iface_lookup(br, (const char *) opp->name);
if (iface) {
if (iface->dp_ifidx >= 0) {
VLOG_WARN("datapath nl:%d reported interface %s twice",
br->dp_idx, opp->name);
- } else if (br->ifaces[port_no]) {
+ } else if (iface_from_dp_ifidx(br, port_no)) {
VLOG_WARN("datapath nl:%d reported interface %d twice",
br->dp_idx, port_no);
} else {
+ port_array_set(&br->ifaces, port_no, iface);
iface->dp_ifidx = port_no;
- br->ifaces[port_no] = iface;
}
}
if (iface_names) {
static packet_handler_func process_flow_expired;
static packet_handler_func process_stats_reply;
static packet_handler_func process_error_msg;
+static packet_handler_func process_features_reply;
+static packet_handler_func process_port_status;
static void flow_from_match(struct flow *, const struct ofp_match *);
OFPT_PACKET_IN,
process_packet_in
},
- {
- OFPT_PORT_STATUS,
- NULL
- },
{
OFPT_FLOW_EXPIRED,
process_flow_expired
OFPT_ERROR,
process_error_msg
},
+ {
+ OFPT_FEATURES_REPLY,
+ process_features_reply,
+ },
+ {
+ OFPT_PORT_STATUS,
+ process_port_status
+ },
};
const size_t n_processors = ARRAY_SIZE(processors);
const struct processor *p;
return &port->bond_hash[h & BOND_MASK];
}
-static tag_type
+static int
+bond_choose_iface(const struct port *port)
+{
+ size_t i;
+ for (i = 0; i < port->n_ifaces; i++) {
+ if (port->ifaces[i]->enabled) {
+ return i;
+ }
+ }
+ return -1;
+}
+
+static bool
choose_output_iface(const struct port *port, const struct flow *flow,
- uint16_t *dp_ifidx)
+ uint16_t *dp_ifidx, tag_type *tags)
{
+ struct iface *iface;
+
assert(port->n_ifaces);
if (port->n_ifaces == 1) {
- *dp_ifidx = port->ifaces[0]->dp_ifidx;
- return 0;
+ iface = port->ifaces[0];
} else {
struct bond_entry *e = lookup_bond_entry(port, flow->dl_src);
- if (e->iface_idx < 0 || e->iface_idx >= port->n_ifaces) {
+ if (e->iface_idx < 0 || e->iface_idx >= port->n_ifaces
+ || !port->ifaces[e->iface_idx]->enabled) {
/* XXX select interface properly. The current interface selection
* is only good for testing the rebalancing code. */
- e->iface_idx = 0;
+ e->iface_idx = bond_choose_iface(port);
+ if (e->iface_idx < 0) {
+ *tags |= port->no_ifaces_tag;
+ return false;
+ }
e->iface_tag = tag_create_random();
}
- *dp_ifidx = port->ifaces[e->iface_idx]->dp_ifidx;
- return e->iface_tag;
+ *tags |= e->iface_tag;
+ iface = port->ifaces[e->iface_idx];
}
+ *dp_ifidx = iface->dp_ifidx;
+ *tags |= iface->tag; /* Currently only used for bonding. */
+ return true;
}
static void
/* XXX return immediately if no bonded interfaces. */
for (dst = &f->dsts[0]; dst < &f->dsts[f->n_dsts]; dst++) {
- uint16_t dp_ifidx = dst->dp_ifidx;
- struct port *port;
- struct bond_entry *e;
+ struct port *port = port_from_dp_ifidx(br, dst->dp_ifidx);
+ if (port && port->n_ifaces >= 2) {
+ struct bond_entry *e = lookup_bond_entry(port, f->flow.dl_src);
+ e->tx_bytes += f->byte_count - f->last_byte_count;
+ }
+ }
+}
- /* Find the interface and port structure for 'flow'. */
- if (dp_ifidx >= ARRAY_SIZE(br->ifaces) || !br->ifaces[dp_ifidx]) {
- /* shouldn't happen, but... */
- continue;
+static void
+bond_link_status_update(struct iface *iface, bool carrier)
+{
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
+ struct port *port = iface->port;
+
+ if ((carrier == iface->enabled) == (iface->delay_expires == LLONG_MAX)) {
+ /* Nothing to do. */
+ return;
+ }
+ VLOG_WARN_RL(&rl, "interface %s: carrier %s",
+ iface->name, carrier ? "detected" : "dropped");
+ if (carrier == iface->enabled) {
+ iface->delay_expires = LLONG_MAX;
+ VLOG_WARN_RL(&rl, "interface %s: will not be %s",
+ iface->name, carrier ? "disabled" : "enabled");
+ } else {
+ int delay = carrier ? port->updelay : port->downdelay;
+ iface->delay_expires = time_msec() + delay;
+ if (delay) {
+ VLOG_WARN_RL(&rl,
+ "interface %s: will be %s if it stays %s for %d ms",
+ iface->name,
+ carrier ? "enabled" : "disabled",
+ carrier ? "up" : "down",
+ delay);
}
- port = br->ifaces[dp_ifidx]->port;
+ }
+}
+
+static void
+bond_choose_active_iface(struct port *port)
+{
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
+
+ port->active_iface = bond_choose_iface(port);
+ port->active_iface_tag = tag_create_random();
+ if (port->active_iface >= 0) {
+ VLOG_WARN_RL(&rl, "port %s: active interface is now %s",
+ port->name, port->ifaces[port->active_iface]->name);
+ } else {
+ VLOG_WARN_RL(&rl, "port %s: all ports disabled, no active interface",
+ port->name);
+ }
+}
+
+static void
+bond_run(struct bridge *br)
+{
+ size_t i, j;
+
+ for (i = 0; i < br->n_ports; i++) {
+ struct port *port = br->ports[i];
if (port->n_ifaces < 2) {
- /* Not a bonded interface. */
continue;
}
+ for (j = 0; j < port->n_ifaces; j++) {
+ struct iface *iface = port->ifaces[j];
+ if (time_msec() > iface->delay_expires) {
+ iface->delay_expires = LLONG_MAX;
+ iface->enabled = !iface->enabled;
+ VLOG_WARN("interface %s: %s",
+ iface->name,
+ iface->enabled ? "enable" : "disabled");
+ if (!iface->enabled) {
+ tag_set_add(&br->revalidate_set, iface->tag);
+ if (iface->port_ifidx == port->active_iface) {
+ tag_set_add(&br->revalidate_set,
+ port->active_iface_tag);
+ bond_choose_active_iface(port);
+ }
+ } else {
+ if (port->active_iface < 0) {
+ tag_set_add(&br->revalidate_set, port->no_ifaces_tag);
+ bond_choose_active_iface(port);
+ }
+ iface->tag = tag_create_random();
+ }
+ }
+ }
+ }
+}
+
+static void
+bond_wait(struct bridge *br)
+{
+ size_t i, j;
- e = lookup_bond_entry(port, f->flow.dl_src);
- e->tx_bytes += f->byte_count - f->last_byte_count;
+ for (i = 0; i < br->n_ports; i++) {
+ struct port *port = br->ports[i];
+ if (port->n_ifaces < 2) {
+ continue;
+ }
+ for (j = 0; j < port->n_ifaces; j++) {
+ struct iface *iface = port->ifaces[j];
+ if (iface->delay_expires != LLONG_MAX) {
+ poll_timer_wait(iface->delay_expires - time_msec());
+ }
+ }
}
}
-static tag_type
+static bool
set_dst(struct ft_dst *p, const struct flow *flow,
- const struct port *in_port, const struct port *out_port)
+ const struct port *in_port, const struct port *out_port,
+ tag_type *tags)
{
p->vlan = (out_port->vlan ? OFP_VLAN_NONE
: in_port->vlan ? in_port->vlan
: ntohs(flow->dl_vlan));
- return choose_output_iface(out_port, flow, &p->dp_ifidx);
+ return choose_output_iface(out_port, flow, &p->dp_ifidx, tags);
}
static void
dst = vlan_dsts = dsts;
for (i = 0; i < br->n_ports; i++) {
struct port *port = br->ports[i];
- if (port != in_port && (!port->vlan || vlan == port->vlan)) {
+ if (port != in_port && (!port->vlan || vlan == port->vlan)
+ && set_dst(dst, flow, in_port, port, tags)) {
/* Put destinations for original VLAN at the front, so that we
* don't have to add actions to set the VLAN tag for those. */
- *tags |= set_dst(dst, flow, in_port, port);
if (dst->vlan == ntohs(flow->dl_vlan)) {
swap_dst(dst, vlan_dsts++);
}
return dst - dsts;
} else if (out_port) {
/* Unicast. */
- *tags |= set_dst(dsts, flow, in_port, out_port);
- return 1;
+ return set_dst(dsts, flow, in_port, out_port, tags);
} else {
/* Drop. */
return 0;
int vlan;
/* Find the interface and port structure for the received packet. */
- if (in_ifidx >= ARRAY_SIZE(br->ifaces) || !br->ifaces[in_ifidx]) {
+ in_iface = iface_from_dp_ifidx(br, in_ifidx);
+ if (!in_iface) {
struct ft_flow *f;
if (pkt->buf) {
return;
}
- in_iface = br->ifaces[in_ifidx];
in_port = in_iface->port;
/* Figure out what VLAN this packet belongs to.
/* Drop multicast and broadcast packets on inactive bonded interfaces, to
* avoid receiving duplicates. */
- if (in_port->n_ifaces > 0
- && in_port->active_iface != in_iface->port_ifidx
- && eth_addr_is_multicast(flow->dl_dst)) {
- //tags |= in_iface->inactive_iface_tag;
- goto done;
+ if (in_port->n_ifaces > 1 && eth_addr_is_multicast(flow->dl_dst)) {
+ tags |= in_port->active_iface_tag;
+ if (in_port->active_iface != in_iface->port_ifidx) {
+ goto done;
+ }
}
/* MAC learning. */
stats_mgr_receive_error_msg(br->stats_mgr, oem);
}
+static void
+phy_port_changed(struct bridge *br, enum ofp_port_reason reason,
+ const struct ofp_phy_port *opp)
+{
+ struct iface *iface;
+ struct port *port;
+
+ iface = iface_from_dp_ifidx(br, ntohs(opp->port_no));
+ if (!iface) {
+ return;
+ }
+
+ port = iface->port;
+ if (port && port->n_ifaces > 1) {
+ bond_link_status_update(iface,
+ (reason != OFPPR_DELETE
+ && !(opp->state & htonl(OFPPS_LINK_DOWN))));
+ }
+}
+
+static void
+process_features_reply(struct bridge *br, void *osf_)
+{
+ struct ofp_switch_features *osf = osf_;
+ struct ofp_phy_port *pp;
+ size_t n_pp;
+
+ if (check_ofp_message_array(&osf->header, OFPT_FEATURES_REPLY,
+ sizeof *osf, sizeof *osf->ports, &n_pp)) {
+ return;
+ }
+ for (pp = &osf->ports[0]; pp < &osf->ports[n_pp]; pp++) {
+ phy_port_changed(br, OFPPR_MODIFY, pp);
+ }
+}
+
+static void
+process_port_status(struct bridge *br, void *ops_)
+{
+ struct ofp_port_status *ops = ops_;
+
+ if (check_ofp_message(&ops->header, OFPT_PORT_STATUS, sizeof *ops)) {
+ return;
+ }
+ phy_port_changed(br, ops->reason, &ops->desc);
+}
+
static void
flow_from_match(struct flow *flow, const struct ofp_match *match)
{
}
static void
-print_bals(const char *title, const struct slave_balance *bals, size_t n_bals,
+log_bals(const char *title, const struct slave_balance *bals, size_t n_bals,
struct port *port)
{
const struct slave_balance *b;
- printf("slave balance %s:\n", title);
- for (b = bals; b < bals + n_bals; b++) {
- size_t i;
+ if (VLOG_IS_DBG_ENABLED()) {
+ VLOG_DBG("slave balance %s:", title);
+ for (b = bals; b < bals + n_bals; b++) {
+ size_t i;
- printf("\t%s: %"PRIu64" bytes over:\n", b->iface->name, b->tx_bytes);
- for (i = 0; i < b->n_hashes; i++) {
- const struct bond_entry *e = b->hashes[i];
- printf("\t\thash %td: %"PRIu64" bytes\n",
- e - port->bond_hash, e->tx_bytes);
+ VLOG_DBG("\t%s: %"PRIu64" bytes over:",
+ b->iface->name, b->tx_bytes);
+ for (i = 0; i < b->n_hashes; i++) {
+ const struct bond_entry *e = b->hashes[i];
+ VLOG_DBG("\t\thash %td: %"PRIu64" bytes",
+ e - port->bond_hash, e->tx_bytes);
+ }
}
}
}
/* Sort in decreasing order of tx_bytes. */
qsort(bals, port->n_ifaces, sizeof *bals, compare_slave_balance);
- print_bals("before", bals, port->n_ifaces, port);
/* Shift load from the most-heavily-loaded slaves to the
* least-heavily-loaded slaves. */
least_loaded = &bals[port->n_ifaces - 1];
+ log_bals("before", bals, port->n_ifaces, port);
for (b = bals; b < least_loaded; ) {
uint64_t overload = b->tx_bytes - least_loaded->tx_bytes;
if (overload < least_loaded->tx_bytes >> 5 || overload < 100000) {
again: ;
}
-
- print_bals("after", bals, port->n_ifaces, port);
+ log_bals("after", bals, port->n_ifaces, port);
/* Implement exponentially weighted moving average. A weight of 1/2 causes
* historical data to decay to <1% in 7 rebalancing runs. */
VLOG_WARN("port %s: only 1 interface specified for bonded port",
port->name);
}
+
+ port->updelay = cfg_get_int(0, "bonding.%s.updelay", port->name);
+ if (port->updelay < 0) {
+ port->updelay = 0;
+ }
+ port->downdelay = cfg_get_int(0, "bonding.%s.downdelay", port->name);
+ if (port->downdelay < 0) {
+ port->downdelay = 0;
+ }
} else {
svec_init(&new_ifaces);
svec_add(&new_ifaces, port->name);
}
}
+static struct port *
+port_from_dp_ifidx(const struct bridge *br, uint16_t dp_ifidx)
+{
+ struct iface *iface = iface_from_dp_ifidx(br, dp_ifidx);
+ return iface ? iface->port : NULL;
+}
+
static void
port_update_bonding(struct port *port)
{
e->iface_idx = -1;
e->tx_bytes = 0;
}
- port->active_iface = 0;
+ bond_choose_active_iface(port);
+ port->no_ifaces_tag = tag_create_random();
}
}
}
iface->port_ifidx = port->n_ifaces;
iface->name = xstrdup(name);
iface->dp_ifidx = -1;
+ iface->tag = tag_create_random();
+ iface->enabled = true;
+ iface->delay_expires = LLONG_MAX;
if (port->n_ifaces >= port->allocated_ifaces) {
port->ifaces = x2nrealloc(port->ifaces, &port->allocated_ifaces,
}
return NULL;
}
+
+static struct iface *
+iface_from_dp_ifidx(const struct bridge *br, uint16_t dp_ifidx)
+{
+ return port_array_get(&br->ifaces, dp_ifidx);
+}