From: Ben Pfaff Date: Sat, 27 Dec 2008 00:47:35 +0000 (-0800) Subject: vswitchd: Basic bonding rebalancing works. X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=615a05376000931a5acd8259f91462e9cd1f3111;p=openvswitch vswitchd: Basic bonding rebalancing works. So far only tested with hping3. At least, need to make sure that existing flows get redirected through the new interface as well. --- diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c index 998208b0..2e67bec6 100644 --- a/vswitchd/bridge.c +++ b/vswitchd/bridge.c @@ -80,6 +80,8 @@ struct iface { #define BOND_MASK 0xff struct bond_entry { int iface_idx; /* Index of assigned iface, or -1 if none. */ + uint64_t tx_bytes; /* Count of bytes recently transmitted. */ + tag_type iface_tag; /* Tag associated with iface_idx. */ }; #define FLOOD_PORT ((struct port *) 1) /* The 'flood' output port. */ @@ -913,32 +915,74 @@ queue_tx(struct bridge *br, struct ofpbuf *msg) } } -static struct iface * -choose_output_iface(const struct port *port, const struct flow *flow) +static struct bond_entry * +lookup_bond_entry(const struct port *port, const uint8_t mac[ETH_ADDR_LEN]) +{ + size_t h = hash_fnv(mac, ETH_ADDR_LEN, HASH_FNV_BASIS); + return &port->bond_hash[h & BOND_MASK]; +} + +static tag_type +choose_output_iface(const struct port *port, const struct flow *flow, + uint16_t *dp_ifidx) { assert(port->n_ifaces); if (port->n_ifaces == 1) { - return port->ifaces[0]; + *dp_ifidx = port->ifaces[0]->dp_ifidx; + return 0; } else { - size_t h = hash_fnv(flow->dl_src, sizeof flow->dl_src, HASH_FNV_BASIS); - struct bond_entry *e = &port->bond_hash[h & BOND_MASK]; + struct bond_entry *e = lookup_bond_entry(port, flow->dl_src); if (e->iface_idx < 0 || e->iface_idx >= port->n_ifaces) { - /* XXX select interface properly */ - static int count = 0; - e->iface_idx = count++ % port->n_ifaces; + /* XXX select interface properly. The current interface selection + * is only good for testing the rebalancing code. */ + e->iface_idx = 0; + e->iface_tag = tag_create_random(); } - return port->ifaces[e->iface_idx]; + *dp_ifidx = port->ifaces[e->iface_idx]->dp_ifidx; + return e->iface_tag; } } static void +bond_account_flow(struct bridge *br, const struct ft_flow *f) +{ + const struct ft_dst *dst; + + if (f->byte_count <= f->last_byte_count) { + /* No bytes to add, so don't waste our time. */ + return; + } + /* XXX return immediately if no bonded interfaces. */ + + for (dst = &f->dsts[0]; dst < &f->dsts[f->n_dsts]; dst++) { + uint16_t dp_ifidx = dst->dp_ifidx; + struct port *port; + struct bond_entry *e; + + /* Find the interface and port structure for 'flow'. */ + if (dp_ifidx >= ARRAY_SIZE(br->ifaces) || !br->ifaces[dp_ifidx]) { + /* shouldn't happen, but... */ + continue; + } + port = br->ifaces[dp_ifidx]->port; + if (port->n_ifaces < 2) { + /* Not a bonded interface. */ + continue; + } + + e = lookup_bond_entry(port, f->flow.dl_src); + e->tx_bytes += f->byte_count - f->last_byte_count; + } +} + +static tag_type set_dst(struct ft_dst *p, const struct flow *flow, const struct port *in_port, const struct port *out_port) { p->vlan = (out_port->vlan ? OFP_VLAN_NONE : in_port->vlan ? in_port->vlan : ntohs(flow->dl_vlan)); - p->dp_ifidx = choose_output_iface(out_port, flow)->dp_ifidx; + return choose_output_iface(out_port, flow, &p->dp_ifidx); } static void @@ -986,6 +1030,7 @@ compose_dsts(const struct bridge *br, const struct flow *flow, uint16_t vlan, { if (out_port == FLOOD_PORT) { /* Flood. */ + /* XXX use OFPP_FLOOD if no vlans or bonding. */ struct ft_dst *dst; /* Next element in 'dsts'. */ struct ft_dst *vlan_dsts; /* First 'dsts' with vlan != flow->dl_vlan */ size_t i; @@ -1070,7 +1115,8 @@ send_packets(struct bridge *br, const struct flow *flow, } f->tags = tags; } else { - ft_insert(br->ft, ftf_create(flow, dsts, n_dsts, tags)); + f = ftf_create(flow, dsts, n_dsts, tags); + ft_insert(br->ft, f); queue = true; } @@ -1341,6 +1387,7 @@ process_flow_expired(struct bridge *br, void *ofe_) /* Update statistics. */ f->last_byte_count = f->byte_count; f->byte_count = ntohll(ofe->byte_count); + bond_account_flow(br, f); ft_remove(br->ft, f); ftf_destroy(f); @@ -1380,6 +1427,192 @@ flow_from_match(struct flow *flow, const struct ofp_match *match) /* Flow statistics collection. */ +struct slave_balance { + struct iface *iface; + uint64_t tx_bytes; + struct bond_entry **hashes; + size_t n_hashes; +}; + +/* Sorts pointers to pointers to bond_entries in ascending order by the + * interface to which they are assigned, and within a single interface in + * ascending order of bytes transmitted. */ +static int +compare_bond_entries(const void *a_, const void *b_) +{ + const struct bond_entry *const *ap = a_; + const struct bond_entry *const *bp = b_; + const struct bond_entry *a = *ap; + const struct bond_entry *b = *bp; + if (a->iface_idx != b->iface_idx) { + return a->iface_idx > b->iface_idx ? 1 : -1; + } else if (a->tx_bytes != b->tx_bytes) { + return a->tx_bytes > b->tx_bytes ? 1 : -1; + } else { + return 0; + } +} + +/* Sorts slave_balances in *descending* order by number of bytes + * transmitted. */ +static int +compare_slave_balance(const void *a_, const void *b_) +{ + const struct slave_balance *a = a_; + const struct slave_balance *b = b_; + return a->tx_bytes > b->tx_bytes ? -1 : a->tx_bytes < b->tx_bytes; +} + +static void +swap_bals(struct slave_balance *a, struct slave_balance *b) +{ + struct slave_balance tmp = *a; + *a = *b; + *b = tmp; +} + +static void +resort_bals(struct slave_balance *p, struct slave_balance *bals, size_t n_bals) +{ + if (n_bals > 1) { + for (; p > bals && p->tx_bytes > p[-1].tx_bytes; p--) { + swap_bals(p, p - 1); + } + for (; p < &bals[n_bals - 1] && p->tx_bytes < p[1].tx_bytes; p++) { + swap_bals(p, p + 1); + } + } +} + +static void +print_bals(const char *title, const struct slave_balance *bals, size_t n_bals, + struct port *port) +{ + const struct slave_balance *b; + + printf("slave balance %s:\n", title); + for (b = bals; b < bals + n_bals; b++) { + size_t i; + + printf("\t%s: %"PRIu64" bytes over:\n", b->iface->name, b->tx_bytes); + for (i = 0; i < b->n_hashes; i++) { + const struct bond_entry *e = b->hashes[i]; + printf("\t\thash %td: %"PRIu64" bytes\n", + e - port->bond_hash, e->tx_bytes); + } + } +} + +static void +bond_rebalance_port(struct port *port) +{ + struct slave_balance bals[DP_MAX_PORTS]; + struct bond_entry *hashes[BOND_MASK + 1]; + struct slave_balance *b, *least_loaded; + struct bond_entry *e; + size_t i; + + for (b = bals; b < &bals[port->n_ifaces]; b++) { + b->iface = port->ifaces[b - bals]; + b->tx_bytes = 0; + b->hashes = NULL; + b->n_hashes = 0; + } + for (i = 0; i <= BOND_MASK; i++) { + hashes[i] = &port->bond_hash[i]; + } + qsort(hashes, BOND_MASK + 1, sizeof *hashes, compare_bond_entries); + for (i = 0; i <= BOND_MASK; i++) { + e = hashes[i]; + if (e->iface_idx >= 0 && e->iface_idx < port->n_ifaces) { + b = &bals[e->iface_idx]; + b->tx_bytes += e->tx_bytes; + if (!b->hashes) { + b->hashes = &hashes[i]; + } + b->n_hashes++; + } + } + + /* Sort in decreasing order of tx_bytes. */ + qsort(bals, port->n_ifaces, sizeof *bals, compare_slave_balance); + + print_bals("before", bals, port->n_ifaces, port); + + /* Shift load from the most-heavily-loaded slaves to the + * least-heavily-loaded slaves. */ + least_loaded = &bals[port->n_ifaces - 1]; + for (b = bals; b < least_loaded; ) { + uint64_t overload = b->tx_bytes - least_loaded->tx_bytes; + if (overload < least_loaded->tx_bytes >> 5 || overload < 100000) { + /* The extra load on this slave (and all less-loaded slaves), + * compared to that of the least-loaded slave, is less than ~3%, or + * it is less than ~1Mbps. No point in rebalancing. */ + break; + } + + if (b->n_hashes == 1) { + /* This slave only carries a single MAC hash, so we can't shift any + * load away from it, even though we want to. */ + b++; + continue; + } + + for (i = 0; i < b->n_hashes; i++) { + e = b->hashes[i]; + if (least_loaded->tx_bytes + e->tx_bytes + < b->tx_bytes - e->tx_bytes) { + /* Delete element from e->hashes. */ + if (!i) { + b->hashes++; + } else { + memmove(b->hashes + i, b->hashes + i + 1, + (b->n_hashes - (i + 1)) * sizeof *b->hashes); + } + b->n_hashes--; + + /* Shift load away from 'b'. */ + b->tx_bytes -= e->tx_bytes; + resort_bals(b, bals, port->n_ifaces); + + /* Shift load to 'least_loaded'. */ + tag_set_add(&port->bridge->revalidate_set, e->iface_tag); + e->iface_idx = least_loaded->iface->port_ifidx; + e->iface_tag = tag_create_random(); + least_loaded->tx_bytes += e->tx_bytes; + resort_bals(least_loaded, bals, port->n_ifaces); + goto again; + } + } + b++; + + again: ; + } + + print_bals("after", bals, port->n_ifaces, port); + + /* Implement exponentially weighted moving average. A weight of 1/2 causes + * historical data to decay to <1% in 7 rebalancing runs. */ + for (e = &port->bond_hash[0]; e <= &port->bond_hash[BOND_MASK]; e++) { + e->tx_bytes /= 2; + } +} + +static void +bond_rebalance(struct bridge *br) +{ + size_t i; + + for (i = 0; i < br->n_ports; i++) { + struct port *port = br->ports[i]; + if (port->n_ifaces < 2) { + continue; + } + + bond_rebalance_port(port); + } +} + static void request_flow_stats(struct bridge *br) { @@ -1397,7 +1630,6 @@ request_flow_stats(struct bridge *br) stats_request_destroy(br->flow_rq); br->flow_rq = stats_request_create(br->stats_mgr, msg); - br->next_stats_request = time_now() + 10; } static void @@ -1453,6 +1685,8 @@ flowstats_process(struct bridge *br) ftf_set_dsts(f, &invalid_dst, 1); f->tags |= flowstats_tag; n_tagged++; + } else { + bond_account_flow(br, f); } } else { static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); @@ -1471,12 +1705,21 @@ flowstats_process(struct bridge *br) } ofpbuf_delete(msg); } + + /* Delete all the flows that remain in br->ft, replacing them by the ones + * from new_ft. */ hmap_expand(&new_ft->flows); hmap_swap(&br->ft->flows, &new_ft->flows); ft_destroy(new_ft); + + /* If we tagged anything for revalidation, add that tag to the revalidation + * set. */ if (n_tagged) { tag_set_add(&br->revalidate_set, flowstats_tag); } + + /* Rebalance any bonded ports. */ + bond_rebalance(br); } static void @@ -1506,7 +1749,7 @@ flowstats_run(struct bridge *br) if (rconn_is_connected(br->rconn) && time_now() >= br->next_stats_request) { request_flow_stats(br); - br->next_stats_request = time_now() + 60; + br->next_stats_request = time_now() + 10; } } } @@ -1640,6 +1883,7 @@ port_update_bonding(struct port *port) for (i = 0; i <= BOND_MASK; i++) { struct bond_entry *e = &port->bond_hash[i]; e->iface_idx = -1; + e->tx_bytes = 0; } port->active_iface = 0; }