vswitch: Reimplement bond rebalancing.
authorBen Pfaff <blp@nicira.com>
Fri, 5 Jun 2009 17:25:49 +0000 (10:25 -0700)
committerBen Pfaff <blp@nicira.com>
Fri, 5 Jun 2009 17:25:49 +0000 (10:25 -0700)
In addition to restoring the bond rebalancing features that were lost some
time ago, this also:

  - Refactors the code and adds comments to make it more easily
    understandable.

  - Improves log messages.

  - Fixes a bug noticed during testing where the bond rebalancer would
    shift load to interfaces that had been disabled.

Bug #1174.

secchan/ofproto.c
secchan/ofproto.h
vswitchd/bridge.c

index 9a70f3e19d6e8b887dbcc60bf816f08ab3a29f52..6454f4800706a2d28d6b760a46dec8cdee4fb6b4 100644 (file)
@@ -108,6 +108,7 @@ struct rule {
     long long int created;      /* Creation time. */
     uint64_t packet_count;      /* Number of packets received. */
     uint64_t byte_count;        /* Number of bytes received. */
+    uint64_t accounted_bytes;   /* Number of bytes passed to account_cb. */
     uint8_t tcp_flags;          /* Bitwise-OR of all TCP flags seen. */
     uint8_t ip_tos;             /* Last-seen IP type-of-service. */
     tag_type tags;              /* Tags (set only by hooks). */
@@ -862,6 +863,14 @@ ofproto_run1(struct ofproto *p)
         update_used(p);
 
         classifier_for_each(&p->cls, CLS_INC_ALL, expire_rule, p);
+
+        /* Let the hook know that we're at a stable point: all outstanding data
+         * in existing flows has been accounted to the account_cb.  Thus, the
+         * hook can now reasonably do operations that depend on having accurate
+         * flow volume accounting (currently, that's just bond rebalancing). */
+        if (p->ofhooks->account_checkpoint_cb) {
+            p->ofhooks->account_checkpoint_cb(p->aux);
+        }
     }
 
     if (p->netflow) {
@@ -1637,6 +1646,21 @@ rule_update_actions(struct ofproto *ofproto, struct rule *rule)
     }
 }
 
+static void
+rule_account(struct ofproto *ofproto, struct rule *rule, uint64_t extra_bytes)
+{
+    uint64_t total_bytes = rule->byte_count + extra_bytes;
+
+    if (ofproto->ofhooks->account_flow_cb
+        && total_bytes > rule->accounted_bytes)
+    {
+        ofproto->ofhooks->account_flow_cb(
+            &rule->cr.flow, rule->odp_actions, rule->n_odp_actions,
+            total_bytes - rule->accounted_bytes, ofproto->aux);
+        rule->accounted_bytes = total_bytes;
+    }
+}
+
 static void
 rule_uninstall(struct ofproto *p, struct rule *rule)
 {
@@ -1661,6 +1685,7 @@ rule_post_uninstall(struct ofproto *ofproto, struct rule *rule)
 {
     struct rule *super = rule->super;
 
+    rule_account(ofproto, rule, 0);
     if (ofproto->netflow) {
         struct ofexpired expired;
         expired.flow = rule->cr.flow;
@@ -1685,6 +1710,7 @@ rule_post_uninstall(struct ofproto *ofproto, struct rule *rule)
      * reinstalled. */
     rule->packet_count = 0;
     rule->byte_count = 0;
+    rule->accounted_bytes = 0;
     rule->tcp_flags = 0;
     rule->ip_tos = 0;
 }
@@ -3148,6 +3174,7 @@ update_used(struct ofproto *p)
         }
 
         update_time(rule, &f->stats);
+        rule_account(p, rule, f->stats.n_bytes);
     }
     free(flows);
 }
@@ -3290,4 +3317,6 @@ default_normal_ofhook_cb(const flow_t *flow, const struct ofpbuf *packet,
 static const struct ofhooks default_ofhooks = {
     NULL,
     default_normal_ofhook_cb,
+    NULL,
+    NULL
 };
index df90d0b9f37b461bb37ab2118b51a194631866c9..b01ba1fd7a1df7271fe6de224412d4ccb1039f3e 100644 (file)
@@ -115,6 +115,10 @@ struct ofhooks {
                             void *aux);
     bool (*normal_cb)(const flow_t *, const struct ofpbuf *packet,
                       struct odp_actions *, tag_type *, void *aux);
+    void (*account_flow_cb)(const flow_t *, const union odp_action *,
+                            size_t n_actions, unsigned long long int n_bytes,
+                            void *aux);
+    void (*account_checkpoint_cb)(void *aux);
 };
 void ofproto_revalidate(struct ofproto *, tag_type);
 struct tag_set *ofproto_get_revalidate_set(struct ofproto *);
index ce23a318c8f7269d41c7e03d56640e1eb7baf18f..bd04c11f339b603d82e3677a2d5b008cc2952a53 100644 (file)
@@ -45,6 +45,7 @@
 #include "coverage.h"
 #include "dirs.h"
 #include "dpif.h"
+#include "dynamic-string.h"
 #include "flow.h"
 #include "hash.h"
 #include "list.h"
@@ -173,6 +174,10 @@ struct bridge {
     struct port **ports;
     size_t n_ports, allocated_ports;
 
+    /* Bonding. */
+    bool has_bonded_ports;
+    long long int bond_next_rebalance;
+
     /* Flow tracking. */
     bool flush;
 
@@ -212,6 +217,7 @@ static uint64_t dpid_from_hash(const void *, size_t nbytes);
 
 static void bond_run(struct bridge *);
 static void bond_wait(struct bridge *);
+static void bond_rebalance_port(struct port *);
 
 static void port_create(struct bridge *, const char *name);
 static void port_reconfigure(struct port *);
@@ -824,6 +830,7 @@ bridge_create(const char *name)
     port_array_init(&br->ifaces);
 
     br->flush = false;
+    br->bond_next_rebalance = time_msec() + 10000;
 
     list_push_back(&all_bridges, &br->node);
 
@@ -1845,11 +1852,322 @@ bridge_normal_ofhook_cb(const flow_t *flow, const struct ofpbuf *packet,
     return process_flow(br, flow, packet, actions, tags);
 }
 
+static void
+bridge_account_flow_ofhook_cb(const flow_t *flow,
+                              const union odp_action *actions,
+                              size_t n_actions, unsigned long long int n_bytes,
+                              void *br_)
+{
+    struct bridge *br = br_;
+    const union odp_action *a;
+
+    if (!br->has_bonded_ports) {
+        return;
+    }
+
+    for (a = actions; a < &actions[n_actions]; a++) {
+        if (a->type == ODPAT_OUTPUT) {
+            struct port *port = port_from_dp_ifidx(br, a->output.port);
+            if (port && port->n_ifaces >= 2) {
+                struct bond_entry *e = lookup_bond_entry(port, flow->dl_src);
+                e->tx_bytes += n_bytes;
+            }
+        }
+    }
+}
+
+static void
+bridge_account_checkpoint_ofhook_cb(void *br_)
+{
+    struct bridge *br = br_;
+    size_t i;
+
+    if (!br->has_bonded_ports) {
+        return;
+    }
+
+    /* The current ofproto implementation calls this callback at least once a
+     * second, so this timer implementation is sufficient. */
+    if (time_msec() < br->bond_next_rebalance) {
+        return;
+    }
+    br->bond_next_rebalance = time_msec() + 10000;
+
+    for (i = 0; i < br->n_ports; i++) {
+        struct port *port = br->ports[i];
+        if (port->n_ifaces > 1) {
+            bond_rebalance_port(port);
+        }
+    }
+}
+
 static struct ofhooks bridge_ofhooks = {
     bridge_port_changed_ofhook_cb,
     bridge_normal_ofhook_cb,
+    bridge_account_flow_ofhook_cb,
+    bridge_account_checkpoint_ofhook_cb,
 };
 \f
+/* Statistics for a single interface on a bonded port, used for load-based
+ * bond rebalancing.  */
+struct slave_balance {
+    struct iface *iface;        /* The interface. */
+    uint64_t tx_bytes;          /* Sum of hashes[*]->tx_bytes. */
+
+    /* All the "bond_entry"s that are assigned to this interface, in order of
+     * increasing tx_bytes. */
+    struct bond_entry **hashes;
+    size_t n_hashes;
+};
+
+/* Sorts pointers to pointers to bond_entries in ascending order by the
+ * interface to which they are assigned, and within a single interface in
+ * ascending order of bytes transmitted. */
+static int
+compare_bond_entries(const void *a_, const void *b_)
+{
+    const struct bond_entry *const *ap = a_;
+    const struct bond_entry *const *bp = b_;
+    const struct bond_entry *a = *ap;
+    const struct bond_entry *b = *bp;
+    if (a->iface_idx != b->iface_idx) {
+        return a->iface_idx > b->iface_idx ? 1 : -1;
+    } else if (a->tx_bytes != b->tx_bytes) {
+        return a->tx_bytes > b->tx_bytes ? 1 : -1;
+    } else {
+        return 0;
+    }
+}
+
+/* Sorts slave_balances so that enabled ports come first, and otherwise in
+ * *descending* order by number of bytes transmitted. */
+static int
+compare_slave_balance(const void *a_, const void *b_)
+{
+    const struct slave_balance *a = a_;
+    const struct slave_balance *b = b_;
+    if (a->iface->enabled != b->iface->enabled) {
+        return a->iface->enabled ? -1 : 1;
+    } else if (a->tx_bytes != b->tx_bytes) {
+        return a->tx_bytes > b->tx_bytes ? -1 : 1;
+    } else {
+        return 0;
+    }
+}
+
+static void
+swap_bals(struct slave_balance *a, struct slave_balance *b)
+{
+    struct slave_balance tmp = *a;
+    *a = *b;
+    *b = tmp;
+}
+
+/* Restores the 'n_bals' slave_balance structures in 'bals' to sorted order
+ * given that 'p' (and only 'p') might be in the wrong location.
+ *
+ * This function invalidates 'p', since it might now be in a different memory
+ * location. */
+static void
+resort_bals(struct slave_balance *p,
+            struct slave_balance bals[], size_t n_bals)
+{
+    if (n_bals > 1) {
+        for (; p > bals && p->tx_bytes > p[-1].tx_bytes; p--) {
+            swap_bals(p, p - 1);
+        }
+        for (; p < &bals[n_bals - 1] && p->tx_bytes < p[1].tx_bytes; p++) {
+            swap_bals(p, p + 1);
+        }
+    }
+}
+
+static void
+log_bals(const struct slave_balance *bals, size_t n_bals, struct port *port)
+{
+    if (VLOG_IS_DBG_ENABLED()) {
+        struct ds ds = DS_EMPTY_INITIALIZER;
+        const struct slave_balance *b;
+
+        for (b = bals; b < bals + n_bals; b++) {
+            size_t i;
+
+            if (b > bals) {
+                ds_put_char(&ds, ',');
+            }
+            ds_put_format(&ds, " %s %"PRIu64"kB",
+                          b->iface->name, b->tx_bytes / 1024);
+
+            if (!b->iface->enabled) {
+                ds_put_cstr(&ds, " (disabled)");
+            }
+            if (b->n_hashes > 0) {
+                ds_put_cstr(&ds, " (");
+                for (i = 0; i < b->n_hashes; i++) {
+                    const struct bond_entry *e = b->hashes[i];
+                    if (i > 0) {
+                        ds_put_cstr(&ds, " + ");
+                    }
+                    ds_put_format(&ds, "h%td: %"PRIu64"kB",
+                                  e - port->bond_hash, e->tx_bytes / 1024);
+                }
+                ds_put_cstr(&ds, ")");
+            }
+        }
+        VLOG_DBG("bond %s:%s", port->name, ds_cstr(&ds));
+        ds_destroy(&ds);
+    }
+}
+
+/* Shifts 'hash' from 'from' to 'to' within 'port'. */
+static void
+bond_shift_load(struct slave_balance *from, struct slave_balance *to,
+                struct bond_entry *hash)
+{
+    struct port *port = from->iface->port;
+    uint64_t delta = hash->tx_bytes;
+
+    VLOG_INFO("bond %s: shift %"PRIu64"kB of load (with hash %td) "
+              "from %s to %s (now carrying %"PRIu64"kB and "
+              "%"PRIu64"kB load, respectively)",
+              port->name, delta / 1024, hash - port->bond_hash,
+              from->iface->name, to->iface->name,
+              (from->tx_bytes - delta) / 1024,
+              (to->tx_bytes + delta) / 1024);
+
+    /* Delete element from from->hashes.
+     *
+     * We don't bother to add the element to to->hashes because not only would
+     * it require more work, the only purpose it would be to allow that hash to
+     * be migrated to another slave in this rebalancing run, and there is no
+     * point in doing that.  */
+    if (from->hashes[0] == hash) {
+        from->hashes++;
+    } else {
+        int i = hash - from->hashes[0];
+        memmove(from->hashes + i, from->hashes + i + 1,
+                (from->n_hashes - (i + 1)) * sizeof *from->hashes);
+    }
+    from->n_hashes--;
+
+    /* Shift load away from 'from' to 'to'. */
+    from->tx_bytes -= delta;
+    to->tx_bytes += delta;
+
+    /* Arrange for flows to be revalidated. */
+    ofproto_revalidate(port->bridge->ofproto, hash->iface_tag);
+    hash->iface_idx = to->iface->port_ifidx;
+    hash->iface_tag = tag_create_random();
+
+}
+
+static void
+bond_rebalance_port(struct port *port)
+{
+    struct slave_balance bals[DP_MAX_PORTS];
+    size_t n_bals;
+    struct bond_entry *hashes[BOND_MASK + 1];
+    struct slave_balance *b, *from, *to;
+    struct bond_entry *e;
+    size_t i;
+
+    /* Sets up 'bals' to describe each of the port's interfaces, sorted in
+     * descending order of tx_bytes, so that bals[0] represents the most
+     * heavily loaded slave and bals[n_bals - 1] represents the least heavily
+     * loaded slave.
+     *
+     * The code is a bit tricky: to avoid dynamically allocating a 'hashes'
+     * array for each slave_balance structure, we sort our local array of
+     * hashes in order by slave, so that all of the hashes for a given slave
+     * become contiguous in memory, and then we point each 'hashes' members of
+     * a slave_balance structure to the start of a contiguous group. */
+    n_bals = port->n_ifaces;
+    for (b = bals; b < &bals[n_bals]; b++) {
+        b->iface = port->ifaces[b - bals];
+        b->tx_bytes = 0;
+        b->hashes = NULL;
+        b->n_hashes = 0;
+    }
+    for (i = 0; i <= BOND_MASK; i++) {
+        hashes[i] = &port->bond_hash[i];
+    }
+    qsort(hashes, BOND_MASK + 1, sizeof *hashes, compare_bond_entries);
+    for (i = 0; i <= BOND_MASK; i++) {
+        e = hashes[i];
+        if (e->iface_idx >= 0 && e->iface_idx < port->n_ifaces) {
+            b = &bals[e->iface_idx];
+            b->tx_bytes += e->tx_bytes;
+            if (!b->hashes) {
+                b->hashes = &hashes[i];
+            }
+            b->n_hashes++;
+        }
+    }
+    qsort(bals, n_bals, sizeof *bals, compare_slave_balance);
+    log_bals(bals, n_bals, port);
+
+    /* Discard slaves that aren't enabled (which were sorted to the back of the
+     * array earlier). */
+    while (!bals[n_bals - 1].iface->enabled) {
+        n_bals--;
+        if (!n_bals) {
+            return;
+        }
+    }
+
+    /* Shift load from the most-loaded slaves to the least-loaded slaves. */
+    to = &bals[n_bals - 1];
+    for (from = bals; from < to; ) {
+        uint64_t overload = from->tx_bytes - to->tx_bytes;
+        if (overload < to->tx_bytes >> 5 || overload < 100000) {
+            /* The extra load on 'from' (and all less-loaded slaves), compared
+             * to that of 'to' (the least-loaded slave), is less than ~3%, or
+             * it is less than ~1Mbps.  No point in rebalancing. */
+            break;
+        } else if (from->n_hashes == 1) {
+            /* 'from' only carries a single MAC hash, so we can't shift any
+             * load away from it, even though we want to. */
+            from++;
+        } else {
+            /* 'from' is carrying significantly more load than 'to', and that
+             * load is split across at least two different hashes.  Pick a hash
+             * to migrate to 'to' (the least-loaded slave), given that doing so
+             * must not cause 'to''s load to exceed 'from''s load.
+             *
+             * The sort order we use means that we prefer to shift away the
+             * smallest hashes instead of the biggest ones.  There is little
+             * reason behind this decision; we could use the opposite sort
+             * order to shift away big hashes ahead of small ones. */
+            size_t i;
+
+            for (i = 0; i < from->n_hashes; i++) {
+                uint64_t delta = from->hashes[i]->tx_bytes;
+                if (to->tx_bytes + delta < from->tx_bytes - delta) {
+                    break;
+                }
+            }
+            if (i < from->n_hashes) {
+                bond_shift_load(from, to, from->hashes[i]);
+
+                /* Re-sort 'bals'.  Note that this may make 'from' and 'to'
+                 * point to different slave_balance structures.  It is only
+                 * valid to do these two operations in a row at all because we
+                 * know that 'from' will not move past 'to' and vice versa. */
+                resort_bals(from, bals, n_bals);
+                resort_bals(to, bals, n_bals);
+            } else {
+                from++;
+            }
+        }
+    }
+
+    /* Implement exponentially weighted moving average.  A weight of 1/2 causes
+     * historical data to decay to <1% in 7 rebalancing runs.  */
+    for (e = &port->bond_hash[0]; e <= &port->bond_hash[BOND_MASK]; e++) {
+        e->tx_bytes /= 2;
+    }
+}
+\f
 /* Port functions. */
 
 static void
@@ -2172,6 +2490,9 @@ iface_create(struct port *port, const char *name)
                                   sizeof *port->ifaces);
     }
     port->ifaces[port->n_ifaces++] = iface;
+    if (port->n_ifaces > 1) {
+        port->bridge->has_bonded_ports = true;
+    }
 
     VLOG_DBG("attached network device %s to port %s", iface->name, port->name);