X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=lib%2Frconn.c;h=6bc371ad523468cc79b2b66ac1e394c5e07c6a25;hb=903d0940fba239b2e250e9ab5f4dfdf4ecaf4bc0;hp=0093c3ab5d4a05672c21a466956e9ea4589d425f;hpb=b3b28afb7bef9094d05fcc8c1be4a41f9f1d5bfe;p=openvswitch diff --git a/lib/rconn.c b/lib/rconn.c index 0093c3ab..6bc371ad 100644 --- a/lib/rconn.c +++ b/lib/rconn.c @@ -83,7 +83,6 @@ struct rconn { bool reliable; struct queue txq; - int txq_limit; int backoff; int max_backoff; @@ -119,31 +118,28 @@ static void state_transition(struct rconn *, enum state); static int try_send(struct rconn *); static int reconnect(struct rconn *); static void disconnect(struct rconn *, int error); +static void flush_queue(struct rconn *); static void question_connectivity(struct rconn *); /* Creates a new rconn, connects it (reliably) to 'name', and returns it. */ struct rconn * -rconn_new(const char *name, int txq_limit, int inactivity_probe_interval, - int max_backoff) +rconn_new(const char *name, int inactivity_probe_interval, int max_backoff) { - struct rconn *rc = rconn_create(txq_limit, inactivity_probe_interval, - max_backoff); + struct rconn *rc = rconn_create(inactivity_probe_interval, max_backoff); rconn_connect(rc, name); return rc; } /* Creates a new rconn, connects it (unreliably) to 'vconn', and returns it. */ struct rconn * -rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn) +rconn_new_from_vconn(const char *name, struct vconn *vconn) { - struct rconn *rc = rconn_create(txq_limit, 60, 0); + struct rconn *rc = rconn_create(60, 0); rconn_connect_unreliably(rc, name, vconn); return rc; } /* Creates and returns a new rconn. - * - * 'txq_limit' is the maximum length of the send queue, in packets. * * 'probe_interval' is a number of seconds. If the interval passes once * without an OpenFlow message being received from the peer, the rconn sends @@ -156,7 +152,7 @@ rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn) * failure until it reaches 'max_backoff'. If 0 is specified, the default of * 60 seconds is used. */ struct rconn * -rconn_create(int txq_limit, int probe_interval, int max_backoff) +rconn_create(int probe_interval, int max_backoff) { struct rconn *rc = xcalloc(1, sizeof *rc); @@ -168,8 +164,6 @@ rconn_create(int txq_limit, int probe_interval, int max_backoff) rc->reliable = false; queue_init(&rc->txq); - assert(txq_limit > 0); - rc->txq_limit = txq_limit; rc->backoff = 0; rc->max_backoff = max_backoff ? max_backoff : 60; @@ -235,6 +229,7 @@ rconn_destroy(struct rconn *rc) if (rc) { free(rc->name); vconn_close(rc->vconn); + flush_queue(rc); queue_destroy(&rc->txq); free(rc); } @@ -435,55 +430,59 @@ rconn_recv_wait(struct rconn *rc) } } -/* Sends 'b' on 'rc'. Returns 0 if successful, EAGAIN if at least 'txq_limit' - * packets are already queued, otherwise a positive errno value. */ +/* Sends 'b' on 'rc'. Returns 0 if successful (in which case 'b' is + * destroyed), or ENOTCONN if 'rc' is not currently connected (in which case + * the caller retains ownership of 'b'). + * + * If 'n_queued' is non-null, then '*n_queued' will be incremented while the + * packet is in flight, then decremented when it has been sent (or discarded + * due to disconnection). Because 'b' may be sent (or discarded) before this + * function returns, the caller may not be able to observe any change in + * '*n_queued'. + * + * There is no rconn_send_wait() function: an rconn has a send queue that it + * takes care of sending if you call rconn_run(), which will have the side + * effect of waking up poll_block(). */ int -do_send(struct rconn *rc, struct buffer *b, int txq_limit) +rconn_send(struct rconn *rc, struct buffer *b, int *n_queued) { if (rc->vconn) { - if (rc->txq.n < txq_limit) { - queue_push_tail(&rc->txq, b); - if (rc->txq.n == 1) { - try_send(rc); - } - return 0; - } else { - return EAGAIN; + b->private = n_queued; + if (n_queued) { + ++*n_queued; } + queue_push_tail(&rc->txq, b); + if (rc->txq.n == 1) { + try_send(rc); + } + return 0; } else { return ENOTCONN; } } -/* Sends 'b' on 'rc'. Returns 0 if successful, EAGAIN if the send queue is - * full, or ENOTCONN if 'rc' is not currently connected. +/* Sends 'b' on 'rc'. Increments '*n_queued' while the packet is in flight; it + * will be decremented when it has been sent (or discarded due to + * disconnection). Returns 0 if successful, EAGAIN if '*n_queued' is already + * at least as large of 'queue_limit', or ENOTCONN if 'rc' is not currently + * connected. Regardless of return value, 'b' is destroyed. + * + * Because 'b' may be sent (or discarded) before this function returns, the + * caller may not be able to observe any change in '*n_queued'. * * There is no rconn_send_wait() function: an rconn has a send queue that it * takes care of sending if you call rconn_run(), which will have the side * effect of waking up poll_block(). */ int -rconn_send(struct rconn *rc, struct buffer *b) -{ - return do_send(rc, b, rc->txq_limit); -} - -/* Sends 'b' on 'rc'. Returns 0 if successful, EAGAIN if the send queue is - * full, otherwise a positive errno value. - * - * Compared to rconn_send(), this function relaxes the queue limit, allowing - * more packets than usual to be queued. */ -int -rconn_force_send(struct rconn *rc, struct buffer *b) +rconn_send_with_limit(struct rconn *rc, struct buffer *b, + int *n_queued, int queue_limit) { - return do_send(rc, b, 2 * rc->txq_limit); -} - -/* Returns true if 'rc''s send buffer is full, - * false if it has room for at least one more packet. */ -bool -rconn_is_full(const struct rconn *rc) -{ - return rc->txq.n >= rc->txq_limit; + int retval; + retval = *n_queued >= queue_limit ? EAGAIN : rconn_send(rc, b, n_queued); + if (retval) { + buffer_delete(b); + } + return retval; } /* Returns the total number of packets successfully sent on the underlying @@ -556,6 +555,7 @@ try_send(struct rconn *rc) { int retval = 0; struct buffer *next = rc->txq.head->next; + int *n_queued = rc->txq.head->private; retval = vconn_send(rc->vconn, rc->txq.head); if (retval) { if (retval != EAGAIN) { @@ -564,6 +564,9 @@ try_send(struct rconn *rc) return retval; } rc->packets_sent++; + if (n_queued) { + --*n_queued; + } queue_advance_head(&rc->txq, next); return 0; } @@ -590,7 +593,7 @@ disconnect(struct rconn *rc, int error) } vconn_close(rc->vconn); rc->vconn = NULL; - queue_clear(&rc->txq); + flush_queue(rc); } if (now >= rc->backoff_deadline) { @@ -610,6 +613,21 @@ disconnect(struct rconn *rc, int error) } } +/* Drops all the packets from 'rc''s send queue and decrements their queue + * counts. */ +static void +flush_queue(struct rconn *rc) +{ + while (rc->txq.n > 0) { + struct buffer *b = queue_pop_head(&rc->txq); + int *n_queued = b->private; + if (n_queued) { + --*n_queued; + } + buffer_delete(b); + } +} + static unsigned int elapsed_in_this_state(const struct rconn *rc) {