#include "mac.h"
#include "ofp-print.h"
#include "openflow.h"
+#include "poll-loop.h"
#include "time.h"
#include "util.h"
#include "vconn-ssl.h"
struct switch_ {
char *name;
struct vconn *vconn;
- struct pollfd *pollfd;
uint64_t datapath_id;
time_t last_control_hello;
main(int argc, char *argv[])
{
struct switch_ *switches[MAX_SWITCHES];
- struct pollfd pollfds[MAX_SWITCHES + 1];
- struct vlog_server *vlog_server;
int n_switches;
int retval;
int i;
fatal(0, "at least one vconn argument required; use --help for usage");
}
- retval = vlog_server_listen(NULL, &vlog_server);
+ retval = vlog_server_listen(NULL, NULL);
if (retval) {
fatal(retval, "Could not listen for vlog connections");
}
if (n_switches == 0) {
fatal(0, "could not connect to any switches");
}
-
- while (n_switches > 0) {
- size_t n_ready;
- int retval;
-
- /* Wait until there's something to do. */
- n_ready = 0;
- for (i = 0; i < n_switches; i++) {
- struct switch_ *this = switches[i];
- int want;
-
- if (vconn_is_passive(this->vconn)) {
- want = n_switches < MAX_SWITCHES ? WANT_ACCEPT : 0;
- } else {
- want = WANT_RECV;
- if (this->n_txq) {
- want |= WANT_SEND;
- }
- }
-
- this->pollfd = &pollfds[i];
- this->pollfd->fd = -1;
- this->pollfd->events = 0;
- n_ready += vconn_prepoll(this->vconn, want, this->pollfd);
- }
- if (vlog_server) {
- pollfds[n_switches].fd = vlog_server_get_fd(vlog_server);
- pollfds[n_switches].events = POLLIN;
- }
- do {
- retval = poll(pollfds, n_switches + (vlog_server != NULL),
- n_ready ? 0 : -1);
- } while (retval < 0 && errno == EINTR);
- if (retval < 0 || (retval == 0 && !n_ready)) {
- fatal(retval < 0 ? errno : 0, "poll");
- }
-
- /* Let each connection deal with any pending operations. */
- for (i = 0; i < n_switches; i++) {
- struct switch_ *this = switches[i];
- vconn_postpoll(this->vconn, &this->pollfd->revents);
- if (this->pollfd->revents & POLLERR) {
- this->pollfd->revents |= POLLIN | POLLOUT;
- }
- }
- if (vlog_server && pollfds[n_switches].revents) {
- vlog_server_poll(vlog_server);
- }
- for (i = 0; i < n_switches; ) {
- struct switch_ *this = switches[i];
+ while (n_switches > 0) {
+ /* Do some work. Limit the number of iterations so that callbacks
+ * registered with the poll loop don't starve. */
+ int iteration;
+ int i;
+ for (iteration = 0; iteration < 50; iteration++) {
+ bool progress = false;
+ for (i = 0; i < n_switches; ) {
+ struct switch_ *this = switches[i];
+ int retval;
- if (this->pollfd) {
- retval = 0;
if (vconn_is_passive(this->vconn)) {
- if (this->pollfd->revents & POLLIN) {
+ retval = 0;
+ while (n_switches < MAX_SWITCHES) {
struct vconn *new_vconn;
- while (n_switches < MAX_SWITCHES
- && (retval = vconn_accept(this->vconn,
- &new_vconn)) == 0) {
- switches[n_switches++] = new_switch("tcp",
- new_vconn);
+ retval = vconn_accept(this->vconn, &new_vconn);
+ if (retval) {
+ break;
}
+ switches[n_switches++] = new_switch("tcp", new_vconn);
}
} else {
- bool may_read = this->pollfd->revents & POLLIN;
- bool may_write = this->pollfd->revents & POLLOUT;
- if (may_read) {
- retval = do_switch_recv(this);
- if (!retval || retval == EAGAIN) {
- retval = 0;
-
- /* Enable writing to avoid round trip through poll
- * in common case. */
- may_write = true;
- }
- }
- while ((!retval || retval == EAGAIN) && may_write) {
- retval = do_switch_send(this);
- may_write = !retval;
+ retval = do_switch_recv(this);
+ if (!retval || retval == EAGAIN) {
+ do {
+ retval = do_switch_send(this);
+ if (!retval) {
+ progress = true;
+ }
+ } while (!retval);
}
}
if (retval && retval != EAGAIN) {
close_switch(this);
switches[i] = switches[--n_switches];
- continue;
+ } else {
+ i++;
+ }
+ }
+ if (!progress) {
+ break;
+ }
+ }
+
+ /* Wait for something to happen. */
+ for (i = 0; i < n_switches; i++) {
+ struct switch_ *this = switches[i];
+ if (vconn_is_passive(this->vconn)) {
+ if (n_switches < MAX_SWITCHES) {
+ vconn_accept_wait(this->vconn);
}
} else {
- /* New switch that hasn't been polled yet. */
+ vconn_recv_wait(this->vconn);
+ if (this->n_txq) {
+ vconn_send_wait(this->vconn);
+ }
}
- i++;
}
+ poll_block();
}
return 0;
memset(this, 0, sizeof *this);
this->name = xstrdup(name);
this->vconn = vconn;
- this->pollfd = NULL;
this->n_txq = 0;
this->txq = NULL;
this->tx_tail = NULL;
ip.h \
list.h \
mac.h \
- Makefile.am \
netlink.h \
ofp-print.h \
openflow.h \
openflow-netlink.h \
packets.h \
+ poll-loop.h \
socket-util.h \
util.h \
vconn.h \
--- /dev/null
+/* Copyright (C) 2008 Board of Trustees, Leland Stanford Jr. University.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#ifndef POLL_LOOP_H
+#define POLL_LOOP_H 1
+
+#include <poll.h>
+
+typedef void poll_fd_func(int fd, short int revents, void *aux);
+
+struct poll_waiter *poll_fd_callback(int fd, short int events,
+ poll_fd_func *, void *aux);
+struct poll_waiter *poll_fd_wait(int fd, short int events, short int *revents);
+void poll_cancel(struct poll_waiter *);
+
+void poll_immediate_wake(void);
+void poll_timer_wait(int msec);
+void poll_block(void);
+
+#endif /* poll-loop.h */
int set_nonblocking(int fd);
int lookup_ip(const char *host_name, struct in_addr *address);
+int get_socket_error(int sock);
+int check_connection_completion(int fd);
#endif /* socket-util.h */
/* Virtual connection to an OpenFlow device. */
struct vconn {
struct vconn_class *class;
-};
-
-/* What kind of operation do we want to perform? */
-enum {
- WANT_ACCEPT = 1 << 0, /* Want to accept a new connection. */
- WANT_RECV = 1 << 1, /* Want to receive a message. */
- WANT_SEND = 1 << 2 /* Want to send a message. */
+ int connect_status;
};
int vconn_open(const char *name, struct vconn **);
void vconn_close(struct vconn *);
bool vconn_is_passive(const struct vconn *);
-bool vconn_prepoll(struct vconn *, int want, struct pollfd *);
-void vconn_postpoll(struct vconn *, short int *revents);
+int vconn_connect(struct vconn *);
int vconn_accept(struct vconn *, struct vconn **);
int vconn_recv(struct vconn *, struct buffer **);
int vconn_send(struct vconn *, struct buffer *);
-int vconn_send_wait(struct vconn *, struct buffer *);
+
+int vconn_open_block(const char *name, struct vconn **);
+int vconn_send_block(struct vconn *, struct buffer *);
+
+enum vconn_wait_type {
+ WAIT_CONNECT,
+ WAIT_ACCEPT,
+ WAIT_RECV,
+ WAIT_SEND
+};
+void vconn_wait(struct vconn *, enum vconn_wait_type);
+void vconn_connect_wait(struct vconn *);
+void vconn_accept_wait(struct vconn *);
+void vconn_recv_wait(struct vconn *);
+void vconn_send_wait(struct vconn *);
struct buffer *make_add_simple_flow(const struct flow *,
uint32_t buffer_id, uint16_t out_port);
/* Attempts to connect to an OpenFlow device. 'name' is the full
* connection name provided by the user, e.g. "nl:0", "tcp:1.2.3.4". This
* name is useful for error messages but must not be modified.
- *
+ *
* 'suffix' is a copy of 'name' following the colon and may be modified.
*
* Returns 0 if successful, otherwise a positive errno value. If
- * successful, stores a pointer to the new connection in '*vconnp'. */
+ * successful, stores a pointer to the new connection in '*vconnp'.
+ *
+ * The open function must not block waiting for a connection to complete.
+ * If the connection cannot be completed immediately, it should return
+ * EAGAIN (not EINPROGRESS, as returned by the connect system call) and
+ * continue the connection in the background. */
int (*open)(const char *name, char *suffix, struct vconn **vconnp);
/* Closes 'vconn' and frees associated memory. */
void (*close)(struct vconn *vconn);
- /* Called by the main loop before calling poll(), this function must
- * initialize 'pfd->fd' and 'pfd->events' appropriately so that poll() will
- * wake up when the connection becomes available for the operations
- * specified in 'want'. The prepoll function may also set bits in 'pfd' to
- * allow for internal processing.
- *
- * Should return false normally. May return true to indicate that no
- * blocking should happen in poll() because the connection is available for
- * some operation specified in 'want' but that status cannot be detected
- * via poll() and thus poll() could block forever otherwise. */
- bool (*prepoll)(struct vconn *, int want, struct pollfd *pfd);
-
- /* Called by the main loop after calling poll(), this function may perform
- * any internal processing needed by the connection. It is provided with
- * the vconn file descriptor's status in '*revents', as reported by poll().
+ /* Tries to complete the connection on 'vconn', which must be an active
+ * vconn. If 'vconn''s connection is complete, returns 0 if the connection
+ * was successful or a positive errno value if it failed. If the
+ * connection is still in progress, returns EAGAIN.
*
- * The postpoll function should adjust '*revents' to reflect the status of
- * the connection from the caller's point of view: that is, upon return
- * '*revents & POLLIN' should indicate that a packet is (potentially) ready
- * to be read (for an active vconn) or a new connection is ready to be
- * accepted (for a passive vconn) and '*revents & POLLOUT' should indicate
- * that a packet is (potentially) ready to be written.
- *
- * This function may be a null pointer in a vconn class that has no use for
- * it, that is, if the vconn does not need to do any internal processing
- * and poll's revents out properly reflects the vconn's status. */
- void (*postpoll)(struct vconn *, short int *revents);
+ * The connect function must not block waiting for the connection to
+ * complete; instead, it should return EAGAIN immediately. */
+ int (*connect)(struct vconn *vconn);
/* Tries to accept a new connection on 'vconn', which must be a passive
* vconn. If successful, stores the new connection in '*new_vconnp' and
* The accept function must not block waiting for a connection. If no
* connection is ready to be accepted, it should return EAGAIN.
*
- * Nonnull iff this is a passive vconn (one that accepts connection and
+ * Nonnull iff this is a passive vconn (one that accepts connections and
* does not transfer data). */
int (*accept)(struct vconn *vconn, struct vconn **new_vconnp);
* Nonnull iff this is an active vconn (one that transfers data and does
* not accept connections). */
int (*send)(struct vconn *vconn, struct buffer *msg);
+
+ void (*wait)(struct vconn *vconn, enum vconn_wait_type);
};
extern struct vconn_class tcp_vconn_class;
struct vlog_server;
int vlog_server_listen(const char *path, struct vlog_server **);
void vlog_server_close(struct vlog_server *);
-int vlog_server_get_fd(const struct vlog_server *);
-void vlog_server_poll(struct vlog_server *);
/* Client for Vlog control connection. */
struct vlog_client;
/* Modules that can emit log messages. */
#define VLOG_MODULES \
+ VLOG_MODULE(chain) \
VLOG_MODULE(controller) \
+ VLOG_MODULE(controller_connection) \
VLOG_MODULE(ctlpath) \
+ VLOG_MODULE(datapath) \
VLOG_MODULE(dpif) \
VLOG_MODULE(dpctl) \
VLOG_MODULE(fault) \
VLOG_MODULE(flow) \
+ VLOG_MODULE(netdev) \
VLOG_MODULE(netlink) \
+ VLOG_MODULE(poll_loop) \
VLOG_MODULE(secchan) \
+ VLOG_MODULE(switch) \
VLOG_MODULE(socket_util) \
VLOG_MODULE(vconn_netlink) \
VLOG_MODULE(vconn_tcp) \
hash.c \
list.c \
ofp-print.c \
+ poll-loop.c \
socket-util.c \
util.c \
vconn-tcp.c \
--- /dev/null
+/* Copyright (C) 2008 Board of Trustees, Leland Stanford Jr. University.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "poll-loop.h"
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+#include <string.h>
+#include "list.h"
+
+#define THIS_MODULE VLM_poll_loop
+#include "vlog.h"
+
+struct poll_waiter {
+ struct list node;
+ int fd;
+ short int events;
+ struct pollfd *pollfd;
+
+ short int *revents;
+
+ poll_fd_func *function;
+ void *aux;
+};
+
+static struct list waiters = LIST_INITIALIZER(&waiters);
+static size_t n_waiters;
+static int timeout = -1;
+
+#ifndef NDEBUG
+static struct poll_waiter *running_cb;
+#endif
+
+static struct poll_waiter *
+new_waiter(int fd, short int events)
+{
+ struct poll_waiter *waiter = xcalloc(1, sizeof *waiter);
+ assert(fd >= 0);
+ waiter->fd = fd;
+ waiter->events = events;
+ list_push_back(&waiters, &waiter->node);
+ n_waiters++;
+ return waiter;
+}
+
+struct poll_waiter *
+poll_fd_callback(int fd, short int events, poll_fd_func *function, void *aux)
+{
+ struct poll_waiter *pw = new_waiter(fd, events);
+ pw->function = function;
+ pw->aux = aux;
+ return pw;
+}
+
+struct poll_waiter *
+poll_fd_wait(int fd, short int events, short int *revents)
+{
+ struct poll_waiter *pw = new_waiter(fd, events);
+ pw->revents = revents;
+ if (revents) {
+ *revents = 0;
+ }
+ return pw;
+}
+
+void
+poll_cancel(struct poll_waiter *pw)
+{
+ if (pw) {
+ assert(pw != running_cb);
+ list_remove(&pw->node);
+ n_waiters--;
+ }
+}
+
+void
+poll_immediate_wake(void)
+{
+ timeout = 0;
+}
+
+void
+poll_timer_wait(int msec)
+{
+ if (timeout < 0 || msec < timeout) {
+ timeout = MAX(0, msec);
+ }
+}
+
+void
+poll_block(void)
+{
+ static struct pollfd *pollfds;
+ static size_t max_pollfds;
+
+ struct poll_waiter *pw;
+ struct list *node;
+ int n_pollfds;
+ int retval;
+
+ assert(!running_cb);
+ if (max_pollfds < n_waiters) {
+ max_pollfds = n_waiters;
+ pollfds = xrealloc(pollfds, max_pollfds * sizeof *pollfds);
+ }
+
+ n_pollfds = 0;
+ LIST_FOR_EACH (pw, struct poll_waiter, node, &waiters) {
+ pw->pollfd = &pollfds[n_pollfds];
+ pollfds[n_pollfds].fd = pw->fd;
+ pollfds[n_pollfds].events = pw->events;
+ pollfds[n_pollfds].revents = 0;
+ n_pollfds++;
+ }
+
+ do {
+ retval = poll(pollfds, n_pollfds, timeout);
+ } while (retval < 0 && errno == EINTR);
+ if (retval < 0) {
+ VLOG_ERR("poll: %s", strerror(errno));
+ }
+
+ for (node = waiters.next; node != &waiters; ) {
+ pw = CONTAINER_OF(node, struct poll_waiter, node);
+ if (!pw->pollfd || !pw->pollfd->revents) {
+ if (pw->function) {
+ node = node->next;
+ continue;
+ }
+ } else {
+ if (pw->function) {
+#ifndef NDEBUG
+ running_cb = pw;
+#endif
+ pw->function(pw->fd, pw->pollfd->revents, pw->aux);
+#ifndef NDEBUG
+ running_cb = NULL;
+#endif
+ } else if (pw->revents) {
+ *pw->revents = pw->pollfd->revents;
+ }
+ }
+ node = list_remove(node);
+ n_waiters--;
+ }
+
+ timeout = -1;
+}
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
+#include <poll.h>
#include <stdio.h>
+#include <string.h>
#include "vlog.h"
#define THIS_MODULE VLM_socket_util
{
int flags = fcntl(fd, F_GETFL, 0);
if (flags != -1) {
- return fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1 ? 0 : errno;
+ if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1) {
+ return 0;
+ } else {
+ VLOG_ERR("fcntl(F_SETFL) failed: %s", strerror(errno));
+ return errno;
+ }
} else {
+ VLOG_ERR("fcntl(F_GETFL) failed: %s", strerror(errno));
return errno;
}
}
}
return 0;
}
+
+/* Returns the error condition associated with socket 'fd' and resets the
+ * socket's error status. */
+int
+get_socket_error(int fd)
+{
+ int error;
+ socklen_t len = sizeof(error);
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
+ error = errno;
+ VLOG_ERR("getsockopt(SO_ERROR): %s", strerror(error));
+ }
+ return error;
+}
+
+int
+check_connection_completion(int fd)
+{
+ struct pollfd pfd;
+ int retval;
+
+ pfd.fd = fd;
+ pfd.events = POLLOUT;
+ do {
+ retval = poll(&pfd, 1, 0);
+ } while (retval < 0 && errno == EINTR);
+ if (retval == 1) {
+ return get_socket_error(fd);
+ } else if (retval < 0) {
+ VLOG_ERR("poll: %s", strerror(errno));
+ return errno;
+ } else {
+ return EAGAIN;
+ }
+}
#include "buffer.h"
#include "dpif.h"
#include "netlink.h"
+#include "poll-loop.h"
#include "socket-util.h"
#include "util.h"
#include "openflow.h"
netlink = xmalloc(sizeof *netlink);
netlink->vconn.class = &netlink_vconn_class;
+ netlink->vconn.connect_status = 0;
retval = dpif_open(dp_idx, true, &netlink->dp);
if (retval) {
free(netlink);
free(netlink);
}
-static bool
-netlink_prepoll(struct vconn *vconn, int want, struct pollfd *pfd)
-{
- struct netlink_vconn *netlink = netlink_vconn_cast(vconn);
- pfd->fd = nl_sock_fd(netlink->dp.sock);
- if (want & WANT_RECV) {
- pfd->events |= POLLIN;
- }
- if (want & WANT_SEND) {
- pfd->events |= POLLOUT;
- }
- return false;
-}
-
static int
netlink_recv(struct vconn *vconn, struct buffer **bufferp)
{
return retval;
}
+static void
+netlink_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+ struct netlink_vconn *netlink = netlink_vconn_cast(vconn);
+ short int events = 0;
+ switch (wait) {
+ case WAIT_RECV:
+ events = POLLIN;
+ break;
+
+ case WAIT_SEND:
+ events = 0;
+ break;
+
+ default:
+ NOT_REACHED();
+ }
+ poll_fd_wait(nl_sock_fd(netlink->dp.sock), events, NULL);
+}
+
struct vconn_class netlink_vconn_class = {
.name = "nl",
.open = netlink_open,
.close = netlink_close,
- .prepoll = netlink_prepoll,
.recv = netlink_recv,
.send = netlink_send,
+ .wait = netlink_wait,
};
#include "socket-util.h"
#include "util.h"
#include "openflow.h"
+#include "poll-loop.h"
#include "ofp-print.h"
+#include "socket-util.h"
#include "vconn.h"
#include "vlog.h"
/* Active SSL. */
enum ssl_state {
- STATE_SSL_CONNECTING,
- STATE_CONNECTED
+ STATE_TCP_CONNECTING,
+ STATE_SSL_CONNECTING
};
enum session_type {
SSL *ssl;
struct buffer *rxbuf;
struct buffer *txbuf;
+ struct poll_waiter *tx_waiter;
};
/* SSL context created by ssl_init(). */
static int ssl_init(void);
static int do_ssl_init(void);
-static void connect_completed(struct ssl_vconn *, int error);
static bool ssl_wants_io(int ssl_error);
static void ssl_close(struct vconn *);
-static bool state_machine(struct ssl_vconn *sslv);
+static int interpret_ssl_error(const char *function, int ret, int error);
+static void ssl_do_tx(int fd, short int revents, void *vconn_);
static DH *tmp_dh_callback(SSL *ssl, int is_export UNUSED, int keylength);
static int
new_ssl_vconn(const char *name, int fd, enum session_type type,
- struct vconn **vconnp)
+ enum ssl_state state, struct vconn **vconnp)
{
struct ssl_vconn *sslv;
SSL *ssl = NULL;
goto error;
}
- /* Make 'fd' non-blocking and disable Nagle. */
- retval = set_nonblocking(fd);
- if (retval) {
- VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
- close(fd);
- return retval;
- }
+ /* Disable Nagle. */
retval = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
if (retval) {
VLOG_ERR("%s: setsockopt(TCP_NODELAY): %s", name, strerror(errno));
/* Create and return the ssl_vconn. */
sslv = xmalloc(sizeof *sslv);
sslv->vconn.class = &ssl_vconn_class;
- sslv->state = STATE_SSL_CONNECTING;
+ sslv->vconn.connect_status = EAGAIN;
+ sslv->state = state;
sslv->type = type;
sslv->fd = fd;
sslv->ssl = ssl;
sslv->rxbuf = NULL;
sslv->txbuf = NULL;
+ sslv->tx_waiter = NULL;
*vconnp = &sslv->vconn;
return 0;
VLOG_ERR("%s: socket: %s", name, strerror(errno));
return errno;
}
+ retval = set_nonblocking(fd);
+ if (retval) {
+ close(fd);
+ return retval;
+ }
- /* Connect socket (blocking). */
+ /* Connect socket. */
retval = connect(fd, (struct sockaddr *) &sin, sizeof sin);
if (retval < 0) {
- int error = errno;
- VLOG_ERR("%s: connect: %s", name, strerror(error));
- close(fd);
- return error;
+ if (errno == EINPROGRESS) {
+ return new_ssl_vconn(name, fd, CLIENT, STATE_TCP_CONNECTING,
+ vconnp);
+ } else {
+ int error = errno;
+ VLOG_ERR("%s: connect: %s", name, strerror(error));
+ close(fd);
+ return error;
+ }
+ } else {
+ return new_ssl_vconn(name, fd, CLIENT, STATE_SSL_CONNECTING,
+ vconnp);
}
-
- /* Make an ssl_vconn for the socket. */
- return new_ssl_vconn(name, fd, CLIENT, vconnp);
}
-static void
-ssl_close(struct vconn *vconn)
+static int
+ssl_connect(struct vconn *vconn)
{
struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
- SSL_free(sslv->ssl);
- close(sslv->fd);
- free(sslv);
-}
+ int retval;
-static bool
-ssl_want_io_to_events(SSL *ssl, short int *events)
-{
- if (SSL_want_read(ssl)) {
- *events |= POLLIN;
- return true;
- } else if (SSL_want_write(ssl)) {
- *events |= POLLOUT;
- return true;
- } else {
- return false;
- }
-}
+ switch (sslv->state) {
+ case STATE_TCP_CONNECTING:
+ retval = check_connection_completion(sslv->fd);
+ if (retval) {
+ return retval;
+ }
+ sslv->state = STATE_SSL_CONNECTING;
+ /* Fall through. */
-static bool
-ssl_prepoll(struct vconn *vconn, int want, struct pollfd *pfd)
-{
- struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
- pfd->fd = sslv->fd;
- if (!state_machine(sslv)) {
- switch (sslv->state) {
- case STATE_SSL_CONNECTING:
- if (!ssl_want_io_to_events(sslv->ssl, &pfd->events)) {
- /* state_machine() should have transitioned us away to another
- * state. */
- NOT_REACHED();
+ case STATE_SSL_CONNECTING:
+ retval = (sslv->type == CLIENT
+ ? SSL_connect(sslv->ssl) : SSL_accept(sslv->ssl));
+ if (retval != 1) {
+ int error = SSL_get_error(sslv->ssl, retval);
+ if (retval < 0 && ssl_wants_io(error)) {
+ return EAGAIN;
+ } else {
+ interpret_ssl_error((sslv->type == CLIENT ? "SSL_connect"
+ : "SSL_accept"), retval, error);
+ shutdown(sslv->fd, SHUT_RDWR);
+ return EPROTO;
}
- break;
- default:
- NOT_REACHED();
- }
- } else if (sslv->connect_error) {
- pfd->events = 0;
- return true;
- } else if (!ssl_want_io_to_events(sslv->ssl, &pfd->events)) {
- if (want & WANT_RECV) {
- pfd->events |= POLLIN;
- }
- if (want & WANT_SEND || sslv->txbuf) {
- pfd->events |= POLLOUT;
+ } else {
+ return 0;
}
}
- return false;
+
+ NOT_REACHED();
}
static void
-ssl_postpoll(struct vconn *vconn, short int *revents)
+ssl_close(struct vconn *vconn)
{
struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
- if (!state_machine(sslv)) {
- *revents = 0;
- } else if (sslv->connect_error) {
- *revents |= POLLERR;
- } else if (*revents & POLLOUT && sslv->txbuf) {
- ssize_t n = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
- if (n > 0) {
- buffer_pull(sslv->txbuf, n);
- if (sslv->txbuf->size == 0) {
- buffer_delete(sslv->txbuf);
- sslv->txbuf = NULL;
- }
- }
- if (sslv->txbuf) {
- *revents &= ~POLLOUT;
- }
- }
+ poll_cancel(sslv->tx_waiter);
+ SSL_free(sslv->ssl);
+ close(sslv->fd);
+ free(sslv);
}
static int
size_t want_bytes;
ssize_t ret;
- if (!state_machine(sslv)) {
- return EAGAIN;
- } else if (sslv->connect_error) {
- return sslv->connect_error;
- }
-
if (sslv->rxbuf == NULL) {
sslv->rxbuf = buffer_new(1564);
}
}
}
+static void
+ssl_clear_txbuf(struct ssl_vconn *sslv)
+{
+ buffer_delete(sslv->txbuf);
+ sslv->txbuf = NULL;
+ sslv->tx_waiter = NULL;
+}
+
+static void
+ssl_register_tx_waiter(struct vconn *vconn)
+{
+ struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
+ short int events = SSL_want_read(sslv->ssl) ? POLLIN : POLLOUT;
+ sslv->tx_waiter = poll_fd_callback(sslv->fd, events, ssl_do_tx, vconn);
+}
+
+static void
+ssl_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_)
+{
+ struct vconn *vconn = vconn_;
+ struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
+ int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
+ if (ret > 0) {
+ buffer_pull(sslv->txbuf, ret);
+ if (sslv->txbuf->size == 0) {
+ ssl_clear_txbuf(sslv);
+ return;
+ }
+ } else {
+ int error = SSL_get_error(sslv->ssl, ret);
+ if (error == SSL_ERROR_ZERO_RETURN) {
+ /* Connection closed (EOF). */
+ VLOG_WARN("SSL_write: connection close");
+ } else if (interpret_ssl_error("SSL_write", ret, error) != EAGAIN) {
+ ssl_clear_txbuf(sslv);
+ return;
+ }
+ }
+ ssl_register_tx_waiter(vconn);
+}
+
static int
ssl_send(struct vconn *vconn, struct buffer *buffer)
{
struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
ssize_t ret;
- if (!state_machine(sslv)) {
- return EAGAIN;
- } else if (sslv->connect_error) {
- return sslv->connect_error;
- }
-
if (sslv->txbuf) {
return EAGAIN;
}
} else {
sslv->txbuf = buffer;
buffer_pull(buffer, ret);
+ ssl_register_tx_waiter(vconn);
}
return 0;
} else {
}
}
+static bool
+ssl_needs_wait(struct ssl_vconn *sslv)
+{
+ if (SSL_want_read(sslv->ssl)) {
+ poll_fd_wait(sslv->fd, POLLIN, NULL);
+ return true;
+ } else if (SSL_want_write(sslv->ssl)) {
+ poll_fd_wait(sslv->fd, POLLOUT, NULL);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+static void
+ssl_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+ struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
+
+ switch (wait) {
+ case WAIT_CONNECT:
+ if (vconn_connect(vconn) != EAGAIN) {
+ poll_immediate_wake();
+ } else if (sslv->state == STATE_TCP_CONNECTING) {
+ poll_fd_wait(sslv->fd, POLLOUT, NULL);
+ } else if (!ssl_needs_wait(sslv)) {
+ NOT_REACHED();
+ }
+ break;
+
+ case WAIT_RECV:
+ if (!ssl_needs_wait(sslv)) {
+ if (SSL_pending(sslv->ssl)) {
+ poll_immediate_wake();
+ } else {
+ poll_fd_wait(sslv->fd, POLLIN, NULL);
+ }
+ }
+ break;
+
+ case WAIT_SEND:
+ if (!sslv->txbuf && !ssl_needs_wait(sslv)) {
+ poll_fd_wait(sslv->fd, POLLOUT, NULL);
+ }
+ break;
+
+ default:
+ NOT_REACHED();
+ }
+}
+
struct vconn_class ssl_vconn_class = {
.name = "ssl",
.open = ssl_open,
.close = ssl_close,
- .prepoll = ssl_prepoll,
- .postpoll = ssl_postpoll,
+ .connect = ssl_connect,
.recv = ssl_recv,
.send = ssl_send,
+ .wait = ssl_wait,
};
\f
/* Passive SSL. */
retval = set_nonblocking(fd);
if (retval) {
- VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
close(fd);
return retval;
}
pssl = xmalloc(sizeof *pssl);
pssl->vconn.class = &pssl_vconn_class;
+ pssl->vconn.connect_status = 0;
pssl->fd = fd;
*vconnp = &pssl->vconn;
return 0;
free(pssl);
}
-static bool
-pssl_prepoll(struct vconn *vconn, int want, struct pollfd *pfd)
-{
- struct pssl_vconn *pssl = pssl_vconn_cast(vconn);
- pfd->fd = pssl->fd;
- if (want & WANT_ACCEPT) {
- pfd->events |= POLLIN;
- }
- return false;
-}
-
static int
pssl_accept(struct vconn *vconn, struct vconn **new_vconnp)
{
struct pssl_vconn *pssl = pssl_vconn_cast(vconn);
int new_fd;
+ int error;
new_fd = accept(pssl->fd, NULL, NULL);
if (new_fd < 0) {
int error = errno;
if (error != EAGAIN) {
- VLOG_DBG("pssl: accept: %s", strerror(error));
+ VLOG_DBG("accept: %s", strerror(error));
}
return error;
}
- return new_ssl_vconn("ssl" /* FIXME */, new_fd, SERVER, new_vconnp);
+ error = set_nonblocking(new_fd);
+ if (error) {
+ close(new_fd);
+ return error;
+ }
+
+ return new_ssl_vconn("ssl" /* FIXME */, new_fd,
+ SERVER, STATE_SSL_CONNECTING, new_vconnp);
+}
+
+static void
+pssl_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+ struct pssl_vconn *pssl = pssl_vconn_cast(vconn);
+ assert(wait == WAIT_ACCEPT);
+ poll_fd_wait(pssl->fd, POLLIN, NULL);
}
struct vconn_class pssl_vconn_class = {
.name = "pssl",
.open = pssl_open,
.close = pssl_close,
- .prepoll = pssl_prepoll,
.accept = pssl_accept,
+ .wait = pssl_wait,
};
\f
/*
return 0;
}
-static bool
-state_machine(struct ssl_vconn *sslv)
-{
- if (sslv->state == STATE_SSL_CONNECTING) {
- int ret = (sslv->type == CLIENT
- ? SSL_connect(sslv->ssl) : SSL_accept(sslv->ssl));
- if (ret != 1) {
- int error = SSL_get_error(sslv->ssl, ret);
- if (ret < 0 && ssl_wants_io(error)) {
- /* Stay in this state to repeat the SSL_connect later. */
- return false;
- } else {
- interpret_ssl_error((sslv->type == CLIENT ? "SSL_connect"
- : "SSL_accept"), ret, error);
- shutdown(sslv->fd, SHUT_RDWR);
- connect_completed(sslv, EPROTO);
- }
- } else {
- connect_completed(sslv, 0);
- }
- }
- return sslv->state == STATE_CONNECTED;
-}
-
-static void
-connect_completed(struct ssl_vconn *sslv, int error)
-{
- sslv->state = STATE_CONNECTED;
- sslv->connect_error = error;
-}
-
static DH *
tmp_dh_callback(SSL *ssl, int is_export UNUSED, int keylength)
{
#include "util.h"
#include "openflow.h"
#include "ofp-print.h"
+#include "poll-loop.h"
#include "vlog.h"
#define THIS_MODULE VLM_vconn_tcp
int fd;
struct buffer *rxbuf;
struct buffer *txbuf;
+ struct poll_waiter *tx_waiter;
};
static int
-new_tcp_vconn(const char *name, int fd, struct vconn **vconnp)
+new_tcp_vconn(const char *name, int fd, int connect_status,
+ struct vconn **vconnp)
{
struct tcp_vconn *tcp;
int on = 1;
int retval;
- retval = set_nonblocking(fd);
- if (retval) {
- VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
- close(fd);
- return retval;
- }
-
retval = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
if (retval) {
VLOG_ERR("%s: setsockopt(TCP_NODELAY): %s", name, strerror(errno));
tcp = xmalloc(sizeof *tcp);
tcp->vconn.class = &tcp_vconn_class;
+ tcp->vconn.connect_status = connect_status;
tcp->fd = fd;
tcp->txbuf = NULL;
+ tcp->tx_waiter = NULL;
tcp->rxbuf = NULL;
*vconnp = &tcp->vconn;
return 0;
}
static struct tcp_vconn *
-tcp_vconn_cast(struct vconn *vconn)
+tcp_vconn_cast(struct vconn *vconn)
{
assert(vconn->class == &tcp_vconn_class);
- return CONTAINER_OF(vconn, struct tcp_vconn, vconn);
+ return CONTAINER_OF(vconn, struct tcp_vconn, vconn);
}
return errno;
}
- retval = connect(fd, (struct sockaddr *) &sin, sizeof sin);
- if (retval < 0) {
- int error = errno;
- VLOG_ERR("%s: connect: %s", name, strerror(error));
+ retval = set_nonblocking(fd);
+ if (retval) {
close(fd);
- return error;
+ return retval;
}
- return new_tcp_vconn(name, fd, vconnp);
+ retval = connect(fd, (struct sockaddr *) &sin, sizeof sin);
+ if (retval < 0) {
+ if (errno == EINPROGRESS) {
+ return new_tcp_vconn(name, fd, EAGAIN, vconnp);
+ } else {
+ int error = errno;
+ VLOG_ERR("%s: connect: %s", name, strerror(error));
+ close(fd);
+ return error;
+ }
+ } else {
+ return new_tcp_vconn(name, fd, 0, vconnp);
+ }
}
static void
-tcp_close(struct vconn *vconn)
+tcp_close(struct vconn *vconn)
{
struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+ poll_cancel(tcp->tx_waiter);
close(tcp->fd);
free(tcp);
}
-static bool
-tcp_prepoll(struct vconn *vconn, int want, struct pollfd *pfd)
+static int
+tcp_connect(struct vconn *vconn)
{
struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
- pfd->fd = tcp->fd;
- if (want & WANT_RECV) {
- pfd->events |= POLLIN;
- }
- if (want & WANT_SEND || tcp->txbuf) {
- pfd->events |= POLLOUT;
- }
- return false;
-}
-
-static void
-tcp_postpoll(struct vconn *vconn, short int *revents)
-{
- struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
- if (*revents & POLLOUT && tcp->txbuf) {
- ssize_t n = write(tcp->fd, tcp->txbuf->data, tcp->txbuf->size);
- if (n < 0) {
- if (errno != EAGAIN) {
- VLOG_ERR("send: %s", strerror(errno));
- *revents |= POLLERR;
- }
- } else if (n > 0) {
- buffer_pull(tcp->txbuf, n);
- if (tcp->txbuf->size == 0) {
- buffer_delete(tcp->txbuf);
- tcp->txbuf = NULL;
- }
- }
- if (tcp->txbuf) {
- *revents &= ~POLLOUT;
- }
- }
+ return check_connection_completion(tcp->fd);
}
static int
}
}
+static void
+tcp_clear_txbuf(struct tcp_vconn *tcp)
+{
+ buffer_delete(tcp->txbuf);
+ tcp->txbuf = NULL;
+ tcp->tx_waiter = NULL;
+}
+
+static void
+tcp_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_)
+{
+ struct vconn *vconn = vconn_;
+ struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+ ssize_t n = write(tcp->fd, tcp->txbuf->data, tcp->txbuf->size);
+ if (n < 0) {
+ if (errno != EAGAIN) {
+ VLOG_ERR("send: %s", strerror(errno));
+ tcp_clear_txbuf(tcp);
+ return;
+ }
+ } else if (n > 0) {
+ buffer_pull(tcp->txbuf, n);
+ if (!tcp->txbuf->size) {
+ tcp_clear_txbuf(tcp);
+ return;
+ }
+ }
+ tcp->tx_waiter = poll_fd_callback(tcp->fd, POLLOUT, tcp_do_tx, vconn);
+}
+
static int
-tcp_send(struct vconn *vconn, struct buffer *buffer)
+tcp_send(struct vconn *vconn, struct buffer *buffer)
{
struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
ssize_t retval;
if (retval > 0) {
buffer_pull(buffer, retval);
}
+ tcp->tx_waiter = poll_fd_callback(tcp->fd, POLLOUT, tcp_do_tx, vconn);
return 0;
} else {
return errno;
}
}
+static void
+tcp_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+ struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+ switch (wait) {
+ case WAIT_CONNECT:
+ poll_fd_wait(tcp->fd, POLLOUT, NULL);
+ break;
+
+ case WAIT_SEND:
+ if (!tcp->txbuf) {
+ poll_fd_wait(tcp->fd, POLLOUT, NULL);
+ } else {
+ /* Nothing to do: need to drain txbuf first. */
+ }
+ break;
+
+ case WAIT_RECV:
+ poll_fd_wait(tcp->fd, POLLIN, NULL);
+ break;
+
+ default:
+ NOT_REACHED();
+ }
+}
+
struct vconn_class tcp_vconn_class = {
.name = "tcp",
.open = tcp_open,
.close = tcp_close,
- .prepoll = tcp_prepoll,
- .postpoll = tcp_postpoll,
+ .connect = tcp_connect,
.recv = tcp_recv,
.send = tcp_send,
+ .wait = tcp_wait,
};
\f
/* Passive TCP. */
};
static struct ptcp_vconn *
-ptcp_vconn_cast(struct vconn *vconn)
+ptcp_vconn_cast(struct vconn *vconn)
{
assert(vconn->class == &ptcp_vconn_class);
- return CONTAINER_OF(vconn, struct ptcp_vconn, vconn);
+ return CONTAINER_OF(vconn, struct ptcp_vconn, vconn);
}
static int
retval = set_nonblocking(fd);
if (retval) {
- VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
close(fd);
return retval;
}
ptcp = xmalloc(sizeof *ptcp);
ptcp->vconn.class = &ptcp_vconn_class;
+ ptcp->vconn.connect_status = 0;
ptcp->fd = fd;
*vconnp = &ptcp->vconn;
return 0;
}
static void
-ptcp_close(struct vconn *vconn)
+ptcp_close(struct vconn *vconn)
{
struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
close(ptcp->fd);
free(ptcp);
}
-static bool
-ptcp_prepoll(struct vconn *vconn, int want, struct pollfd *pfd)
-{
- struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
- pfd->fd = ptcp->fd;
- if (want & WANT_ACCEPT) {
- pfd->events |= POLLIN;
- }
- return false;
-}
-
static int
-ptcp_accept(struct vconn *vconn, struct vconn **new_vconnp)
+ptcp_accept(struct vconn *vconn, struct vconn **new_vconnp)
{
struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
int new_fd;
+ int error;
new_fd = accept(ptcp->fd, NULL, NULL);
if (new_fd < 0) {
- return errno;
+ int error = errno;
+ if (error != EAGAIN) {
+ VLOG_DBG("accept: %s", strerror(error));
+ }
+ return error;
}
- return new_tcp_vconn("tcp" /* FIXME */, new_fd, new_vconnp);
+ error = set_nonblocking(new_fd);
+ if (error) {
+ close(new_fd);
+ return error;
+ }
+
+ return new_tcp_vconn("tcp" /* FIXME */, new_fd, 0, new_vconnp);
+}
+
+static void
+ptcp_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+ struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
+ assert(wait == WAIT_ACCEPT);
+ poll_fd_wait(ptcp->fd, POLLIN, NULL);
}
struct vconn_class ptcp_vconn_class = {
.name = "ptcp",
.open = ptcp_open,
.close = ptcp_close,
- .prepoll = ptcp_prepoll,
.accept = ptcp_accept,
+ .wait = ptcp_wait
};
#include "buffer.h"
#include "flow.h"
#include "openflow.h"
+#include "poll-loop.h"
#include "util.h"
+#define THIS_MODULE VLM_vconn
+#include "vlog.h"
+
static struct vconn_class *vconn_classes[] = {
&tcp_vconn_class,
&ptcp_vconn_class,
/* Check the validity of the vconn class structures. */
static void
-check_vconn_classes(void)
+check_vconn_classes(void)
{
#ifndef NDEBUG
size_t i;
assert(class->name != NULL);
assert(class->open != NULL);
assert(class->close != NULL);
- assert(class->prepoll != NULL);
assert(class->accept
? !class->recv && !class->send
- : class->recv && class->send);
+ : class->recv && class->send);
+ assert(class->wait != NULL);
}
#endif
}
* stores a pointer to the new connection in '*vconnp', otherwise a null
* pointer. */
int
-vconn_open(const char *name, struct vconn **vconnp)
+vconn_open(const char *name, struct vconn **vconnp)
{
size_t prefix_len;
size_t i;
free(suffix_copy);
if (retval) {
*vconnp = NULL;
+ } else {
+ assert((*vconnp)->connect_status != EAGAIN
+ || (*vconnp)->class->connect);
}
return retval;
}
abort();
}
+int
+vconn_open_block(const char *name, struct vconn **vconnp)
+{
+ struct vconn *vconn;
+ int error;
+
+ error = vconn_open(name, &vconn);
+ while (error == EAGAIN) {
+ vconn_connect_wait(vconn);
+ poll_block();
+ error = vconn_connect(vconn);
+ assert(error != EINPROGRESS);
+ }
+ if (error) {
+ vconn_close(vconn);
+ *vconnp = NULL;
+ } else {
+ *vconnp = vconn;
+ }
+ return error;
+}
+
/* Closes 'vconn'. */
void
-vconn_close(struct vconn *vconn)
+vconn_close(struct vconn *vconn)
{
if (vconn != NULL) {
(vconn->class->close)(vconn);
* 'vconn' is an active vconn, that is, its purpose is to transfer data, not
* to wait for new connections to arrive. */
bool
-vconn_is_passive(const struct vconn *vconn)
+vconn_is_passive(const struct vconn *vconn)
{
return vconn->class->accept != NULL;
}
-/* Initializes 'pfd->fd' and 'pfd->events' appropriately so that poll() will
- * wake up when the connection becomes available for the operations specified
- * in 'want', or for performing the vconn's needed internal processing.
- *
- * Normally returns false. Returns true to indicate that no blocking should
- * happen in poll() because the connection is available for some operation
- * specified in 'want' but that status cannot be detected via poll() and thus
- * poll() could block forever otherwise. */
-bool
-vconn_prepoll(struct vconn *vconn, int want, struct pollfd *pollfd)
-{
- return (vconn->class->prepoll)(vconn, want, pollfd);
-}
-
-/* Perform any internal processing needed by the connections. The vconn file
- * descriptor's status, as reported by poll(), must be provided in '*revents'.
- *
- * The postpoll function adjusts '*revents' to reflect the status of the
- * connection from the caller's point of view. That is, upon return '*revents
- * & POLLIN' indicates that a packet is (potentially) ready to be read (for an
- * active vconn) or a new connection is ready to be accepted (for a passive
- * vconn) and '*revents & POLLOUT' indicates that a packet is (potentially)
- * ready to be written. */
-void
-vconn_postpoll(struct vconn *vconn, short int *revents)
+/* Tries to complete the connection on 'vconn', which must be an active
+ * vconn. If 'vconn''s connection is complete, returns 0 if the connection
+ * was successful or a positive errno value if it failed. If the
+ * connection is still in progress, returns EAGAIN. */
+int
+vconn_connect(struct vconn *vconn)
{
- if (vconn->class->postpoll) {
- (vconn->class->postpoll)(vconn, revents);
- }
+ if (vconn->connect_status == EAGAIN) {
+ vconn->connect_status = (vconn->class->connect)(vconn);
+ assert(vconn->connect_status != EINPROGRESS);
+ }
+ return vconn->connect_status;
}
/* Tries to accept a new connection on 'vconn', which must be a passive vconn.
* vconn_accept will not block waiting for a connection. If no connection is
* ready to be accepted, it returns EAGAIN immediately. */
int
-vconn_accept(struct vconn *vconn, struct vconn **new_vconn)
+vconn_accept(struct vconn *vconn, struct vconn **new_vconn)
{
- int retval = (vconn->class->accept)(vconn, new_vconn);
+ int retval;
+
+ retval = (vconn->class->accept)(vconn, new_vconn);
+
if (retval) {
*new_vconn = NULL;
+ } else {
+ assert((*new_vconn)->connect_status != EAGAIN
+ || (*new_vconn)->class->connect);
}
return retval;
}
* vconn_recv will not block waiting for a packet to arrive. If no packets
* have been received, it returns EAGAIN immediately. */
int
-vconn_recv(struct vconn *vconn, struct buffer **msgp)
+vconn_recv(struct vconn *vconn, struct buffer **msgp)
{
- int retval = (vconn->class->recv)(vconn, msgp);
+ int retval = vconn_connect(vconn);
+ if (!retval) {
+ retval = (vconn->class->recv)(vconn, msgp);
+ }
if (retval) {
*msgp = NULL;
}
* vconn_send will not block. If 'msg' cannot be immediately accepted for
* transmission, it returns EAGAIN immediately. */
int
-vconn_send(struct vconn *vconn, struct buffer *msg)
+vconn_send(struct vconn *vconn, struct buffer *msg)
{
- return (vconn->class->send)(vconn, msg);
+ int retval = vconn_connect(vconn);
+ if (!retval) {
+ retval = (vconn->class->send)(vconn, msg);
+ }
+ return retval;
}
/* Same as vconn_send, except that it waits until 'msg' can be transmitted. */
int
-vconn_send_wait(struct vconn *vconn, struct buffer *msg)
+vconn_send_block(struct vconn *vconn, struct buffer *msg)
{
int retval;
while ((retval = vconn_send(vconn, msg)) == EAGAIN) {
- struct pollfd pfd;
+ vconn_send_wait(vconn);
+ poll_block();
+ }
+ return retval;
+}
+
+void
+vconn_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+ int connect_status;
+
+ assert(vconn_is_passive(vconn)
+ ? wait == WAIT_ACCEPT || wait == WAIT_CONNECT
+ : wait == WAIT_CONNECT || wait == WAIT_RECV || wait == WAIT_SEND);
- pfd.fd = -1;
- pfd.events = 0;
- vconn_prepoll(vconn, WANT_SEND, &pfd);
- do {
- retval = poll(&pfd, 1, -1);
- } while (retval < 0 && errno == EINTR);
- if (retval < 0) {
- return errno;
+ connect_status = vconn_connect(vconn);
+ if (connect_status) {
+ if (connect_status == EAGAIN) {
+ wait = WAIT_CONNECT;
+ } else {
+ poll_immediate_wake();
+ return;
}
- assert(retval == 1);
- vconn_postpoll(vconn, &pfd.revents);
}
- return retval;
+
+ (vconn->class->wait)(vconn, wait);
+}
+
+void
+vconn_connect_wait(struct vconn *vconn)
+{
+ vconn_wait(vconn, WAIT_CONNECT);
+}
+
+void
+vconn_accept_wait(struct vconn *vconn)
+{
+ vconn_wait(vconn, WAIT_ACCEPT);
+}
+
+void
+vconn_recv_wait(struct vconn *vconn)
+{
+ vconn_wait(vconn, WAIT_RECV);
+}
+
+void
+vconn_send_wait(struct vconn *vconn)
+{
+ vconn_wait(vconn, WAIT_SEND);
}
struct buffer *
make_add_simple_flow(const struct flow *flow,
- uint32_t buffer_id, uint16_t out_port)
+ uint32_t buffer_id, uint16_t out_port)
{
struct ofp_flow_mod *ofm;
size_t size = sizeof *ofm + sizeof ofm->actions[0];
#include <sys/types.h>
#include <unistd.h>
#include "fatal-signal.h"
+#include "poll-loop.h"
#include "util.h"
#include "vlog.h"
\f
/* Server for Vlog control connection. */
struct vlog_server {
+ struct poll_waiter *waiter;
char *path;
int fd;
};
+static void poll_server(int fd, short int events, void *server_);
+
/* Start listening for connections from clients and processing their
* requests. 'path' may be:
*
free(server);
fprintf(stderr, "Could not initialize vlog configuration socket: %s\n",
strerror(-server->fd));
- *serverp = NULL;
+ if (serverp) {
+ *serverp = NULL;
+ }
return fd;
}
- *serverp = server;
+
+ server->waiter = poll_fd_callback(server->fd, POLLIN, poll_server, server);
+
+ if (serverp) {
+ *serverp = server;
+ }
return 0;
}
vlog_server_close(struct vlog_server *server)
{
if (server) {
+ poll_cancel(server->waiter);
close(server->fd);
unlink(server->path);
fatal_signal_remove_file_to_unlink(server->path);
}
}
-/* Returns the fd used by 'server'. The caller can poll this fd (POLLIN) to
- * determine when to call vlog_server_poll(). */
-int
-vlog_server_get_fd(const struct vlog_server *server)
-{
- return server->fd;
-}
-
static int
recv_with_creds(const struct vlog_server *server,
char *cmd_buf, size_t cmd_buf_size,
}
/* Processes incoming requests for 'server'. */
-void
-vlog_server_poll(struct vlog_server *server)
+static void
+poll_server(int fd UNUSED, short int events, void *server_)
{
+ struct vlog_server *server = server_;
for (;;) {
char cmd_buf[512];
struct sockaddr_un un;
fprintf(stderr, "vlog: reading configuration socket: %s",
strerror(errno));
}
- return;
+ break;
} else if (error < 0) {
continue;
}
(struct sockaddr*) &un, un_len);
free(reply);
}
+ server->waiter = poll_fd_callback(server->fd, POLLIN, poll_server, server);
}
\f
/* Client for Vlog control connection. */
#include "vconn.h"
#include "vlog-socket.h"
#include "openflow.h"
+#include "poll-loop.h"
#include "vlog.h"
#define THIS_MODULE VLM_secchan
struct half {
const char *name;
struct vconn *vconn;
- struct pollfd *pollfd;
struct buffer *rxbuf;
time_t backoff_deadline;
int backoff;
main(int argc, char *argv[])
{
struct half halves[2];
- struct pollfd pollfds[2 + 1];
- struct vlog_server *vlog_server;
int retval;
int i;
fatal(0, "exactly two peer arguments required; use --help for usage");
}
- retval = vlog_server_listen(NULL, &vlog_server);
+ retval = vlog_server_listen(NULL, NULL);
if (retval) {
fatal(retval, "Could not listen for vlog connections");
}
for (i = 0; i < 2; i++) {
halves[i].name = argv[optind + i];
halves[i].vconn = NULL;
- halves[i].pollfd = &pollfds[i];
halves[i].rxbuf = NULL;
halves[i].backoff_deadline = 0;
halves[i].backoff = 1;
reconnect(&halves[i]);
}
for (;;) {
- size_t n_ready;
-
- /* Wait until there's something to do. */
- n_ready = 0;
- for (i = 0; i < 2; i++) {
- struct half *this = &halves[i];
- struct half *peer = &halves[!i];
- int want = 0;
- if (peer->rxbuf) {
- want |= WANT_SEND;
- }
- if (!this->rxbuf) {
- want |= WANT_RECV;
- }
- this->pollfd->fd = -1;
- this->pollfd->events = 0;
- n_ready += vconn_prepoll(this->vconn, want, this->pollfd);
- }
- if (vlog_server) {
- pollfds[2].fd = vlog_server_get_fd(vlog_server);
- pollfds[2].events = POLLIN;
- }
- do {
- retval = poll(pollfds, 2 + (vlog_server != NULL),
- n_ready ? 0 : -1);
- } while (retval < 0 && errno == EINTR);
- if (retval < 0 || (retval == 0 && !n_ready)) {
- fatal(retval < 0 ? errno : 0, "poll");
- }
+ /* Do some work. Limit the number of iterations so that callbacks
+ * registered with the poll loop don't starve. */
+ int iteration;
+ for (iteration = 0; iteration < 50; iteration++) {
+ bool progress = false;
+ for (i = 0; i < 2; i++) {
+ struct half *this = &halves[i];
+ struct half *peer = &halves[!i];
+
+ if (!this->rxbuf) {
+ retval = vconn_recv(this->vconn, &this->rxbuf);
+ if (retval && retval != EAGAIN) {
+ if (retval == EOF) {
+ VLOG_DBG("%s: connection closed by remote host",
+ this->name);
+ } else {
+ VLOG_DBG("%s: recv: closing connection: %s",
+ this->name, strerror(retval));
+ }
+ reconnect(this);
+ break;
+ }
+ }
- /* Let each connection deal with any pending operations. */
- for (i = 0; i < 2; i++) {
- struct half *this = &halves[i];
- vconn_postpoll(this->vconn, &this->pollfd->revents);
- if (this->pollfd->revents & POLLERR) {
- this->pollfd->revents |= POLLIN | POLLOUT;
+ if (this->rxbuf) {
+ retval = vconn_send(peer->vconn, this->rxbuf);
+ if (!retval) {
+ this->rxbuf = NULL;
+ progress = true;
+ } else if (retval != EAGAIN) {
+ VLOG_DBG("%s: send: closing connection: %s",
+ peer->name, strerror(retval));
+ reconnect(peer);
+ break;
+ }
+ }
+ }
+ if (!progress) {
+ break;
}
- }
- if (vlog_server && pollfds[2].revents) {
- vlog_server_poll(vlog_server);
}
- /* Do as much work as we can without waiting. */
+ /* Wait for something to happen. */
for (i = 0; i < 2; i++) {
struct half *this = &halves[i];
struct half *peer = &halves[!i];
-
- if (this->pollfd->revents & POLLIN && !this->rxbuf) {
- retval = vconn_recv(this->vconn, &this->rxbuf);
- if (retval && retval != EAGAIN) {
- VLOG_DBG("%s: recv: closing connection: %s",
- this->name, strerror(retval));
- reconnect(this);
- break;
- }
+ if (!this->rxbuf) {
+ vconn_recv_wait(this->vconn);
+ } else {
+ vconn_send_wait(peer->vconn);
}
-
- if (peer->pollfd->revents & POLLOUT && this->rxbuf) {
- retval = vconn_send(peer->vconn, this->rxbuf);
- if (!retval) {
- this->rxbuf = NULL;
- } else if (retval != EAGAIN) {
- VLOG_DBG("%s: send: closing connection: %s",
- peer->name, strerror(retval));
- reconnect(peer);
- break;
- }
- }
}
+ poll_block();
}
return 0;
buffer_delete(this->rxbuf);
this->rxbuf = NULL;
}
- this->pollfd->revents = POLLIN | POLLOUT;
for (;;) {
time_t now = time(0);
}
VLOG_WARN("%s: waiting %d seconds before reconnect\n",
this->name, (int) (this->backoff_deadline - now));
- sleep(this->backoff_deadline - now);
+ poll_timer_wait((this->backoff_deadline - now) * 1000);
+ poll_block();
}
- retval = vconn_open(this->name, &this->vconn);
+ retval = vconn_open_block(this->name, &this->vconn);
if (!retval) {
VLOG_WARN("%s: connected", this->name);
if (vconn_is_passive(this->vconn)) {
if (!reliable) {
fatal(0, "%s: connection failed", this->name);
}
- VLOG_WARN("%s: connection failed (%s)", this->name, strerror(errno));
+ VLOG_WARN("%s: connection failed (%s)", this->name, strerror(retval));
this->backoff_deadline = time(0) + this->backoff;
}
}
ofm->group_id = htonl(0);
str_to_flow(line, &ofm->match, &ofm->actions[0]);
- retval = vconn_send_wait(vconn, buffer);
+ retval = vconn_send_block(vconn, buffer);
if (retval) {
fatal(retval, "sending to datapath");
}