From 142f33e8989ab34178e6a04b57f69fff965bd623 Mon Sep 17 00:00:00 2001
From: Ben Pfaff
Date: Tue, 12 Aug 2008 15:34:47 -0700
Subject: [PATCH] Implement rate limiting in secchan.

---
 secchan/secchan.c | 284 +++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 280 insertions(+), 4 deletions(-)

diff --git a/secchan/secchan.c b/secchan/secchan.c
index e4f283d1..e54fb2ff 100644
--- a/secchan/secchan.c
+++ b/secchan/secchan.c
@@ -94,6 +94,10 @@ struct settings {
     int probe_interval;         /* # seconds idle before sending echo request. */
     int max_backoff;            /* Max # seconds between connection attempts. */
 
+    /* Packet-in rate-limiting. */
+    int rate_limit;             /* Tokens added to bucket per second. */
+    int burst_limit;            /* Maximum token bucket size. */
+
     /* Discovery behavior. */
     regex_t accept_controller_regex;  /* Controller vconns to accept. */
     const char *accept_controller_re; /* String version of regex. */
@@ -119,6 +123,7 @@ struct relay {
 struct hook {
     bool (*packet_cb)(struct relay *, int half, void *aux);
     void (*periodic_cb)(void *aux);
+    void (*wait_cb)(void *aux);
     void *aux;
 };
 
@@ -134,6 +139,7 @@ static void relay_destroy(struct relay *);
 
 static struct hook make_hook(bool (*packet_cb)(struct relay *, int, void *),
                              void (*periodic_cb)(void *),
+                             void (*wait_cb)(void *),
                              void *aux);
 
 static struct discovery *discovery_init(const struct settings *);
@@ -145,6 +151,9 @@ static struct hook in_band_hook_create(const struct settings *);
 static struct hook fail_open_hook_create(const struct settings *,
                                          struct rconn *local,
                                          struct rconn *remote);
+static struct hook rate_limit_hook_create(const struct settings *,
+                                          struct rconn *local,
+                                          struct rconn *remote);
 
 static void modify_dhcp_request(struct dhcp_msg *, void *aux);
 static bool validate_dhcp_offer(const struct dhcp_msg *, void *aux);
@@ -156,7 +165,7 @@ main(int argc, char *argv[])
 
     struct list relays = LIST_INITIALIZER(&relays);
 
-    struct hook hooks[3];
+    struct hook hooks[4];
     size_t n_hooks;
 
     struct rconn *local_rconn, *remote_rconn;
@@ -221,6 +230,10 @@ main(int argc, char *argv[])
         hooks[n_hooks++] = fail_open_hook_create(&s,
                                                  local_rconn, remote_rconn);
     }
+    if (s.rate_limit) {
+        hooks[n_hooks++] = rate_limit_hook_create(&s,
+                                                  local_rconn, remote_rconn);
+    }
     assert(n_hooks <= ARRAY_SIZE(hooks));
 
     for (;;) {
@@ -266,6 +279,11 @@ main(int argc, char *argv[])
         if (listen_vconn) {
             vconn_accept_wait(listen_vconn);
         }
+        for (i = 0; i < n_hooks; i++) {
+            if (hooks[i].wait_cb) {
+                hooks[i].wait_cb(hooks[i].aux);
+            }
+        }
         if (discovery) {
             discovery_wait(discovery);
         }
@@ -278,11 +296,13 @@ main(int argc, char *argv[])
 static struct hook
 make_hook(bool (*packet_cb)(struct relay *, int half, void *aux),
           void (*periodic_cb)(void *aux),
+          void (*wait_cb)(void *aux),
           void *aux)
 {
     struct hook h;
     h.packet_cb = packet_cb;
     h.periodic_cb = periodic_cb;
+    h.wait_cb = wait_cb;
     h.aux = aux;
     return h;
 }
@@ -612,7 +632,7 @@ in_band_hook_create(const struct settings *s)
     memcpy(in_band->mac, netdev_get_etheraddr(in_band->of_device),
            ETH_ADDR_LEN);
 
-    return make_hook(in_band_packet_cb, NULL, in_band);
+    return make_hook(in_band_packet_cb, NULL, NULL, in_band);
 }
 
 /* Fail open support. */
@@ -677,7 +697,220 @@ fail_open_hook_create(const struct settings *s, struct rconn *local_rconn,
     fail_open->local_rconn = local_rconn;
     fail_open->remote_rconn = remote_rconn;
     fail_open->lswitch = NULL;
-    return make_hook(fail_open_packet_cb, fail_open_periodic_cb, fail_open);
+    return make_hook(fail_open_packet_cb, fail_open_periodic_cb, NULL,
+                     fail_open);
+}
+
+struct rate_limiter {
+    const struct settings *s;
+    struct rconn *remote_rconn;
+
+    /* One queue per physical port. */
+    struct queue queues[OFPP_MAX];
+    int n_queued;               /* Sum over queues[*].n. */
+    int next_tx_port;           /* Next port to check in round-robin. */
+
+    /* Token bucket.
+     *
+     * It costs 1000 tokens to send a single packet_in message.  A single token
+     * per message would be more straightforward, but this choice lets us avoid
+     * round-off error in refill_bucket()'s calculation of how many tokens to
+     * add to the bucket, since no division step is needed. */
+    long long int last_fill;    /* Time at which we last added tokens. */
+    int tokens;                 /* Current number of tokens. */
+
+    /* Transmission queue. */
+    int n_txq;                  /* No. of packets waiting in rconn for tx. */
+};
+
+/* Drop a packet from the longest queue in 'rl'.  Ties between equally long
+ * queues are broken uniformly at random. */
+static void
+drop_packet(struct rate_limiter *rl)
+{
+    struct queue *longest;      /* Queue currently selected as longest. */
+    int n_longest;              /* # of queues of same length as 'longest'. */
+    struct queue *q;
+
+    longest = &rl->queues[0];
+    n_longest = 1;
+    /* Start at queues[1]: queues[0] is already counted as the initial
+     * candidate, and visiting it again would bias the tie-break toward it. */
+    for (q = &rl->queues[1]; q < &rl->queues[OFPP_MAX]; q++) {
+        if (longest->n < q->n) {
+            longest = q;
+            n_longest = 1;
+        } else if (longest->n == q->n) {
+            n_longest++;
+
+            /* Randomly select one of the longest queues, with a uniform
+             * distribution (Knuth algorithm 3.4.2R). */
+            if (!random_range(n_longest)) {
+                longest = q;
+            }
+        }
+    }
+
+    /* FIXME: do we want to pop the tail instead? */
+    buffer_delete(queue_pop_head(longest));
+    rl->n_queued--;
+}
+
+/* Remove and return the next packet to transmit (in round-robin order).
+ * The caller must ensure that at least one packet is queued. */
+static struct buffer *
+dequeue_packet(struct rate_limiter *rl)
+{
+    unsigned int i;
+
+    for (i = 0; i < OFPP_MAX; i++) {
+        unsigned int port = (rl->next_tx_port + i) % OFPP_MAX;
+        struct queue *q = &rl->queues[port];
+        if (q->n) {
+            rl->next_tx_port = (port + 1) % OFPP_MAX;
+            rl->n_queued--;
+            return queue_pop_head(q);
+        }
+    }
+    NOT_REACHED();
+}
+
+/* Add tokens to the bucket based on elapsed time. */
+static void
+refill_bucket(struct rate_limiter *rl)
+{
+    const struct settings *s = rl->s;
+    long long int now = time_msec();
+    long long int tokens = (now - rl->last_fill) * s->rate_limit + rl->tokens;
+    if (tokens >= 1000) {
+        rl->last_fill = now;
+        rl->tokens = MIN(tokens, s->burst_limit * 1000);
+    }
+}
+
+/* Attempts to remove enough tokens from 'rl' to transmit a packet.  Returns
+ * true if successful, false otherwise.  (In the latter case no tokens are
+ * removed.) */
+static bool
+get_token(struct rate_limiter *rl)
+{
+    if (rl->tokens >= 1000) {
+        rl->tokens -= 1000;
+        return true;
+    } else {
+        return false;
+    }
+}
+
+/* Packet hook for rate limiting.
+ *
+ * Returns false, leaving the message to take the normal path, if 'half' is
+ * HALF_REMOTE, if the local half's message is not an OFPT_PACKET_IN or is too
+ * short to be one, or if nothing is queued and a token is available for it.
+ * Otherwise queues a copy of the packet_in by input port, first dropping a
+ * packet if 'burst_limit' packets are already queued, and returns true. */
+static bool
+rate_limit_packet_cb(struct relay *r, int half, void *rl_)
+{
+    struct rate_limiter *rl = rl_;
+    const struct settings *s = rl->s;
+    struct buffer *msg = r->halves[HALF_LOCAL].rxbuf;
+    struct ofp_header *oh;
+
+    if (half == HALF_REMOTE) {
+        return false;
+    }
+
+    oh = msg->data;
+    if (oh->type != OFPT_PACKET_IN) {
+        return false;
+    }
+    if (msg->size < offsetof(struct ofp_packet_in, data)) {
+        VLOG_WARN("packet too short (%zu bytes) for packet_in", msg->size);
+        return false;
+    }
+
+    if (!rl->n_queued && get_token(rl)) {
+        /* In the common case where we are not constrained by the rate limit,
+         * let the packet take the normal path. */
+        return false;
+    } else {
+        /* Otherwise queue it up for the periodic callback to drain out. */
+        struct ofp_packet_in *opi = msg->data;
+        int port = ntohs(opi->in_port) % OFPP_MAX;
+        if (rl->n_queued >= s->burst_limit) {
+            drop_packet(rl);
+        }
+        queue_push_tail(&rl->queues[port], buffer_clone(msg));
+        rl->n_queued++;
+        return true;
+    }
+}
+
+/* Periodic hook for rate limiting.  Refills the token bucket, then passes
+ * queued packet_ins to the remote rconn in round-robin order, spending one
+ * packet's worth of tokens on each, for at most 50 packets per call. */
+static void
+rate_limit_periodic_cb(void *rl_)
+{
+    struct rate_limiter *rl = rl_;
+    int i;
+
+    /* Drain some packets out of the bucket if possible, but limit the number
+     * of iterations to allow other code to get work done too.  The iteration
+     * limit is tested before get_token() so that reaching it never spends
+     * tokens on a packet that is not dequeued. */
+    refill_bucket(rl);
+    for (i = 0; i < 50 && rl->n_queued && get_token(rl); i++) {
+        /* Use a small, arbitrary limit for the amount of queuing to do here,
+         * because the TCP connection is responsible for buffering and there is
+         * no point in trying to transmit faster than the TCP connection can
+         * handle. */
+        struct buffer *b = dequeue_packet(rl);
+        rconn_send_with_limit(rl->remote_rconn, b, &rl->n_txq, 10);
+    }
+}
+
+/* Wait hook for rate limiting.  When packets are queued, requests an
+ * immediate wakeup if a packet's worth of tokens is available, or a timed
+ * wakeup after TIME_UPDATE_INTERVAL / 2 otherwise. */
+static void
+rate_limit_wait_cb(void *rl_)
+{
+    struct rate_limiter *rl = rl_;
+    if (rl->n_queued) {
+        if (rl->tokens >= 1000) {
+            /* We can transmit more packets as soon as we're called again. */
+            poll_immediate_wake();
+        } else {
+            /* We have to wait for the bucket to re-fill.  We could calculate
+             * the exact amount of time here for increased smoothness. */
+            poll_timer_wait(TIME_UPDATE_INTERVAL / 2);
+        }
+    }
+}
+
+/* Creates and returns a hook that rate-limits packet_in messages according to
+ * 's', sending the ones it queues on 'remote'.  The token bucket initially
+ * holds 's->rate_limit' * 100 tokens.  'local' is not used. */
+static struct hook
+rate_limit_hook_create(const struct settings *s,
+                       struct rconn *local,
+                       struct rconn *remote)
+{
+    struct rate_limiter *rl;
+    size_t i;
+
+    rl = xcalloc(1, sizeof *rl);
+    rl->s = s;
+    rl->remote_rconn = remote;
+    for (i = 0; i < ARRAY_SIZE(rl->queues); i++) {
+        queue_init(&rl->queues[i]);
+    }
+    rl->last_fill = time_msec();
+    rl->tokens = s->rate_limit * 100;
+    return make_hook(rate_limit_packet_cb, rate_limit_periodic_cb,
+                     rate_limit_wait_cb, rl);
 }
 
 /* Controller discovery. */
@@ -799,7 +1032,9 @@ parse_options(int argc, char *argv[], struct settings *s)
         OPT_NO_RESOLV_CONF,
         OPT_INACTIVITY_PROBE,
         OPT_MAX_IDLE,
-        OPT_MAX_BACKOFF
+        OPT_MAX_BACKOFF,
+        OPT_RATE_LIMIT,
+        OPT_BURST_LIMIT
     };
     static struct option long_options[] = {
         {"accept-vconn", required_argument, 0, OPT_ACCEPT_VCONN},
@@ -809,6 +1044,8 @@ parse_options(int argc, char *argv[], struct settings *s)
         {"max-idle", required_argument, 0, OPT_MAX_IDLE},
         {"max-backoff", required_argument, 0, OPT_MAX_BACKOFF},
         {"listen", required_argument, 0, 'l'},
+        {"rate-limit", optional_argument, 0, OPT_RATE_LIMIT},
+        {"burst-limit", required_argument, 0, OPT_BURST_LIMIT},
         {"detach", no_argument, 0, 'D'},
         {"pidfile", optional_argument, 0, 'P'},
         {"verbose", optional_argument, 0, 'v'},
@@ -828,6 +1065,8 @@ parse_options(int argc, char *argv[], struct settings *s)
     s->probe_interval = 15;
     s->max_backoff = 15;
     s->update_resolv_conf = true;
+    s->rate_limit = 0;
+    s->burst_limit = 0;
     for (;;) {
         int c;
 
@@ -884,6 +1123,24 @@ parse_options(int argc, char *argv[], struct settings *s)
             }
             break;
 
+        case OPT_RATE_LIMIT:
+            if (optarg) {
+                s->rate_limit = atoi(optarg);
+                if (s->rate_limit < 1) {
+                    fatal(0, "--rate-limit argument must be at least 1");
+                }
+            } else {
+                s->rate_limit = 1000;
+            }
+            break;
+
+        case OPT_BURST_LIMIT:
+            s->burst_limit = atoi(optarg);
+            if (s->burst_limit < 1) {
+                fatal(0, "--burst-limit argument must be at least 1");
+            }
+            break;
+
         case 'D':
             set_detach();
             break;
@@ -977,6 +1234,22 @@ parse_options(int argc, char *argv[], struct settings *s)
 
         netdev_close(netdev);
     }
+
+    /* Rate limiting. */
+    if (s->rate_limit) {
+        if (s->rate_limit < 100) {
+            VLOG_WARN("Rate limit set to unusually low value %d",
+                      s->rate_limit);
+        }
+
+        if (!s->burst_limit) {
+            s->burst_limit = s->rate_limit * 2;
+        } else if (s->burst_limit < s->rate_limit) {
+            VLOG_WARN("Burst limit (%d) set lower than rate limit (%d)",
+                      s->burst_limit, s->rate_limit);
+        }
+        s->burst_limit = MIN(s->burst_limit, INT_MAX / 1000);
+    }
 }
 
 static void
@@ -1002,6 +1275,9 @@ usage(void)
            " attempts (default: 15 seconds)\n"
            " -l, --listen=METHOD allow management connections on METHOD\n"
            " (a passive OpenFlow connection method)\n"
+           "\nRate-limiting of \"packet-in\" messages to the controller:\n"
+           " --rate-limit[=PACKETS] max rate, in packets/s (default: 1000)\n"
+           " --burst-limit=BURST limit on packet credit for idle time\n"
            "\nOther options:\n"
            " -D, --detach run in background as daemon\n"
            " -P, --pidfile[=FILE] create pidfile (default: %s/secchan.pid)\n"
-- 
2.30.2