From: Ben Pfaff Date: Mon, 14 Jul 2008 20:43:17 +0000 (-0700) Subject: rconn: Rewrite to use explicit state machine. X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b8769714c15d042e3db8e04af2713bf16a82faa2;p=openvswitch rconn: Rewrite to use explicit state machine. --- diff --git a/include/rconn.h b/include/rconn.h index 8f3b0acf..1872aa9f 100644 --- a/include/rconn.h +++ b/include/rconn.h @@ -57,6 +57,12 @@ struct rconn *rconn_new(const char *name, int txq_limit, int inactivity_probe_interval, int max_backoff); struct rconn *rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *); +struct rconn *rconn_create(int txq_limit, int inactivity_probe_interval, + int max_backoff); +void rconn_connect(struct rconn *, const char *name); +void rconn_connect_unreliably(struct rconn *, + const char *name, struct vconn *vconn); +void rconn_disconnect(struct rconn *); void rconn_destroy(struct rconn *); void rconn_run(struct rconn *); diff --git a/lib/rconn.c b/lib/rconn.c index c410e736..bb7d3fe1 100644 --- a/lib/rconn.c +++ b/lib/rconn.c @@ -1,6 +1,6 @@ /* Copyright (c) 2008 The Board of Trustees of The Leland Stanford * Junior University - * + * * We are making the OpenFlow specification and associated documentation * (Software) available for public use and benefit with the expectation * that others will use, modify and enhance the Software and contribute @@ -13,10 +13,10 @@ * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: - * + * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. - * + * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND @@ -25,7 +25,7 @@ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. - * + * * The name and trademarks of copyright holder(s) may NOT be used in * advertising or publicity pertaining to the Software or any * derivatives without specific, written prior permission. @@ -34,6 +34,7 @@ #include "rconn.h" #include #include +#include #include #include #include "buffer.h" @@ -46,20 +47,50 @@ #define THIS_MODULE VLM_rconn #include "vlog.h" +#define STATES \ + STATE(VOID, 1 << 0) \ + STATE(BACKOFF, 1 << 1) \ + STATE(CONNECTING, 1 << 2) \ + STATE(ACTIVE, 1 << 3) \ + STATE(IDLE, 1 << 4) +enum state { +#define STATE(NAME, VALUE) S_##NAME = VALUE, + STATES +#undef STATE +}; + +static const char * +state_name(enum state state) +{ + switch (state) { +#define STATE(NAME, VALUE) case S_##NAME: return #NAME; + STATES +#undef STATE + } + return "***ERROR***"; +} + /* A reliable connection to an OpenFlow switch or controller. * * See the large comment in rconn.h for more information. */ struct rconn { - bool reliable; - char *name; + enum state state; + time_t state_entered; + unsigned int min_timeout; + struct vconn *vconn; - bool connected; + char *name; + bool reliable; + struct queue txq; int txq_limit; - time_t backoff_deadline; + int backoff; int max_backoff; + time_t backoff_deadline; + time_t last_received; time_t last_connected; + unsigned int packets_sent; /* Throughout this file, "probe" is shorthand for "inactivity probe". @@ -67,19 +98,38 @@ struct rconn { * an echo request as an inactivity probe packet. We should receive back * a response. */ int probe_interval; /* Secs of inactivity before sending probe. */ - time_t probe_sent; /* Time at which last probe sent, or 0 if none - * has been sent since 'last_connected'. */ }; -static struct rconn *create_rconn(const char *name, int txq_limit, - int probe_interval, int max_backoff, - struct vconn *); +static unsigned int sat_add(unsigned int x, unsigned int y); +static unsigned int sat_mul(unsigned int x, unsigned int y); +static unsigned int elapsed_in_this_state(const struct rconn *); +static bool timeout(struct rconn *, unsigned int secs); +static void state_transition(struct rconn *, enum state); static int try_send(struct rconn *); +static void reconnect(struct rconn *); static void disconnect(struct rconn *, int error); -static time_t probe_deadline(const struct rconn *); -/* Creates and returns a new rconn that connects (and re-connects as necessary) - * to the vconn named 'name'. +/* Creates a new rconn, connects it (reliably) to 'name', and returns it. */ +struct rconn * +rconn_new(const char *name, int txq_limit, int inactivity_probe_interval, + int max_backoff) +{ + struct rconn *rc = rconn_create(txq_limit, inactivity_probe_interval, + max_backoff); + rconn_connect(rc, name); + return rc; +} + +/* Creates a new rconn, connects it (unreliably) to 'vconn', and returns it. */ +struct rconn * +rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn) +{ + struct rconn *rc = rconn_create(txq_limit, 60, 0); + rconn_connect_unreliably(rc, name, vconn); + return rc; +} + +/* Creates and returns a new rconn. * * 'txq_limit' is the maximum length of the send queue, in packets. * @@ -94,21 +144,74 @@ static time_t probe_deadline(const struct rconn *); * failure until it reaches 'max_backoff'. If 0 is specified, the default of * 60 seconds is used. */ struct rconn * -rconn_new(const char *name, int txq_limit, int probe_interval, int max_backoff) +rconn_create(int txq_limit, int probe_interval, int max_backoff) { - return create_rconn(name, txq_limit, probe_interval, max_backoff, NULL); + struct rconn *rc = xcalloc(1, sizeof *rc); + + rc->state = S_VOID; + rc->state_entered = time(0); + rc->min_timeout = 0; + + rc->vconn = NULL; + rc->name = xstrdup("void"); + rc->reliable = false; + + queue_init(&rc->txq); + assert(txq_limit > 0); + rc->txq_limit = txq_limit; + + rc->backoff = 0; + rc->max_backoff = max_backoff ? max_backoff : 60; + rc->backoff_deadline = TIME_MIN; + rc->last_received = time(0); + rc->last_connected = time(0); + + rc->packets_sent = 0; + + rc->probe_interval = probe_interval ? MAX(5, probe_interval) : 0; + + return rc; } -/* Creates and returns a new rconn that is initially connected to 'vconn' and - * has the given 'name'. The rconn will not re-connect after the connection - * drops. - * - * 'txq_limit' is the maximum length of the send queue, in packets. */ -struct rconn * -rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn) +void +rconn_connect(struct rconn *rc, const char *name) +{ + rconn_disconnect(rc); + free(rc->name); + rc->name = xstrdup(name); + rc->reliable = true; + reconnect(rc); +} + +void +rconn_connect_unreliably(struct rconn *rc, + const char *name, struct vconn *vconn) { assert(vconn != NULL); - return create_rconn(name, txq_limit, 0, 0, vconn); + rconn_disconnect(rc); + free(rc->name); + rc->name = xstrdup(name); + rc->reliable = false; + rc->vconn = vconn; + rc->last_connected = time(0); + state_transition(rc, S_ACTIVE); +} + +void +rconn_disconnect(struct rconn *rc) +{ + if (rc->vconn) { + vconn_close(rc->vconn); + rc->vconn = NULL; + } + free(rc->name); + rc->name = xstrdup("void"); + rc->reliable = false; + + rc->backoff = 0; + rc->backoff_deadline = TIME_MIN; + + state_transition(rc, S_VOID); } /* Disconnects 'rc' and frees the underlying storage. */ @@ -123,68 +226,118 @@ rconn_destroy(struct rconn *rc) } } +static void +run_VOID(struct rconn *rc) +{ + /* Nothing to do. */ +} + +static void +reconnect(struct rconn *rc) +{ + int retval; + + VLOG_WARN("%s: connecting...", rc->name); + retval = vconn_open(rc->name, &rc->vconn); + if (!retval) { + rc->backoff_deadline = time(0) + rc->backoff; + state_transition(rc, S_CONNECTING); + } else { + VLOG_WARN("%s: connection failed (%s)", rc->name, strerror(retval)); + disconnect(rc, 0); + } +} + +static void +run_BACKOFF(struct rconn *rc) +{ + if (timeout(rc, rc->backoff)) { + reconnect(rc); + } +} + +static void +run_CONNECTING(struct rconn *rc) +{ + int error = vconn_connect(rc->vconn); + if (!error) { + VLOG_WARN("%s: connected", rc->name); + if (vconn_is_passive(rc->vconn)) { + fatal(0, "%s: passive vconn not supported in switch", + rc->name); + } + state_transition(rc, S_ACTIVE); + rc->last_connected = rc->state_entered; + } else if (error != EAGAIN) { + VLOG_WARN("%s: connection failed (%s)", rc->name, strerror(error)); + disconnect(rc, error); + } else if (timeout(rc, MAX(1, rc->backoff))) { + VLOG_WARN("%s: connection timed out", rc->name); + rc->backoff_deadline = TIME_MAX; /* Prevent resetting backoff. */ + disconnect(rc, 0); + } +} + +static void +do_tx_work(struct rconn *rc) +{ + while (rc->txq.n > 0) { + int error = try_send(rc); + if (error) { + break; + } + } +} + +static void +run_ACTIVE(struct rconn *rc) +{ + if (rc->probe_interval) { + unsigned int base = MAX(rc->last_received, rc->state_entered); + unsigned int arg = base + rc->probe_interval - rc->state_entered; + if (timeout(rc, arg)) { + queue_push_tail(&rc->txq, make_echo_request()); + VLOG_DBG("%s: idle %u seconds, sending inactivity probe", + rc->name, (unsigned int) (time(0) - base)); + state_transition(rc, S_IDLE); + return; + } + } + + do_tx_work(rc); +} + +static void +run_IDLE(struct rconn *rc) +{ + if (timeout(rc, rc->probe_interval)) { + VLOG_ERR("%s: no response to inactivity probe after %u " + "seconds, disconnecting", + rc->name, elapsed_in_this_state(rc)); + disconnect(rc, 0); + } else { + do_tx_work(rc); + } +} + /* Performs whatever activities are necessary to maintain 'rc': if 'rc' is * disconnected, attempts to (re)connect, backing off as necessary; if 'rc' is * connected, attempts to send packets in the send queue, if any. */ void rconn_run(struct rconn *rc) { - if (!rc->vconn) { - if (rc->reliable && time(0) >= rc->backoff_deadline) { - int retval; - - VLOG_WARN("%s: connecting...", rc->name); - retval = vconn_open(rc->name, &rc->vconn); - if (!retval) { - rc->backoff_deadline = time(0) + rc->backoff; - rc->connected = false; - } else { - VLOG_WARN("%s: connection failed (%s)", - rc->name, strerror(retval)); - disconnect(rc, 0); - } + int old_state; + do { + old_state = rc->state; + rc->min_timeout = UINT_MAX; + switch (rc->state) { +#define STATE(NAME, VALUE) case S_##NAME: run_##NAME(rc); break; + STATES +#undef STATE + default: + NOT_REACHED(); } - } else if (!rc->connected) { - int error = vconn_connect(rc->vconn); - if (!error) { - VLOG_WARN("%s: connected", rc->name); - if (vconn_is_passive(rc->vconn)) { - fatal(0, "%s: passive vconn not supported in switch", - rc->name); - } - rc->connected = true; - } else if (error != EAGAIN) { - VLOG_WARN("%s: connection failed (%s)", rc->name, strerror(error)); - disconnect(rc, 0); - } else if (time(0) >= rc->backoff_deadline) { - VLOG_WARN("%s: connection timed out", rc->name); - rc->backoff_deadline = TIME_MAX; /* Prevent resetting backoff. */ - disconnect(rc, 0); - } - } else { - if (rc->probe_interval) { - time_t now = time(0); - if (now >= probe_deadline(rc)) { - if (!rc->probe_sent) { - queue_push_tail(&rc->txq, make_echo_request()); - rc->probe_sent = now; - VLOG_DBG("%s: idle %d seconds, sending inactivity probe", - rc->name, (int) (now - rc->last_connected)); - } else { - VLOG_ERR("%s: no response to inactivity probe after %d " - "seconds, disconnecting", - rc->name, (int) (now - rc->probe_sent)); - disconnect(rc, 0); - } - } - } - while (rc->txq.n > 0) { - int error = try_send(rc); - if (error) { - break; - } - } - } + } while (rc->state != old_state); } /* Causes the next call to poll_block() to wake up when rconn_run() should be @@ -192,27 +345,18 @@ rconn_run(struct rconn *rc) void rconn_run_wait(struct rconn *rc) { - if (rc->vconn) { - if (rc->txq.n) { - vconn_wait(rc->vconn, WAIT_SEND); - } - if (rc->probe_interval) { - poll_timer_wait((probe_deadline(rc) - time(0)) * 1000); - } - } else { - poll_timer_wait((rc->backoff_deadline - time(0)) * 1000); + if (rc->min_timeout != UINT_MAX) { + poll_timer_wait(sat_mul(rc->min_timeout, 1000)); } -} + /* Reset timeout to 1 second. This will have no effect ordinarily, because + * rconn_run() will typically set it back to a higher value. If, however, + * the caller fails to call rconn_run() before its next call to + * rconn_wait() we won't potentially block forever. */ + rc->min_timeout = 1; -/* Returns the time at which, should nothing be received, we should send out an - * inactivity probe (if none has yet been sent) or conclude that the connection - * is dead (if a probe has already been sent). */ -static time_t -probe_deadline(const struct rconn *rc) -{ - assert(rc->probe_interval); - return (rc->probe_interval - + (rc->probe_sent ? rc->probe_sent : rc->last_connected)); + if ((rc->state & (S_ACTIVE | S_IDLE)) && rc->txq.n) { + vconn_wait(rc->vconn, WAIT_SEND); + } } /* Attempts to receive a packet from 'rc'. If successful, returns the packet; @@ -221,15 +365,17 @@ probe_deadline(const struct rconn *rc) struct buffer * rconn_recv(struct rconn *rc) { - if (rc->vconn && rc->connected) { + if (rc->state & (S_ACTIVE | S_IDLE)) { struct buffer *buffer; int error = vconn_recv(rc->vconn, &buffer); if (!error) { - rc->last_connected = time(0); - rc->probe_sent = 0; + rc->last_received = time(0); + if (rc->state == S_IDLE) { + state_transition(rc, S_ACTIVE); + } return buffer; } else if (error != EAGAIN) { - disconnect(rc, error); + disconnect(rc, error); } } return NULL; @@ -238,7 +384,7 @@ rconn_recv(struct rconn *rc) /* Causes the next call to poll_block() to wake up when a packet may be ready * to be received by vconn_recv() on 'rc'. */ void -rconn_recv_wait(struct rconn *rc) +rconn_recv_wait(struct rconn *rc) { if (rc->vconn) { vconn_wait(rc->vconn, WAIT_RECV); @@ -291,7 +437,7 @@ rconn_force_send(struct rconn *rc, struct buffer *b) /* Returns true if 'rc''s send buffer is full, * false if it has room for at least one more packet. */ bool -rconn_is_full(const struct rconn *rc) +rconn_is_full(const struct rconn *rc) { return rc->txq.n >= rc->txq_limit; } @@ -300,22 +446,22 @@ rconn_is_full(const struct rconn *rc) * vconn. A packet is not counted as sent while it is still queued in the * rconn, only when it has been successfuly passed to the vconn. */ unsigned int -rconn_packets_sent(const struct rconn *rc) +rconn_packets_sent(const struct rconn *rc) { return rc->packets_sent; } /* Returns 'rc''s name (the 'name' argument passed to rconn_new()). */ const char * -rconn_get_name(const struct rconn *rc) +rconn_get_name(const struct rconn *rc) { return rc->name; } /* Returns true if 'rconn' is connected or in the process of reconnecting, - * false if 'rconn' is disconnected and will not be reconnected. */ + * false if 'rconn' is disconnected and will not reconnect on its own. */ bool -rconn_is_alive(const struct rconn *rconn) +rconn_is_alive(const struct rconn *rconn) { return rconn->reliable || rconn->vconn; } @@ -324,13 +470,13 @@ rconn_is_alive(const struct rconn *rconn) bool rconn_is_connected(const struct rconn *rconn) { - return rconn->vconn && !vconn_connect(rconn->vconn); + return rconn->state & (S_ACTIVE | S_IDLE); } /* Returns 0 if 'rconn' is connected, otherwise the number of seconds that it * has been disconnected. */ int -rconn_disconnected_duration(const struct rconn *rconn) +rconn_disconnected_duration(const struct rconn *rconn) { return rconn_is_connected(rconn) ? 0 : time(0) - rconn->last_connected; } @@ -343,29 +489,6 @@ rconn_get_ip(const struct rconn *rconn) return rconn->vconn ? vconn_get_ip(rconn->vconn) : 0; } -static struct rconn * -create_rconn(const char *name, int txq_limit, int probe_interval, - int max_backoff, struct vconn *vconn) -{ - struct rconn *rc = xmalloc(sizeof *rc); - assert(txq_limit > 0); - rc->reliable = vconn == NULL; - rc->connected = vconn != NULL; - rc->name = xstrdup(name); - rc->vconn = vconn; - queue_init(&rc->txq); - rc->txq_limit = txq_limit; - rc->backoff_deadline = time(0); - rc->backoff = 0; - rc->max_backoff = max_backoff ? max_backoff : 60; - rc->last_connected = time(0); - rc->probe_interval = (probe_interval - ? MAX(5, probe_interval) : 0); - rc->probe_sent = 0; - rc->packets_sent = 0; - return rc; -} - /* Tries to send a packet from 'rc''s send buffer. Returns 0 if successful, * otherwise a positive errno value. */ static int @@ -389,33 +512,71 @@ try_send(struct rconn *rc) * nonzero, then it should be EOF to indicate the connection was closed by the * peer in a normal fashion or a positive errno value. */ static void -disconnect(struct rconn *rc, int error) +disconnect(struct rconn *rc, int error) { - time_t now = time(0); - - if (rc->vconn) { - if (error > 0) { - VLOG_WARN("%s: connection dropped (%s)", - rc->name, strerror(error)); - } else if (error == EOF) { - if (rc->reliable) { - VLOG_WARN("%s: connection closed", rc->name); + if (rc->reliable) { + time_t now = time(0); + + if (rc->state & (S_CONNECTING | S_ACTIVE | S_IDLE)) { + if (error > 0) { + VLOG_WARN("%s: connection dropped (%s)", + rc->name, strerror(error)); + } else if (error == EOF) { + if (rc->reliable) { + VLOG_WARN("%s: connection closed", rc->name); + } + } else { + VLOG_WARN("%s: connection dropped", rc->name); } - } else { - VLOG_WARN("%s: connection dropped", rc->name); + vconn_close(rc->vconn); + rc->vconn = NULL; + queue_clear(&rc->txq); } - vconn_close(rc->vconn); - rc->vconn = NULL; - queue_clear(&rc->txq); - } - if (now >= rc->backoff_deadline) { - rc->backoff = 1; + if (now >= rc->backoff_deadline) { + rc->backoff = 1; + } else { + rc->backoff = MIN(rc->max_backoff, MAX(1, 2 * rc->backoff)); + VLOG_WARN("%s: waiting %d seconds before reconnect\n", + rc->name, rc->backoff); + } + rc->backoff_deadline = now + rc->backoff; + state_transition(rc, S_BACKOFF); } else { - rc->backoff = MIN(rc->max_backoff, MAX(1, 2 * rc->backoff)); - VLOG_WARN("%s: waiting %d seconds before reconnect\n", - rc->name, rc->backoff); + rconn_disconnect(rc); } - rc->backoff_deadline = now + rc->backoff; - rc->probe_sent = 0; +} + +static unsigned int +elapsed_in_this_state(const struct rconn *rc) +{ + return time(0) - rc->state_entered; +} + +static bool +timeout(struct rconn *rc, unsigned int secs) +{ + rc->min_timeout = MIN(rc->min_timeout, secs); + return time(0) >= sat_add(rc->state_entered, secs); +} + +static void +state_transition(struct rconn *rc, enum state state) +{ + VLOG_DBG("%s: entering %s", rc->name, state_name(state)); + rc->state = state; + rc->state_entered = time(0); +} + +static unsigned int +sat_add(unsigned int x, unsigned int y) +{ + return x + y >= x ? x + y : UINT_MAX; +} + +static unsigned int +sat_mul(unsigned int x, unsigned int y) +{ + assert(y); + return x <= UINT_MAX / y ? x * y : UINT_MAX; }