#include "coverage.h"
#include "dirs.h"
#include "dpif.h"
+#include "dynamic-string.h"
#include "flow.h"
#include "hash.h"
#include "list.h"
struct port **ports;
size_t n_ports, allocated_ports;
+ /* Bonding. */
+ bool has_bonded_ports;
+ long long int bond_next_rebalance;
+
/* Flow tracking. */
bool flush;
static void bond_run(struct bridge *);
static void bond_wait(struct bridge *);
+static void bond_rebalance_port(struct port *);
static void port_create(struct bridge *, const char *name);
static void port_reconfigure(struct port *);
port_array_init(&br->ifaces);
br->flush = false;
+ br->bond_next_rebalance = time_msec() + 10000;
list_push_back(&all_bridges, &br->node);
return process_flow(br, flow, packet, actions, tags);
}
+static void
+bridge_account_flow_ofhook_cb(const flow_t *flow,
+ const union odp_action *actions,
+ size_t n_actions, unsigned long long int n_bytes,
+ void *br_)
+{
+ struct bridge *br = br_;
+ const union odp_action *a;
+
+ if (!br->has_bonded_ports) {
+ return;
+ }
+
+ for (a = actions; a < &actions[n_actions]; a++) {
+ if (a->type == ODPAT_OUTPUT) {
+ struct port *port = port_from_dp_ifidx(br, a->output.port);
+ if (port && port->n_ifaces >= 2) {
+ struct bond_entry *e = lookup_bond_entry(port, flow->dl_src);
+ e->tx_bytes += n_bytes;
+ }
+ }
+ }
+}
+
+static void
+bridge_account_checkpoint_ofhook_cb(void *br_)
+{
+ struct bridge *br = br_;
+ size_t i;
+
+ if (!br->has_bonded_ports) {
+ return;
+ }
+
+ /* The current ofproto implementation calls this callback at least once a
+ * second, so this timer implementation is sufficient. */
+ if (time_msec() < br->bond_next_rebalance) {
+ return;
+ }
+ br->bond_next_rebalance = time_msec() + 10000;
+
+ for (i = 0; i < br->n_ports; i++) {
+ struct port *port = br->ports[i];
+ if (port->n_ifaces > 1) {
+ bond_rebalance_port(port);
+ }
+ }
+}
+
static struct ofhooks bridge_ofhooks = {
bridge_port_changed_ofhook_cb,
bridge_normal_ofhook_cb,
+ bridge_account_flow_ofhook_cb,
+ bridge_account_checkpoint_ofhook_cb,
};
\f
+/* Statistics for a single interface on a bonded port, used for load-based
+ * bond rebalancing. */
+struct slave_balance {
+ struct iface *iface; /* The interface. */
+ uint64_t tx_bytes; /* Sum of hashes[*]->tx_bytes. */
+
+ /* All the "bond_entry"s that are assigned to this interface, in order of
+ * increasing tx_bytes. */
+ struct bond_entry **hashes;
+ size_t n_hashes;
+};
+
+/* Sorts pointers to pointers to bond_entries in ascending order by the
+ * interface to which they are assigned, and within a single interface in
+ * ascending order of bytes transmitted. */
+static int
+compare_bond_entries(const void *a_, const void *b_)
+{
+ const struct bond_entry *const *ap = a_;
+ const struct bond_entry *const *bp = b_;
+ const struct bond_entry *a = *ap;
+ const struct bond_entry *b = *bp;
+ if (a->iface_idx != b->iface_idx) {
+ return a->iface_idx > b->iface_idx ? 1 : -1;
+ } else if (a->tx_bytes != b->tx_bytes) {
+ return a->tx_bytes > b->tx_bytes ? 1 : -1;
+ } else {
+ return 0;
+ }
+}
+
+/* Sorts slave_balances so that enabled ports come first, and otherwise in
+ * *descending* order by number of bytes transmitted. */
+static int
+compare_slave_balance(const void *a_, const void *b_)
+{
+ const struct slave_balance *a = a_;
+ const struct slave_balance *b = b_;
+ if (a->iface->enabled != b->iface->enabled) {
+ return a->iface->enabled ? -1 : 1;
+ } else if (a->tx_bytes != b->tx_bytes) {
+ return a->tx_bytes > b->tx_bytes ? -1 : 1;
+ } else {
+ return 0;
+ }
+}
+
+static void
+swap_bals(struct slave_balance *a, struct slave_balance *b)
+{
+ struct slave_balance tmp = *a;
+ *a = *b;
+ *b = tmp;
+}
+
+/* Restores the 'n_bals' slave_balance structures in 'bals' to sorted order
+ * given that 'p' (and only 'p') might be in the wrong location.
+ *
+ * This function invalidates 'p', since it might now be in a different memory
+ * location. */
+static void
+resort_bals(struct slave_balance *p,
+ struct slave_balance bals[], size_t n_bals)
+{
+ if (n_bals > 1) {
+ for (; p > bals && p->tx_bytes > p[-1].tx_bytes; p--) {
+ swap_bals(p, p - 1);
+ }
+ for (; p < &bals[n_bals - 1] && p->tx_bytes < p[1].tx_bytes; p++) {
+ swap_bals(p, p + 1);
+ }
+ }
+}
+
+static void
+log_bals(const struct slave_balance *bals, size_t n_bals, struct port *port)
+{
+ if (VLOG_IS_DBG_ENABLED()) {
+ struct ds ds = DS_EMPTY_INITIALIZER;
+ const struct slave_balance *b;
+
+ for (b = bals; b < bals + n_bals; b++) {
+ size_t i;
+
+ if (b > bals) {
+ ds_put_char(&ds, ',');
+ }
+ ds_put_format(&ds, " %s %"PRIu64"kB",
+ b->iface->name, b->tx_bytes / 1024);
+
+ if (!b->iface->enabled) {
+ ds_put_cstr(&ds, " (disabled)");
+ }
+ if (b->n_hashes > 0) {
+ ds_put_cstr(&ds, " (");
+ for (i = 0; i < b->n_hashes; i++) {
+ const struct bond_entry *e = b->hashes[i];
+ if (i > 0) {
+ ds_put_cstr(&ds, " + ");
+ }
+ ds_put_format(&ds, "h%td: %"PRIu64"kB",
+ e - port->bond_hash, e->tx_bytes / 1024);
+ }
+ ds_put_cstr(&ds, ")");
+ }
+ }
+ VLOG_DBG("bond %s:%s", port->name, ds_cstr(&ds));
+ ds_destroy(&ds);
+ }
+}
+
+/* Shifts 'hash' from 'from' to 'to' within 'port'. */
+static void
+bond_shift_load(struct slave_balance *from, struct slave_balance *to,
+ struct bond_entry *hash)
+{
+ struct port *port = from->iface->port;
+ uint64_t delta = hash->tx_bytes;
+
+ VLOG_INFO("bond %s: shift %"PRIu64"kB of load (with hash %td) "
+ "from %s to %s (now carrying %"PRIu64"kB and "
+ "%"PRIu64"kB load, respectively)",
+ port->name, delta / 1024, hash - port->bond_hash,
+ from->iface->name, to->iface->name,
+ (from->tx_bytes - delta) / 1024,
+ (to->tx_bytes + delta) / 1024);
+
+ /* Delete element from from->hashes.
+ *
+ * We don't bother to add the element to to->hashes because not only would
+ * it require more work, the only purpose it would be to allow that hash to
+ * be migrated to another slave in this rebalancing run, and there is no
+ * point in doing that. */
+ if (from->hashes[0] == hash) {
+ from->hashes++;
+ } else {
+ int i = hash - from->hashes[0];
+ memmove(from->hashes + i, from->hashes + i + 1,
+ (from->n_hashes - (i + 1)) * sizeof *from->hashes);
+ }
+ from->n_hashes--;
+
+ /* Shift load away from 'from' to 'to'. */
+ from->tx_bytes -= delta;
+ to->tx_bytes += delta;
+
+ /* Arrange for flows to be revalidated. */
+ ofproto_revalidate(port->bridge->ofproto, hash->iface_tag);
+ hash->iface_idx = to->iface->port_ifidx;
+ hash->iface_tag = tag_create_random();
+
+}
+
+static void
+bond_rebalance_port(struct port *port)
+{
+ struct slave_balance bals[DP_MAX_PORTS];
+ size_t n_bals;
+ struct bond_entry *hashes[BOND_MASK + 1];
+ struct slave_balance *b, *from, *to;
+ struct bond_entry *e;
+ size_t i;
+
+ /* Sets up 'bals' to describe each of the port's interfaces, sorted in
+ * descending order of tx_bytes, so that bals[0] represents the most
+ * heavily loaded slave and bals[n_bals - 1] represents the least heavily
+ * loaded slave.
+ *
+ * The code is a bit tricky: to avoid dynamically allocating a 'hashes'
+ * array for each slave_balance structure, we sort our local array of
+ * hashes in order by slave, so that all of the hashes for a given slave
+ * become contiguous in memory, and then we point each 'hashes' members of
+ * a slave_balance structure to the start of a contiguous group. */
+ n_bals = port->n_ifaces;
+ for (b = bals; b < &bals[n_bals]; b++) {
+ b->iface = port->ifaces[b - bals];
+ b->tx_bytes = 0;
+ b->hashes = NULL;
+ b->n_hashes = 0;
+ }
+ for (i = 0; i <= BOND_MASK; i++) {
+ hashes[i] = &port->bond_hash[i];
+ }
+ qsort(hashes, BOND_MASK + 1, sizeof *hashes, compare_bond_entries);
+ for (i = 0; i <= BOND_MASK; i++) {
+ e = hashes[i];
+ if (e->iface_idx >= 0 && e->iface_idx < port->n_ifaces) {
+ b = &bals[e->iface_idx];
+ b->tx_bytes += e->tx_bytes;
+ if (!b->hashes) {
+ b->hashes = &hashes[i];
+ }
+ b->n_hashes++;
+ }
+ }
+ qsort(bals, n_bals, sizeof *bals, compare_slave_balance);
+ log_bals(bals, n_bals, port);
+
+ /* Discard slaves that aren't enabled (which were sorted to the back of the
+ * array earlier). */
+ while (!bals[n_bals - 1].iface->enabled) {
+ n_bals--;
+ if (!n_bals) {
+ return;
+ }
+ }
+
+ /* Shift load from the most-loaded slaves to the least-loaded slaves. */
+ to = &bals[n_bals - 1];
+ for (from = bals; from < to; ) {
+ uint64_t overload = from->tx_bytes - to->tx_bytes;
+ if (overload < to->tx_bytes >> 5 || overload < 100000) {
+ /* The extra load on 'from' (and all less-loaded slaves), compared
+ * to that of 'to' (the least-loaded slave), is less than ~3%, or
+ * it is less than ~1Mbps. No point in rebalancing. */
+ break;
+ } else if (from->n_hashes == 1) {
+ /* 'from' only carries a single MAC hash, so we can't shift any
+ * load away from it, even though we want to. */
+ from++;
+ } else {
+ /* 'from' is carrying significantly more load than 'to', and that
+ * load is split across at least two different hashes. Pick a hash
+ * to migrate to 'to' (the least-loaded slave), given that doing so
+ * must not cause 'to''s load to exceed 'from''s load.
+ *
+ * The sort order we use means that we prefer to shift away the
+ * smallest hashes instead of the biggest ones. There is little
+ * reason behind this decision; we could use the opposite sort
+ * order to shift away big hashes ahead of small ones. */
+ size_t i;
+
+ for (i = 0; i < from->n_hashes; i++) {
+ uint64_t delta = from->hashes[i]->tx_bytes;
+ if (to->tx_bytes + delta < from->tx_bytes - delta) {
+ break;
+ }
+ }
+ if (i < from->n_hashes) {
+ bond_shift_load(from, to, from->hashes[i]);
+
+ /* Re-sort 'bals'. Note that this may make 'from' and 'to'
+ * point to different slave_balance structures. It is only
+ * valid to do these two operations in a row at all because we
+ * know that 'from' will not move past 'to' and vice versa. */
+ resort_bals(from, bals, n_bals);
+ resort_bals(to, bals, n_bals);
+ } else {
+ from++;
+ }
+ }
+ }
+
+ /* Implement exponentially weighted moving average. A weight of 1/2 causes
+ * historical data to decay to <1% in 7 rebalancing runs. */
+ for (e = &port->bond_hash[0]; e <= &port->bond_hash[BOND_MASK]; e++) {
+ e->tx_bytes /= 2;
+ }
+}
+\f
/* Port functions. */
static void
sizeof *port->ifaces);
}
port->ifaces[port->n_ifaces++] = iface;
+ if (port->n_ifaces > 1) {
+ port->bridge->has_bonded_ports = true;
+ }
VLOG_DBG("attached network device %s to port %s", iface->name, port->name);