#define BOND_MASK 0xff
struct bond_entry {
int iface_idx; /* Index of assigned iface, or -1 if none. */
+ uint64_t tx_bytes; /* Count of bytes recently transmitted. */
+ tag_type iface_tag; /* Tag associated with iface_idx. */
};
#define FLOOD_PORT ((struct port *) 1) /* The 'flood' output port. */
}
}
-static struct iface *
-choose_output_iface(const struct port *port, const struct flow *flow)
+static struct bond_entry *
+lookup_bond_entry(const struct port *port, const uint8_t mac[ETH_ADDR_LEN])
+{
+ size_t h = hash_fnv(mac, ETH_ADDR_LEN, HASH_FNV_BASIS);
+ return &port->bond_hash[h & BOND_MASK];
+}
+
+static tag_type
+choose_output_iface(const struct port *port, const struct flow *flow,
+ uint16_t *dp_ifidx)
{
assert(port->n_ifaces);
if (port->n_ifaces == 1) {
- return port->ifaces[0];
+ *dp_ifidx = port->ifaces[0]->dp_ifidx;
+ return 0;
} else {
- size_t h = hash_fnv(flow->dl_src, sizeof flow->dl_src, HASH_FNV_BASIS);
- struct bond_entry *e = &port->bond_hash[h & BOND_MASK];
+ struct bond_entry *e = lookup_bond_entry(port, flow->dl_src);
if (e->iface_idx < 0 || e->iface_idx >= port->n_ifaces) {
- /* XXX select interface properly */
- static int count = 0;
- e->iface_idx = count++ % port->n_ifaces;
+ /* XXX select interface properly. The current interface selection
+ * is only good for testing the rebalancing code. */
+ e->iface_idx = 0;
+ e->iface_tag = tag_create_random();
}
- return port->ifaces[e->iface_idx];
+ *dp_ifidx = port->ifaces[e->iface_idx]->dp_ifidx;
+ return e->iface_tag;
}
}
static void
+bond_account_flow(struct bridge *br, const struct ft_flow *f)
+{
+ const struct ft_dst *dst;
+
+ if (f->byte_count <= f->last_byte_count) {
+ /* No bytes to add, so don't waste our time. */
+ return;
+ }
+ /* XXX return immediately if no bonded interfaces. */
+
+ for (dst = &f->dsts[0]; dst < &f->dsts[f->n_dsts]; dst++) {
+ uint16_t dp_ifidx = dst->dp_ifidx;
+ struct port *port;
+ struct bond_entry *e;
+
+ /* Find the interface and port structure for 'flow'. */
+ if (dp_ifidx >= ARRAY_SIZE(br->ifaces) || !br->ifaces[dp_ifidx]) {
+ /* shouldn't happen, but... */
+ continue;
+ }
+ port = br->ifaces[dp_ifidx]->port;
+ if (port->n_ifaces < 2) {
+ /* Not a bonded interface. */
+ continue;
+ }
+
+ e = lookup_bond_entry(port, f->flow.dl_src);
+ e->tx_bytes += f->byte_count - f->last_byte_count;
+ }
+}
+
+static tag_type
set_dst(struct ft_dst *p, const struct flow *flow,
const struct port *in_port, const struct port *out_port)
{
p->vlan = (out_port->vlan ? OFP_VLAN_NONE
: in_port->vlan ? in_port->vlan
: ntohs(flow->dl_vlan));
- p->dp_ifidx = choose_output_iface(out_port, flow)->dp_ifidx;
+ return choose_output_iface(out_port, flow, &p->dp_ifidx);
}
static void
{
if (out_port == FLOOD_PORT) {
/* Flood. */
+ /* XXX use OFPP_FLOOD if no vlans or bonding. */
struct ft_dst *dst; /* Next element in 'dsts'. */
struct ft_dst *vlan_dsts; /* First 'dsts' with vlan != flow->dl_vlan */
size_t i;
}
f->tags = tags;
} else {
- ft_insert(br->ft, ftf_create(flow, dsts, n_dsts, tags));
+ f = ftf_create(flow, dsts, n_dsts, tags);
+ ft_insert(br->ft, f);
queue = true;
}
/* Update statistics. */
f->last_byte_count = f->byte_count;
f->byte_count = ntohll(ofe->byte_count);
+ bond_account_flow(br, f);
ft_remove(br->ft, f);
ftf_destroy(f);
\f
/* Flow statistics collection. */
+struct slave_balance {
+ struct iface *iface;
+ uint64_t tx_bytes;
+ struct bond_entry **hashes;
+ size_t n_hashes;
+};
+
+/* Sorts pointers to pointers to bond_entries in ascending order by the
+ * interface to which they are assigned, and within a single interface in
+ * ascending order of bytes transmitted. */
+static int
+compare_bond_entries(const void *a_, const void *b_)
+{
+ const struct bond_entry *const *ap = a_;
+ const struct bond_entry *const *bp = b_;
+ const struct bond_entry *a = *ap;
+ const struct bond_entry *b = *bp;
+ if (a->iface_idx != b->iface_idx) {
+ return a->iface_idx > b->iface_idx ? 1 : -1;
+ } else if (a->tx_bytes != b->tx_bytes) {
+ return a->tx_bytes > b->tx_bytes ? 1 : -1;
+ } else {
+ return 0;
+ }
+}
+
+/* Sorts slave_balances in *descending* order by number of bytes
+ * transmitted. */
+static int
+compare_slave_balance(const void *a_, const void *b_)
+{
+ const struct slave_balance *a = a_;
+ const struct slave_balance *b = b_;
+ return a->tx_bytes > b->tx_bytes ? -1 : a->tx_bytes < b->tx_bytes;
+}
+
+static void
+swap_bals(struct slave_balance *a, struct slave_balance *b)
+{
+ struct slave_balance tmp = *a;
+ *a = *b;
+ *b = tmp;
+}
+
+static void
+resort_bals(struct slave_balance *p, struct slave_balance *bals, size_t n_bals)
+{
+ if (n_bals > 1) {
+ for (; p > bals && p->tx_bytes > p[-1].tx_bytes; p--) {
+ swap_bals(p, p - 1);
+ }
+ for (; p < &bals[n_bals - 1] && p->tx_bytes < p[1].tx_bytes; p++) {
+ swap_bals(p, p + 1);
+ }
+ }
+}
+
+static void
+print_bals(const char *title, const struct slave_balance *bals, size_t n_bals,
+ struct port *port)
+{
+ const struct slave_balance *b;
+
+ printf("slave balance %s:\n", title);
+ for (b = bals; b < bals + n_bals; b++) {
+ size_t i;
+
+ printf("\t%s: %"PRIu64" bytes over:\n", b->iface->name, b->tx_bytes);
+ for (i = 0; i < b->n_hashes; i++) {
+ const struct bond_entry *e = b->hashes[i];
+ printf("\t\thash %td: %"PRIu64" bytes\n",
+ e - port->bond_hash, e->tx_bytes);
+ }
+ }
+}
+
+static void
+bond_rebalance_port(struct port *port)
+{
+ struct slave_balance bals[DP_MAX_PORTS];
+ struct bond_entry *hashes[BOND_MASK + 1];
+ struct slave_balance *b, *least_loaded;
+ struct bond_entry *e;
+ size_t i;
+
+ for (b = bals; b < &bals[port->n_ifaces]; b++) {
+ b->iface = port->ifaces[b - bals];
+ b->tx_bytes = 0;
+ b->hashes = NULL;
+ b->n_hashes = 0;
+ }
+ for (i = 0; i <= BOND_MASK; i++) {
+ hashes[i] = &port->bond_hash[i];
+ }
+ qsort(hashes, BOND_MASK + 1, sizeof *hashes, compare_bond_entries);
+ for (i = 0; i <= BOND_MASK; i++) {
+ e = hashes[i];
+ if (e->iface_idx >= 0 && e->iface_idx < port->n_ifaces) {
+ b = &bals[e->iface_idx];
+ b->tx_bytes += e->tx_bytes;
+ if (!b->hashes) {
+ b->hashes = &hashes[i];
+ }
+ b->n_hashes++;
+ }
+ }
+
+ /* Sort in decreasing order of tx_bytes. */
+ qsort(bals, port->n_ifaces, sizeof *bals, compare_slave_balance);
+
+ print_bals("before", bals, port->n_ifaces, port);
+
+ /* Shift load from the most-heavily-loaded slaves to the
+ * least-heavily-loaded slaves. */
+ least_loaded = &bals[port->n_ifaces - 1];
+ for (b = bals; b < least_loaded; ) {
+ uint64_t overload = b->tx_bytes - least_loaded->tx_bytes;
+ if (overload < least_loaded->tx_bytes >> 5 || overload < 100000) {
+ /* The extra load on this slave (and all less-loaded slaves),
+ * compared to that of the least-loaded slave, is less than ~3%, or
+ * it is less than ~1Mbps. No point in rebalancing. */
+ break;
+ }
+
+ if (b->n_hashes == 1) {
+ /* This slave only carries a single MAC hash, so we can't shift any
+ * load away from it, even though we want to. */
+ b++;
+ continue;
+ }
+
+ for (i = 0; i < b->n_hashes; i++) {
+ e = b->hashes[i];
+ if (least_loaded->tx_bytes + e->tx_bytes
+ < b->tx_bytes - e->tx_bytes) {
+ /* Delete element from e->hashes. */
+ if (!i) {
+ b->hashes++;
+ } else {
+ memmove(b->hashes + i, b->hashes + i + 1,
+ (b->n_hashes - (i + 1)) * sizeof *b->hashes);
+ }
+ b->n_hashes--;
+
+ /* Shift load away from 'b'. */
+ b->tx_bytes -= e->tx_bytes;
+ resort_bals(b, bals, port->n_ifaces);
+
+ /* Shift load to 'least_loaded'. */
+ tag_set_add(&port->bridge->revalidate_set, e->iface_tag);
+ e->iface_idx = least_loaded->iface->port_ifidx;
+ e->iface_tag = tag_create_random();
+ least_loaded->tx_bytes += e->tx_bytes;
+ resort_bals(least_loaded, bals, port->n_ifaces);
+ goto again;
+ }
+ }
+ b++;
+
+ again: ;
+ }
+
+ print_bals("after", bals, port->n_ifaces, port);
+
+ /* Implement exponentially weighted moving average. A weight of 1/2 causes
+ * historical data to decay to <1% in 7 rebalancing runs. */
+ for (e = &port->bond_hash[0]; e <= &port->bond_hash[BOND_MASK]; e++) {
+ e->tx_bytes /= 2;
+ }
+}
+
+static void
+bond_rebalance(struct bridge *br)
+{
+ size_t i;
+
+ for (i = 0; i < br->n_ports; i++) {
+ struct port *port = br->ports[i];
+ if (port->n_ifaces < 2) {
+ continue;
+ }
+
+ bond_rebalance_port(port);
+ }
+}
+
static void
request_flow_stats(struct bridge *br)
{
stats_request_destroy(br->flow_rq);
br->flow_rq = stats_request_create(br->stats_mgr, msg);
- br->next_stats_request = time_now() + 10;
}
static void
ftf_set_dsts(f, &invalid_dst, 1);
f->tags |= flowstats_tag;
n_tagged++;
+ } else {
+ bond_account_flow(br, f);
}
} else {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
}
ofpbuf_delete(msg);
}
+
+ /* Delete all the flows that remain in br->ft, replacing them by the ones
+ * from new_ft. */
hmap_expand(&new_ft->flows);
hmap_swap(&br->ft->flows, &new_ft->flows);
ft_destroy(new_ft);
+
+ /* If we tagged anything for revalidation, add that tag to the revalidation
+ * set. */
if (n_tagged) {
tag_set_add(&br->revalidate_set, flowstats_tag);
}
+
+ /* Rebalance any bonded ports. */
+ bond_rebalance(br);
}
static void
if (rconn_is_connected(br->rconn)
&& time_now() >= br->next_stats_request) {
request_flow_stats(br);
- br->next_stats_request = time_now() + 60;
+ br->next_stats_request = time_now() + 10;
}
}
}
for (i = 0; i <= BOND_MASK; i++) {
struct bond_entry *e = &port->bond_hash[i];
e->iface_idx = -1;
+ e->tx_bytes = 0;
}
port->active_iface = 0;
}