vswitchd: Basic bonding rebalancing works.
authorBen Pfaff <blp@nicira.com>
Sat, 27 Dec 2008 00:47:35 +0000 (16:47 -0800)
committerBen Pfaff <blp@nicira.com>
Sat, 27 Dec 2008 00:48:29 +0000 (16:48 -0800)
So far only tested with hping3.  At least, need to make sure that existing
flows get redirected through the new interface as well.

vswitchd/bridge.c

index 998208b0692d8daaa35f9c4b85f5a0f6970d583e..2e67bec6ce973a1a112542c4dd41640774373d51 100644 (file)
@@ -80,6 +80,8 @@ struct iface {
 #define BOND_MASK 0xff
 struct bond_entry {
     int iface_idx;              /* Index of assigned iface, or -1 if none. */
+    uint64_t tx_bytes;          /* Count of bytes recently transmitted. */
+    tag_type iface_tag;         /* Tag associated with iface_idx. */
 };
 
 #define FLOOD_PORT ((struct port *) 1) /* The 'flood' output port. */
@@ -913,32 +915,74 @@ queue_tx(struct bridge *br, struct ofpbuf *msg)
     }
 }
 
-static struct iface *
-choose_output_iface(const struct port *port, const struct flow *flow)
+static struct bond_entry *
+lookup_bond_entry(const struct port *port, const uint8_t mac[ETH_ADDR_LEN])
+{
+    size_t h = hash_fnv(mac, ETH_ADDR_LEN, HASH_FNV_BASIS);
+    return &port->bond_hash[h & BOND_MASK];
+}
+
+static tag_type
+choose_output_iface(const struct port *port, const struct flow *flow,
+                    uint16_t *dp_ifidx)
 {
     assert(port->n_ifaces);
     if (port->n_ifaces == 1) {
-        return port->ifaces[0];
+        *dp_ifidx = port->ifaces[0]->dp_ifidx;
+        return 0;
     } else {
-        size_t h = hash_fnv(flow->dl_src, sizeof flow->dl_src, HASH_FNV_BASIS);
-        struct bond_entry *e = &port->bond_hash[h & BOND_MASK];
+        struct bond_entry *e = lookup_bond_entry(port, flow->dl_src);
         if (e->iface_idx < 0 || e->iface_idx >= port->n_ifaces) {
-            /* XXX select interface properly */
-            static int count = 0;
-            e->iface_idx = count++ % port->n_ifaces;
+            /* XXX select interface properly.  The current interface selection
+             * is only good for testing the rebalancing code. */
+            e->iface_idx = 0;
+            e->iface_tag = tag_create_random();
         }
-        return port->ifaces[e->iface_idx];
+        *dp_ifidx = port->ifaces[e->iface_idx]->dp_ifidx;
+        return e->iface_tag;
     }
 }
 
 static void
+bond_account_flow(struct bridge *br, const struct ft_flow *f)
+{
+    const struct ft_dst *dst;
+
+    if (f->byte_count <= f->last_byte_count) {
+        /* No bytes to add, so don't waste our time. */
+        return;
+    }
+    /* XXX return immediately if no bonded interfaces. */
+
+    for (dst = &f->dsts[0]; dst < &f->dsts[f->n_dsts]; dst++) {
+        uint16_t dp_ifidx = dst->dp_ifidx;
+        struct port *port;
+        struct bond_entry *e;
+
+        /* Find the interface and port structure for 'flow'. */
+        if (dp_ifidx >= ARRAY_SIZE(br->ifaces) || !br->ifaces[dp_ifidx]) {
+            /* shouldn't happen, but... */
+            continue;
+        }
+        port = br->ifaces[dp_ifidx]->port;
+        if (port->n_ifaces < 2) {
+            /* Not a bonded interface. */
+            continue;
+        }
+
+        e = lookup_bond_entry(port, f->flow.dl_src);
+        e->tx_bytes += f->byte_count - f->last_byte_count;
+    }
+}
+
+static tag_type
 set_dst(struct ft_dst *p, const struct flow *flow,
         const struct port *in_port, const struct port *out_port)
 {
     p->vlan = (out_port->vlan ? OFP_VLAN_NONE
               : in_port->vlan ? in_port->vlan
               : ntohs(flow->dl_vlan));
-    p->dp_ifidx = choose_output_iface(out_port, flow)->dp_ifidx;
+    return choose_output_iface(out_port, flow, &p->dp_ifidx);
 }
 
 static void
@@ -986,6 +1030,7 @@ compose_dsts(const struct bridge *br, const struct flow *flow, uint16_t vlan,
 {
     if (out_port == FLOOD_PORT) {
         /* Flood. */
+        /* XXX use OFPP_FLOOD if no vlans or bonding. */
         struct ft_dst *dst;       /* Next element in 'dsts'. */
         struct ft_dst *vlan_dsts; /* First 'dsts' with vlan != flow->dl_vlan */
         size_t i;
@@ -1070,7 +1115,8 @@ send_packets(struct bridge *br, const struct flow *flow,
             }
             f->tags = tags;
         } else {
-            ft_insert(br->ft, ftf_create(flow, dsts, n_dsts, tags));
+            f = ftf_create(flow, dsts, n_dsts, tags);
+            ft_insert(br->ft, f);
             queue = true;
         }
 
@@ -1341,6 +1387,7 @@ process_flow_expired(struct bridge *br, void *ofe_)
         /* Update statistics. */
         f->last_byte_count = f->byte_count;
         f->byte_count = ntohll(ofe->byte_count);
+        bond_account_flow(br, f);
 
         ft_remove(br->ft, f);
         ftf_destroy(f);
@@ -1380,6 +1427,192 @@ flow_from_match(struct flow *flow, const struct ofp_match *match)
 \f
 /* Flow statistics collection. */
 
+struct slave_balance {
+    struct iface *iface;
+    uint64_t tx_bytes;
+    struct bond_entry **hashes;
+    size_t n_hashes;
+};
+
+/* Sorts pointers to pointers to bond_entries in ascending order by the
+ * interface to which they are assigned, and within a single interface in
+ * ascending order of bytes transmitted. */
+static int
+compare_bond_entries(const void *a_, const void *b_)
+{
+    const struct bond_entry *const *ap = a_;
+    const struct bond_entry *const *bp = b_;
+    const struct bond_entry *a = *ap;
+    const struct bond_entry *b = *bp;
+    if (a->iface_idx != b->iface_idx) {
+        return a->iface_idx > b->iface_idx ? 1 : -1;
+    } else if (a->tx_bytes != b->tx_bytes) {
+        return a->tx_bytes > b->tx_bytes ? 1 : -1;
+    } else {
+        return 0;
+    }
+}
+
+/* Sorts slave_balances in *descending* order by number of bytes
+ * transmitted. */
+static int
+compare_slave_balance(const void *a_, const void *b_)
+{
+    const struct slave_balance *a = a_;
+    const struct slave_balance *b = b_;
+    return a->tx_bytes > b->tx_bytes ? -1 : a->tx_bytes < b->tx_bytes;
+}
+
+static void
+swap_bals(struct slave_balance *a, struct slave_balance *b)
+{
+    struct slave_balance tmp = *a;
+    *a = *b;
+    *b = tmp;
+}
+
+static void
+resort_bals(struct slave_balance *p, struct slave_balance *bals, size_t n_bals)
+{
+    if (n_bals > 1) {
+        for (; p > bals && p->tx_bytes > p[-1].tx_bytes; p--) {
+            swap_bals(p, p - 1);
+        }
+        for (; p < &bals[n_bals - 1] && p->tx_bytes < p[1].tx_bytes; p++) {
+            swap_bals(p, p + 1);
+        }
+    }
+}
+
+static void
+print_bals(const char *title, const struct slave_balance *bals, size_t n_bals,
+           struct port *port)
+{
+    const struct slave_balance *b;
+
+    printf("slave balance %s:\n", title);
+    for (b = bals; b < bals + n_bals; b++) {
+        size_t i;
+
+        printf("\t%s: %"PRIu64" bytes over:\n", b->iface->name, b->tx_bytes);
+        for (i = 0; i < b->n_hashes; i++) {
+            const struct bond_entry *e = b->hashes[i];
+            printf("\t\thash %td: %"PRIu64" bytes\n",
+                   e - port->bond_hash, e->tx_bytes);
+        }
+    }
+}
+
+static void
+bond_rebalance_port(struct port *port)
+{
+    struct slave_balance bals[DP_MAX_PORTS];
+    struct bond_entry *hashes[BOND_MASK + 1];
+    struct slave_balance *b, *least_loaded;
+    struct bond_entry *e;
+    size_t i;
+
+    for (b = bals; b < &bals[port->n_ifaces]; b++) {
+        b->iface = port->ifaces[b - bals];
+        b->tx_bytes = 0;
+        b->hashes = NULL;
+        b->n_hashes = 0;
+    }
+    for (i = 0; i <= BOND_MASK; i++) {
+        hashes[i] = &port->bond_hash[i];
+    }
+    qsort(hashes, BOND_MASK + 1, sizeof *hashes, compare_bond_entries);
+    for (i = 0; i <= BOND_MASK; i++) {
+        e = hashes[i];
+        if (e->iface_idx >= 0 && e->iface_idx < port->n_ifaces) {
+            b = &bals[e->iface_idx];
+            b->tx_bytes += e->tx_bytes;
+            if (!b->hashes) {
+                b->hashes = &hashes[i];
+            }
+            b->n_hashes++;
+        }
+    }
+
+    /* Sort in decreasing order of tx_bytes. */
+    qsort(bals, port->n_ifaces, sizeof *bals, compare_slave_balance);
+
+    print_bals("before", bals, port->n_ifaces, port);
+
+    /* Shift load from the most-heavily-loaded slaves to the
+     * least-heavily-loaded slaves. */
+    least_loaded = &bals[port->n_ifaces - 1];
+    for (b = bals; b < least_loaded; ) {
+        uint64_t overload = b->tx_bytes - least_loaded->tx_bytes;
+        if (overload < least_loaded->tx_bytes >> 5 || overload < 100000) {
+            /* The extra load on this slave (and all less-loaded slaves),
+             * compared to that of the least-loaded slave, is less than ~3%, or
+             * it is less than ~1Mbps.  No point in rebalancing. */
+            break;
+        }
+
+        if (b->n_hashes == 1) {
+            /* This slave only carries a single MAC hash, so we can't shift any
+             * load away from it, even though we want to. */
+            b++;
+            continue;
+        }
+
+        for (i = 0; i < b->n_hashes; i++) {
+            e = b->hashes[i];
+            if (least_loaded->tx_bytes + e->tx_bytes
+                < b->tx_bytes - e->tx_bytes) {
+                /* Delete element from e->hashes. */
+                if (!i) {
+                    b->hashes++;
+                } else {
+                    memmove(b->hashes + i, b->hashes + i + 1,
+                            (b->n_hashes - (i + 1)) * sizeof *b->hashes);
+                }
+                b->n_hashes--;
+
+                /* Shift load away from 'b'. */
+                b->tx_bytes -= e->tx_bytes;
+                resort_bals(b, bals, port->n_ifaces);
+
+                /* Shift load to 'least_loaded'. */
+                tag_set_add(&port->bridge->revalidate_set, e->iface_tag);
+                e->iface_idx = least_loaded->iface->port_ifidx;
+                e->iface_tag = tag_create_random();
+                least_loaded->tx_bytes += e->tx_bytes;
+                resort_bals(least_loaded, bals, port->n_ifaces);
+                goto again;
+            }
+        }
+        b++;
+
+    again: ;
+    }
+
+    print_bals("after", bals, port->n_ifaces, port);
+
+    /* Implement exponentially weighted moving average.  A weight of 1/2 causes
+     * historical data to decay to <1% in 7 rebalancing runs.  */
+    for (e = &port->bond_hash[0]; e <= &port->bond_hash[BOND_MASK]; e++) {
+        e->tx_bytes /= 2;
+    }
+}
+
+static void
+bond_rebalance(struct bridge *br)
+{
+    size_t i;
+
+    for (i = 0; i < br->n_ports; i++) {
+        struct port *port = br->ports[i];
+        if (port->n_ifaces < 2) {
+            continue;
+        }
+
+        bond_rebalance_port(port);
+    }
+}
+
 static void
 request_flow_stats(struct bridge *br)
 {
@@ -1397,7 +1630,6 @@ request_flow_stats(struct bridge *br)
 
     stats_request_destroy(br->flow_rq);
     br->flow_rq = stats_request_create(br->stats_mgr, msg);
-    br->next_stats_request = time_now() + 10;
 }
 
 static void
@@ -1453,6 +1685,8 @@ flowstats_process(struct bridge *br)
                     ftf_set_dsts(f, &invalid_dst, 1);
                     f->tags |= flowstats_tag;
                     n_tagged++;
+                } else {
+                    bond_account_flow(br, f);
                 }
             } else {
                 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
@@ -1471,12 +1705,21 @@ flowstats_process(struct bridge *br)
         }
         ofpbuf_delete(msg);
     }
+
+    /* Delete all the flows that remain in br->ft, replacing them by the ones
+     * from new_ft. */
     hmap_expand(&new_ft->flows);
     hmap_swap(&br->ft->flows, &new_ft->flows);
     ft_destroy(new_ft);
+
+    /* If we tagged anything for revalidation, add that tag to the revalidation
+     * set. */
     if (n_tagged) {
         tag_set_add(&br->revalidate_set, flowstats_tag);
     }
+
+    /* Rebalance any bonded ports. */
+    bond_rebalance(br);
 }
 
 static void
@@ -1506,7 +1749,7 @@ flowstats_run(struct bridge *br)
         if (rconn_is_connected(br->rconn)
             && time_now() >= br->next_stats_request) {
             request_flow_stats(br);
-            br->next_stats_request = time_now() + 60;
+            br->next_stats_request = time_now() + 10;
         }
     }
 }
@@ -1640,6 +1883,7 @@ port_update_bonding(struct port *port)
             for (i = 0; i <= BOND_MASK; i++) {
                 struct bond_entry *e = &port->bond_hash[i];
                 e->iface_idx = -1;
+                e->tx_bytes = 0;
             }
             port->active_iface = 0;
         }