From: Ben Pfaff Date: Fri, 5 Jun 2009 17:25:49 +0000 (-0700) Subject: vswitch: Reimplement bond rebalancing. X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=addc4b3179c4db5e305b7a94db9c62cfe90fdd36;p=openvswitch vswitch: Reimplement bond rebalancing. In addition to restoring the bond rebalancing features that were lost some time ago, this also: - Refactors the code and adds comments to make it more easily understandable. - Improves log messages. - Fixes a bug noticed during testing where the bond rebalancer would shift load to interfaces that had been disabled. Bug #1174. --- diff --git a/secchan/ofproto.c b/secchan/ofproto.c index 9a70f3e1..6454f480 100644 --- a/secchan/ofproto.c +++ b/secchan/ofproto.c @@ -108,6 +108,7 @@ struct rule { long long int created; /* Creation time. */ uint64_t packet_count; /* Number of packets received. */ uint64_t byte_count; /* Number of bytes received. */ + uint64_t accounted_bytes; /* Number of bytes passed to account_cb. */ uint8_t tcp_flags; /* Bitwise-OR of all TCP flags seen. */ uint8_t ip_tos; /* Last-seen IP type-of-service. */ tag_type tags; /* Tags (set only by hooks). */ @@ -862,6 +863,14 @@ ofproto_run1(struct ofproto *p) update_used(p); classifier_for_each(&p->cls, CLS_INC_ALL, expire_rule, p); + + /* Let the hook know that we're at a stable point: all outstanding data + * in existing flows has been accounted to the account_cb. Thus, the + * hook can now reasonably do operations that depend on having accurate + * flow volume accounting (currently, that's just bond rebalancing). */ + if (p->ofhooks->account_checkpoint_cb) { + p->ofhooks->account_checkpoint_cb(p->aux); + } } if (p->netflow) { @@ -1637,6 +1646,21 @@ rule_update_actions(struct ofproto *ofproto, struct rule *rule) } } +static void +rule_account(struct ofproto *ofproto, struct rule *rule, uint64_t extra_bytes) +{ + uint64_t total_bytes = rule->byte_count + extra_bytes; + + if (ofproto->ofhooks->account_flow_cb + && total_bytes > rule->accounted_bytes) + { + ofproto->ofhooks->account_flow_cb( + &rule->cr.flow, rule->odp_actions, rule->n_odp_actions, + total_bytes - rule->accounted_bytes, ofproto->aux); + rule->accounted_bytes = total_bytes; + } +} + static void rule_uninstall(struct ofproto *p, struct rule *rule) { @@ -1661,6 +1685,7 @@ rule_post_uninstall(struct ofproto *ofproto, struct rule *rule) { struct rule *super = rule->super; + rule_account(ofproto, rule, 0); if (ofproto->netflow) { struct ofexpired expired; expired.flow = rule->cr.flow; @@ -1685,6 +1710,7 @@ rule_post_uninstall(struct ofproto *ofproto, struct rule *rule) * reinstalled. */ rule->packet_count = 0; rule->byte_count = 0; + rule->accounted_bytes = 0; rule->tcp_flags = 0; rule->ip_tos = 0; } @@ -3148,6 +3174,7 @@ update_used(struct ofproto *p) } update_time(rule, &f->stats); + rule_account(p, rule, f->stats.n_bytes); } free(flows); } @@ -3290,4 +3317,6 @@ default_normal_ofhook_cb(const flow_t *flow, const struct ofpbuf *packet, static const struct ofhooks default_ofhooks = { NULL, default_normal_ofhook_cb, + NULL, + NULL }; diff --git a/secchan/ofproto.h b/secchan/ofproto.h index df90d0b9..b01ba1fd 100644 --- a/secchan/ofproto.h +++ b/secchan/ofproto.h @@ -115,6 +115,10 @@ struct ofhooks { void *aux); bool (*normal_cb)(const flow_t *, const struct ofpbuf *packet, struct odp_actions *, tag_type *, void *aux); + void (*account_flow_cb)(const flow_t *, const union odp_action *, + size_t n_actions, unsigned long long int n_bytes, + void *aux); + void (*account_checkpoint_cb)(void *aux); }; void ofproto_revalidate(struct ofproto *, tag_type); struct tag_set *ofproto_get_revalidate_set(struct ofproto *); diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c index ce23a318..bd04c11f 100644 --- a/vswitchd/bridge.c +++ b/vswitchd/bridge.c @@ -45,6 +45,7 @@ #include "coverage.h" #include "dirs.h" #include "dpif.h" +#include "dynamic-string.h" #include "flow.h" #include "hash.h" #include "list.h" @@ -173,6 +174,10 @@ struct bridge { struct port **ports; size_t n_ports, allocated_ports; + /* Bonding. */ + bool has_bonded_ports; + long long int bond_next_rebalance; + /* Flow tracking. */ bool flush; @@ -212,6 +217,7 @@ static uint64_t dpid_from_hash(const void *, size_t nbytes); static void bond_run(struct bridge *); static void bond_wait(struct bridge *); +static void bond_rebalance_port(struct port *); static void port_create(struct bridge *, const char *name); static void port_reconfigure(struct port *); @@ -824,6 +830,7 @@ bridge_create(const char *name) port_array_init(&br->ifaces); br->flush = false; + br->bond_next_rebalance = time_msec() + 10000; list_push_back(&all_bridges, &br->node); @@ -1845,11 +1852,322 @@ bridge_normal_ofhook_cb(const flow_t *flow, const struct ofpbuf *packet, return process_flow(br, flow, packet, actions, tags); } +static void +bridge_account_flow_ofhook_cb(const flow_t *flow, + const union odp_action *actions, + size_t n_actions, unsigned long long int n_bytes, + void *br_) +{ + struct bridge *br = br_; + const union odp_action *a; + + if (!br->has_bonded_ports) { + return; + } + + for (a = actions; a < &actions[n_actions]; a++) { + if (a->type == ODPAT_OUTPUT) { + struct port *port = port_from_dp_ifidx(br, a->output.port); + if (port && port->n_ifaces >= 2) { + struct bond_entry *e = lookup_bond_entry(port, flow->dl_src); + e->tx_bytes += n_bytes; + } + } + } +} + +static void +bridge_account_checkpoint_ofhook_cb(void *br_) +{ + struct bridge *br = br_; + size_t i; + + if (!br->has_bonded_ports) { + return; + } + + /* The current ofproto implementation calls this callback at least once a + * second, so this timer implementation is sufficient. */ + if (time_msec() < br->bond_next_rebalance) { + return; + } + br->bond_next_rebalance = time_msec() + 10000; + + for (i = 0; i < br->n_ports; i++) { + struct port *port = br->ports[i]; + if (port->n_ifaces > 1) { + bond_rebalance_port(port); + } + } +} + static struct ofhooks bridge_ofhooks = { bridge_port_changed_ofhook_cb, bridge_normal_ofhook_cb, + bridge_account_flow_ofhook_cb, + bridge_account_checkpoint_ofhook_cb, }; +/* Statistics for a single interface on a bonded port, used for load-based + * bond rebalancing. */ +struct slave_balance { + struct iface *iface; /* The interface. */ + uint64_t tx_bytes; /* Sum of hashes[*]->tx_bytes. */ + + /* All the "bond_entry"s that are assigned to this interface, in order of + * increasing tx_bytes. */ + struct bond_entry **hashes; + size_t n_hashes; +}; + +/* Sorts pointers to pointers to bond_entries in ascending order by the + * interface to which they are assigned, and within a single interface in + * ascending order of bytes transmitted. */ +static int +compare_bond_entries(const void *a_, const void *b_) +{ + const struct bond_entry *const *ap = a_; + const struct bond_entry *const *bp = b_; + const struct bond_entry *a = *ap; + const struct bond_entry *b = *bp; + if (a->iface_idx != b->iface_idx) { + return a->iface_idx > b->iface_idx ? 1 : -1; + } else if (a->tx_bytes != b->tx_bytes) { + return a->tx_bytes > b->tx_bytes ? 1 : -1; + } else { + return 0; + } +} + +/* Sorts slave_balances so that enabled ports come first, and otherwise in + * *descending* order by number of bytes transmitted. */ +static int +compare_slave_balance(const void *a_, const void *b_) +{ + const struct slave_balance *a = a_; + const struct slave_balance *b = b_; + if (a->iface->enabled != b->iface->enabled) { + return a->iface->enabled ? -1 : 1; + } else if (a->tx_bytes != b->tx_bytes) { + return a->tx_bytes > b->tx_bytes ? -1 : 1; + } else { + return 0; + } +} + +static void +swap_bals(struct slave_balance *a, struct slave_balance *b) +{ + struct slave_balance tmp = *a; + *a = *b; + *b = tmp; +} + +/* Restores the 'n_bals' slave_balance structures in 'bals' to sorted order + * given that 'p' (and only 'p') might be in the wrong location. + * + * This function invalidates 'p', since it might now be in a different memory + * location. */ +static void +resort_bals(struct slave_balance *p, + struct slave_balance bals[], size_t n_bals) +{ + if (n_bals > 1) { + for (; p > bals && p->tx_bytes > p[-1].tx_bytes; p--) { + swap_bals(p, p - 1); + } + for (; p < &bals[n_bals - 1] && p->tx_bytes < p[1].tx_bytes; p++) { + swap_bals(p, p + 1); + } + } +} + +static void +log_bals(const struct slave_balance *bals, size_t n_bals, struct port *port) +{ + if (VLOG_IS_DBG_ENABLED()) { + struct ds ds = DS_EMPTY_INITIALIZER; + const struct slave_balance *b; + + for (b = bals; b < bals + n_bals; b++) { + size_t i; + + if (b > bals) { + ds_put_char(&ds, ','); + } + ds_put_format(&ds, " %s %"PRIu64"kB", + b->iface->name, b->tx_bytes / 1024); + + if (!b->iface->enabled) { + ds_put_cstr(&ds, " (disabled)"); + } + if (b->n_hashes > 0) { + ds_put_cstr(&ds, " ("); + for (i = 0; i < b->n_hashes; i++) { + const struct bond_entry *e = b->hashes[i]; + if (i > 0) { + ds_put_cstr(&ds, " + "); + } + ds_put_format(&ds, "h%td: %"PRIu64"kB", + e - port->bond_hash, e->tx_bytes / 1024); + } + ds_put_cstr(&ds, ")"); + } + } + VLOG_DBG("bond %s:%s", port->name, ds_cstr(&ds)); + ds_destroy(&ds); + } +} + +/* Shifts 'hash' from 'from' to 'to' within 'port'. */ +static void +bond_shift_load(struct slave_balance *from, struct slave_balance *to, + struct bond_entry *hash) +{ + struct port *port = from->iface->port; + uint64_t delta = hash->tx_bytes; + + VLOG_INFO("bond %s: shift %"PRIu64"kB of load (with hash %td) " + "from %s to %s (now carrying %"PRIu64"kB and " + "%"PRIu64"kB load, respectively)", + port->name, delta / 1024, hash - port->bond_hash, + from->iface->name, to->iface->name, + (from->tx_bytes - delta) / 1024, + (to->tx_bytes + delta) / 1024); + + /* Delete element from from->hashes. + * + * We don't bother to add the element to to->hashes because not only would + * it require more work, the only purpose it would be to allow that hash to + * be migrated to another slave in this rebalancing run, and there is no + * point in doing that. */ + if (from->hashes[0] == hash) { + from->hashes++; + } else { + int i = hash - from->hashes[0]; + memmove(from->hashes + i, from->hashes + i + 1, + (from->n_hashes - (i + 1)) * sizeof *from->hashes); + } + from->n_hashes--; + + /* Shift load away from 'from' to 'to'. */ + from->tx_bytes -= delta; + to->tx_bytes += delta; + + /* Arrange for flows to be revalidated. */ + ofproto_revalidate(port->bridge->ofproto, hash->iface_tag); + hash->iface_idx = to->iface->port_ifidx; + hash->iface_tag = tag_create_random(); + +} + +static void +bond_rebalance_port(struct port *port) +{ + struct slave_balance bals[DP_MAX_PORTS]; + size_t n_bals; + struct bond_entry *hashes[BOND_MASK + 1]; + struct slave_balance *b, *from, *to; + struct bond_entry *e; + size_t i; + + /* Sets up 'bals' to describe each of the port's interfaces, sorted in + * descending order of tx_bytes, so that bals[0] represents the most + * heavily loaded slave and bals[n_bals - 1] represents the least heavily + * loaded slave. + * + * The code is a bit tricky: to avoid dynamically allocating a 'hashes' + * array for each slave_balance structure, we sort our local array of + * hashes in order by slave, so that all of the hashes for a given slave + * become contiguous in memory, and then we point each 'hashes' members of + * a slave_balance structure to the start of a contiguous group. */ + n_bals = port->n_ifaces; + for (b = bals; b < &bals[n_bals]; b++) { + b->iface = port->ifaces[b - bals]; + b->tx_bytes = 0; + b->hashes = NULL; + b->n_hashes = 0; + } + for (i = 0; i <= BOND_MASK; i++) { + hashes[i] = &port->bond_hash[i]; + } + qsort(hashes, BOND_MASK + 1, sizeof *hashes, compare_bond_entries); + for (i = 0; i <= BOND_MASK; i++) { + e = hashes[i]; + if (e->iface_idx >= 0 && e->iface_idx < port->n_ifaces) { + b = &bals[e->iface_idx]; + b->tx_bytes += e->tx_bytes; + if (!b->hashes) { + b->hashes = &hashes[i]; + } + b->n_hashes++; + } + } + qsort(bals, n_bals, sizeof *bals, compare_slave_balance); + log_bals(bals, n_bals, port); + + /* Discard slaves that aren't enabled (which were sorted to the back of the + * array earlier). */ + while (!bals[n_bals - 1].iface->enabled) { + n_bals--; + if (!n_bals) { + return; + } + } + + /* Shift load from the most-loaded slaves to the least-loaded slaves. */ + to = &bals[n_bals - 1]; + for (from = bals; from < to; ) { + uint64_t overload = from->tx_bytes - to->tx_bytes; + if (overload < to->tx_bytes >> 5 || overload < 100000) { + /* The extra load on 'from' (and all less-loaded slaves), compared + * to that of 'to' (the least-loaded slave), is less than ~3%, or + * it is less than ~1Mbps. No point in rebalancing. */ + break; + } else if (from->n_hashes == 1) { + /* 'from' only carries a single MAC hash, so we can't shift any + * load away from it, even though we want to. */ + from++; + } else { + /* 'from' is carrying significantly more load than 'to', and that + * load is split across at least two different hashes. Pick a hash + * to migrate to 'to' (the least-loaded slave), given that doing so + * must not cause 'to''s load to exceed 'from''s load. + * + * The sort order we use means that we prefer to shift away the + * smallest hashes instead of the biggest ones. There is little + * reason behind this decision; we could use the opposite sort + * order to shift away big hashes ahead of small ones. */ + size_t i; + + for (i = 0; i < from->n_hashes; i++) { + uint64_t delta = from->hashes[i]->tx_bytes; + if (to->tx_bytes + delta < from->tx_bytes - delta) { + break; + } + } + if (i < from->n_hashes) { + bond_shift_load(from, to, from->hashes[i]); + + /* Re-sort 'bals'. Note that this may make 'from' and 'to' + * point to different slave_balance structures. It is only + * valid to do these two operations in a row at all because we + * know that 'from' will not move past 'to' and vice versa. */ + resort_bals(from, bals, n_bals); + resort_bals(to, bals, n_bals); + } else { + from++; + } + } + } + + /* Implement exponentially weighted moving average. A weight of 1/2 causes + * historical data to decay to <1% in 7 rebalancing runs. */ + for (e = &port->bond_hash[0]; e <= &port->bond_hash[BOND_MASK]; e++) { + e->tx_bytes /= 2; + } +} + /* Port functions. */ static void @@ -2172,6 +2490,9 @@ iface_create(struct port *port, const char *name) sizeof *port->ifaces); } port->ifaces[port->n_ifaces++] = iface; + if (port->n_ifaces > 1) { + port->bridge->has_bonded_ports = true; + } VLOG_DBG("attached network device %s to port %s", iface->name, port->name);