From 85a2e91f5f0bae5f44f8978b8ab8d618e25d8593 Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Wed, 30 Jul 2008 15:44:22 -0700 Subject: [PATCH] vconn: Introduce infrastructure for stream socket-based vconns. --- include/Makefile.am | 1 + include/vconn-stream.h | 50 ++++++ include/vlog.h | 1 + lib/Makefile.am | 1 + lib/vconn-stream.c | 354 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 407 insertions(+) create mode 100644 include/vconn-stream.h create mode 100644 lib/vconn-stream.c diff --git a/include/Makefile.am b/include/Makefile.am index c37f1426..4f0a4656 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -32,6 +32,7 @@ noinst_HEADERS = \ util.h \ vconn.h \ vconn-ssl.h \ + vconn-stream.h \ vlog-socket.h \ vlog.h \ xtoxll.h diff --git a/include/vconn-stream.h b/include/vconn-stream.h new file mode 100644 index 00000000..d7eb59f8 --- /dev/null +++ b/include/vconn-stream.h @@ -0,0 +1,50 @@ +/* Copyright (c) 2008 The Board of Trustees of The Leland Stanford + * Junior University + * + * We are making the OpenFlow specification and associated documentation + * (Software) available for public use and benefit with the expectation + * that others will use, modify and enhance the Software and contribute + * those enhancements back to the community. However, since we would + * like to make the Software available for broadest use, with as few + * restrictions as possible permission is hereby granted, free of + * charge, to any person obtaining a copy of this Software to deal in + * the Software under the copyrights without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * The name and trademarks of copyright holder(s) may NOT be used in + * advertising or publicity pertaining to the Software or any + * derivatives without specific, written prior permission. + */ + +#ifndef VCONN_STREAM_H +#define VCONN_STREAM_H 1 + +#include +#include + +struct vconn; +struct sockaddr; + +int new_stream_vconn(const char *name, int fd, int connect_status, + uint32_t ip, struct vconn **vconnp); +int new_pstream_vconn(const char *name, int fd, + int (*accept_cb)(int fd, const struct sockaddr *, + size_t sa_len, struct vconn **), + struct vconn **vconnp); + +#endif /* vconn-stream.h */ diff --git a/include/vlog.h b/include/vlog.h index 2ecaa419..a25c5281 100644 --- a/include/vlog.h +++ b/include/vlog.h @@ -85,6 +85,7 @@ enum vlog_facility vlog_get_facility_val(const char *name); VLOG_MODULE(vconn_netlink) \ VLOG_MODULE(vconn_tcp) \ VLOG_MODULE(vconn_ssl) \ + VLOG_MODULE(vconn_stream) \ VLOG_MODULE(vconn) \ VLOG_MODULE(vlog) \ diff --git a/lib/Makefile.am b/lib/Makefile.am index bedd77fa..6c13fc58 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -26,6 +26,7 @@ libopenflow_a_SOURCES = \ socket-util.c \ util.c \ vconn-tcp.c \ + vconn-stream.c \ vconn.c \ vlog-socket.c \ vlog.c diff --git a/lib/vconn-stream.c b/lib/vconn-stream.c new file mode 100644 index 00000000..28d96354 --- /dev/null +++ b/lib/vconn-stream.c @@ -0,0 +1,354 @@ +/* Copyright (c) 2008 The Board of Trustees of The Leland Stanford + * Junior University + * + * We are making the OpenFlow specification and associated documentation + * (Software) available for public use and benefit with the expectation + * that others will use, modify and enhance the Software and contribute + * those enhancements back to the community. However, since we would + * like to make the Software available for broadest use, with as few + * restrictions as possible permission is hereby granted, free of + * charge, to any person obtaining a copy of this Software to deal in + * the Software under the copyrights without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * The name and trademarks of copyright holder(s) may NOT be used in + * advertising or publicity pertaining to the Software or any + * derivatives without specific, written prior permission. + */ + +#include +#include "vconn-stream.h" +#include +#include +#include +#include +#include +#include +#include +#include "buffer.h" +#include "util.h" +#include "openflow.h" +#include "poll-loop.h" +#include "socket-util.h" +#include "vconn.h" + +#include "vlog.h" +#define THIS_MODULE VLM_vconn_stream + +/* Active stream socket vconn. */ + +struct stream_vconn +{ + struct vconn vconn; + int fd; + struct buffer *rxbuf; + struct buffer *txbuf; + struct poll_waiter *tx_waiter; +}; + +static struct vconn_class stream_vconn_class; + +int +new_stream_vconn(const char *name, int fd, int connect_status, + uint32_t ip, struct vconn **vconnp) +{ + struct stream_vconn *s; + + s = xmalloc(sizeof *s); + s->vconn.class = &stream_vconn_class; + s->vconn.connect_status = connect_status; + s->vconn.ip = ip; + s->fd = fd; + s->txbuf = NULL; + s->tx_waiter = NULL; + s->rxbuf = NULL; + *vconnp = &s->vconn; + return 0; +} + +static struct stream_vconn * +stream_vconn_cast(struct vconn *vconn) +{ + assert(vconn->class == &stream_vconn_class); + return CONTAINER_OF(vconn, struct stream_vconn, vconn); +} + +static void +stream_close(struct vconn *vconn) +{ + struct stream_vconn *s = stream_vconn_cast(vconn); + poll_cancel(s->tx_waiter); + close(s->fd); + free(s); +} + +static int +stream_connect(struct vconn *vconn) +{ + struct stream_vconn *s = stream_vconn_cast(vconn); + return check_connection_completion(s->fd); +} + +static int +stream_recv(struct vconn *vconn, struct buffer **bufferp) +{ + struct stream_vconn *s = stream_vconn_cast(vconn); + struct buffer *rx; + size_t want_bytes; + ssize_t retval; + + if (s->rxbuf == NULL) { + s->rxbuf = buffer_new(1564); + } + rx = s->rxbuf; + +again: + if (sizeof(struct ofp_header) > rx->size) { + want_bytes = sizeof(struct ofp_header) - rx->size; + } else { + struct ofp_header *oh = rx->data; + size_t length = ntohs(oh->length); + if (length < sizeof(struct ofp_header)) { + VLOG_ERR("received too-short ofp_header (%zu bytes)", length); + return EPROTO; + } + want_bytes = length - rx->size; + if (!want_bytes) { + *bufferp = rx; + s->rxbuf = NULL; + return 0; + } + } + buffer_prealloc_tailroom(rx, want_bytes); + + retval = read(s->fd, buffer_tail(rx), want_bytes); + if (retval > 0) { + rx->size += retval; + if (retval == want_bytes) { + if (rx->size > sizeof(struct ofp_header)) { + *bufferp = rx; + s->rxbuf = NULL; + return 0; + } else { + goto again; + } + } + return EAGAIN; + } else if (retval == 0) { + if (rx->size) { + VLOG_ERR("connection dropped mid-packet"); + return EPROTO; + } else { + return EOF; + } + } else { + return retval ? errno : EAGAIN; + } +} + +static void +stream_clear_txbuf(struct stream_vconn *s) +{ + buffer_delete(s->txbuf); + s->txbuf = NULL; + s->tx_waiter = NULL; +} + +static void +stream_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_) +{ + struct vconn *vconn = vconn_; + struct stream_vconn *s = stream_vconn_cast(vconn); + ssize_t n = write(s->fd, s->txbuf->data, s->txbuf->size); + if (n < 0) { + if (errno != EAGAIN) { + VLOG_ERR("send: %s", strerror(errno)); + stream_clear_txbuf(s); + return; + } + } else if (n > 0) { + buffer_pull(s->txbuf, n); + if (!s->txbuf->size) { + stream_clear_txbuf(s); + return; + } + } + s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn); +} + +static int +stream_send(struct vconn *vconn, struct buffer *buffer) +{ + struct stream_vconn *s = stream_vconn_cast(vconn); + ssize_t retval; + + if (s->txbuf) { + return EAGAIN; + } + + retval = write(s->fd, buffer->data, buffer->size); + if (retval == buffer->size) { + buffer_delete(buffer); + return 0; + } else if (retval >= 0 || errno == EAGAIN) { + s->txbuf = buffer; + if (retval > 0) { + buffer_pull(buffer, retval); + } + s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn); + return 0; + } else { + return errno; + } +} + +static void +stream_wait(struct vconn *vconn, enum vconn_wait_type wait) +{ + struct stream_vconn *s = stream_vconn_cast(vconn); + switch (wait) { + case WAIT_CONNECT: + poll_fd_wait(s->fd, POLLOUT); + break; + + case WAIT_SEND: + if (!s->txbuf) { + poll_fd_wait(s->fd, POLLOUT); + } else { + /* Nothing to do: need to drain txbuf first. */ + } + break; + + case WAIT_RECV: + poll_fd_wait(s->fd, POLLIN); + break; + + default: + NOT_REACHED(); + } +} + +static struct vconn_class stream_vconn_class = { + .name = "stream", + .close = stream_close, + .connect = stream_connect, + .recv = stream_recv, + .send = stream_send, + .wait = stream_wait, +}; + +/* Passive stream socket vconn. */ + +struct pstream_vconn +{ + struct vconn vconn; + int fd; + int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len, + struct vconn **); +}; + +static struct vconn_class pstream_vconn_class; + +static struct pstream_vconn * +pstream_vconn_cast(struct vconn *vconn) +{ + assert(vconn->class == &pstream_vconn_class); + return CONTAINER_OF(vconn, struct pstream_vconn, vconn); +} + +int +new_pstream_vconn(const char *name, int fd, + int (*accept_cb)(int fd, const struct sockaddr *, + size_t sa_len, struct vconn **), + struct vconn **vconnp) +{ + struct pstream_vconn *ps; + int retval; + + retval = set_nonblocking(fd); + if (retval) { + close(fd); + return retval; + } + + if (listen(fd, 10) < 0) { + int error = errno; + VLOG_ERR("%s: listen: %s", name, strerror(error)); + close(fd); + return error; + } + + ps = xmalloc(sizeof *ps); + ps->vconn.class = &pstream_vconn_class; + ps->vconn.connect_status = 0; + ps->fd = fd; + ps->accept_cb = accept_cb; + *vconnp = &ps->vconn; + return 0; +} + +static void +pstream_close(struct vconn *vconn) +{ + struct pstream_vconn *ps = pstream_vconn_cast(vconn); + close(ps->fd); + free(ps); +} + +static int +pstream_accept(struct vconn *vconn, struct vconn **new_vconnp) +{ + struct pstream_vconn *ps = pstream_vconn_cast(vconn); + struct sockaddr_storage ss; + socklen_t ss_len = sizeof ss; + int new_fd; + int retval; + + new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len); + if (new_fd < 0) { + int retval = errno; + if (retval != EAGAIN) { + VLOG_DBG("accept: %s", strerror(retval)); + } + return retval; + } + + retval = set_nonblocking(new_fd); + if (retval) { + close(new_fd); + return retval; + } + + return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len, + new_vconnp); +} + +static void +pstream_wait(struct vconn *vconn, enum vconn_wait_type wait) +{ + struct pstream_vconn *ps = pstream_vconn_cast(vconn); + assert(wait == WAIT_ACCEPT); + poll_fd_wait(ps->fd, POLLIN); +} + +static struct vconn_class pstream_vconn_class = { + .name = "pstream", + .close = pstream_close, + .accept = pstream_accept, + .wait = pstream_wait +}; -- 2.30.2