From: Ben Pfaff Date: Wed, 4 Nov 2009 23:02:32 +0000 (-0800) Subject: stream: New library for bidirectional streams (e.g. TCP, SSL, Unix sockets). X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c34b65c731a1b6dae014efe8895141e5b2fe758a;p=openvswitch stream: New library for bidirectional streams (e.g. TCP, SSL, Unix sockets). This code is heavily based on the vconn code. Eventually we should make the stream-based vconns (currently that's all of them) a wrapper around streams, but I haven't done that yet. SSL is not implemented yet. --- diff --git a/lib/automake.mk b/lib/automake.mk index 1d02dfd4..fd791c28 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -102,6 +102,13 @@ lib_libopenvswitch_a_SOURCES = \ lib/socket-util.h \ lib/stp.c \ lib/stp.h \ + lib/stream-fd.c \ + lib/stream-fd.h \ + lib/stream-provider.h \ + lib/stream-tcp.c \ + lib/stream-unix.c \ + lib/stream.c \ + lib/stream.h \ lib/svec.c \ lib/svec.h \ lib/tag.c \ @@ -198,6 +205,7 @@ COVERAGE_FILES = \ lib/process.c \ lib/rconn.c \ lib/rtnetlink.c \ + lib/stream.c \ lib/timeval.c \ lib/unixctl.c \ lib/util.c \ diff --git a/lib/stream-fd.c b/lib/stream-fd.c new file mode 100644 index 00000000..46aa8e73 --- /dev/null +++ b/lib/stream-fd.c @@ -0,0 +1,254 @@ +/* + * Copyright (c) 2008, 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "stream-fd.h" +#include +#include +#include +#include +#include +#include +#include +#include "fatal-signal.h" +#include "leak-checker.h" +#include "poll-loop.h" +#include "socket-util.h" +#include "util.h" +#include "stream-provider.h" +#include "stream.h" + +#include "vlog.h" +#define THIS_MODULE VLM_stream_fd + +/* Active file descriptor stream. */ + +struct stream_fd +{ + struct stream stream; + int fd; + char *unlink_path; +}; + +static struct stream_class stream_fd_class; + +static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25); + +static void maybe_unlink_and_free(char *path); + +/* Creates a new stream named 'name' that will send and receive data on 'fd' + * and stores a pointer to the stream in '*streamp'. Initial connection status + * 'connect_status' is interpreted as described for stream_init(). + * + * When '*streamp' is closed, then 'unlink_path' (if nonnull) will be passed to + * fatal_signal_unlink_file_now() and then freed with free(). + * + * Returns 0 if successful, otherwise a positive errno value. (The current + * implementation never fails.) */ +int +new_fd_stream(const char *name, int fd, int connect_status, + char *unlink_path, struct stream **streamp) +{ + struct stream_fd *s; + + s = xmalloc(sizeof *s); + stream_init(&s->stream, &stream_fd_class, connect_status, name); + s->fd = fd; + s->unlink_path = unlink_path; + *streamp = &s->stream; + return 0; +} + +static struct stream_fd * +stream_fd_cast(struct stream *stream) +{ + stream_assert_class(stream, &stream_fd_class); + return CONTAINER_OF(stream, struct stream_fd, stream); +} + +static void +fd_close(struct stream *stream) +{ + struct stream_fd *s = stream_fd_cast(stream); + close(s->fd); + maybe_unlink_and_free(s->unlink_path); + free(s); +} + +static int +fd_connect(struct stream *stream) +{ + struct stream_fd *s = stream_fd_cast(stream); + return check_connection_completion(s->fd); +} + +static ssize_t +fd_recv(struct stream *stream, void *buffer, size_t n) +{ + struct stream_fd *s = stream_fd_cast(stream); + ssize_t retval = read(s->fd, buffer, n); + return retval >= 0 ? retval : -errno; +} + +static ssize_t +fd_send(struct stream *stream, const void *buffer, size_t n) +{ + struct stream_fd *s = stream_fd_cast(stream); + ssize_t retval = write(s->fd, buffer, n); + return (retval > 0 ? retval + : retval == 0 ? -EAGAIN + : -errno); +} + +static void +fd_wait(struct stream *stream, enum stream_wait_type wait) +{ + struct stream_fd *s = stream_fd_cast(stream); + switch (wait) { + case STREAM_CONNECT: + case STREAM_SEND: + poll_fd_wait(s->fd, POLLOUT); + break; + + case STREAM_RECV: + poll_fd_wait(s->fd, POLLIN); + break; + + default: + NOT_REACHED(); + } +} + +static struct stream_class stream_fd_class = { + "fd", /* name */ + NULL, /* open */ + fd_close, /* close */ + fd_connect, /* connect */ + fd_recv, /* recv */ + fd_send, /* send */ + fd_wait, /* wait */ +}; + +/* Passive file descriptor stream. */ + +struct fd_pstream +{ + struct pstream pstream; + int fd; + int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len, + struct stream **); + char *unlink_path; +}; + +static struct pstream_class fd_pstream_class; + +static struct fd_pstream * +fd_pstream_cast(struct pstream *pstream) +{ + pstream_assert_class(pstream, &fd_pstream_class); + return CONTAINER_OF(pstream, struct fd_pstream, pstream); +} + +/* Creates a new pstream named 'name' that will accept new socket connections + * on 'fd' and stores a pointer to the stream in '*pstreamp'. + * + * When a connection has been accepted, 'accept_cb' will be called with the new + * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'. + * accept_cb must return 0 if the connection is successful, in which case it + * must initialize '*streamp' to the new stream, or a positive errno value on + * error. In either case accept_cb takes ownership of the 'fd' passed in. + * + * When '*pstreamp' is closed, then 'unlink_path' (if nonnull) will be passed + * to fatal_signal_unlink_file_now() and freed with free(). + * + * Returns 0 if successful, otherwise a positive errno value. (The current + * implementation never fails.) */ +int +new_fd_pstream(const char *name, int fd, + int (*accept_cb)(int fd, const struct sockaddr *sa, + size_t sa_len, struct stream **streamp), + char *unlink_path, struct pstream **pstreamp) +{ + struct fd_pstream *ps = xmalloc(sizeof *ps); + pstream_init(&ps->pstream, &fd_pstream_class, name); + ps->fd = fd; + ps->accept_cb = accept_cb; + ps->unlink_path = unlink_path; + *pstreamp = &ps->pstream; + return 0; +} + +static void +pfd_close(struct pstream *pstream) +{ + struct fd_pstream *ps = fd_pstream_cast(pstream); + close(ps->fd); + maybe_unlink_and_free(ps->unlink_path); + free(ps); +} + +static int +pfd_accept(struct pstream *pstream, struct stream **new_streamp) +{ + struct fd_pstream *ps = fd_pstream_cast(pstream); + struct sockaddr_storage ss; + socklen_t ss_len = sizeof ss; + int new_fd; + int retval; + + new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len); + if (new_fd < 0) { + int retval = errno; + if (retval != EAGAIN) { + VLOG_DBG_RL(&rl, "accept: %s", strerror(retval)); + } + return retval; + } + + retval = set_nonblocking(new_fd); + if (retval) { + close(new_fd); + return retval; + } + + return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len, + new_streamp); +} + +static void +pfd_wait(struct pstream *pstream) +{ + struct fd_pstream *ps = fd_pstream_cast(pstream); + poll_fd_wait(ps->fd, POLLIN); +} + +static struct pstream_class fd_pstream_class = { + "pstream", + NULL, + pfd_close, + pfd_accept, + pfd_wait +}; + +/* Helper functions. */ +static void +maybe_unlink_and_free(char *path) +{ + if (path) { + fatal_signal_unlink_file_now(path); + free(path); + } +} diff --git a/lib/stream-fd.h b/lib/stream-fd.h new file mode 100644 index 00000000..d2a34eb2 --- /dev/null +++ b/lib/stream-fd.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2008, 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef STREAM_FD_H +#define STREAM_FD_H 1 + +#include +#include +#include + +struct stream; +struct pstream; +struct sockaddr; + +int new_fd_stream(const char *name, int fd, int connect_status, + char *unlink_path, struct stream **streamp); +int new_fd_pstream(const char *name, int fd, + int (*accept_cb)(int fd, const struct sockaddr *, + size_t sa_len, struct stream **), + char *unlink_path, + struct pstream **pstreamp); + +#endif /* stream-fd.h */ diff --git a/lib/stream-provider.h b/lib/stream-provider.h new file mode 100644 index 00000000..6beaab75 --- /dev/null +++ b/lib/stream-provider.h @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef STREAM_PROVIDER_H +#define STREAM_PROVIDER_H 1 + +#include +#include +#include "stream.h" + +/* Active stream connection. */ + +/* Active stream connection. + * + * This structure should be treated as opaque by implementation. */ +struct stream { + struct stream_class *class; + int state; + int error; + uint32_t remote_ip; + uint16_t remote_port; + uint32_t local_ip; + uint16_t local_port; + char *name; +}; + +void stream_init(struct stream *, struct stream_class *, int connect_status, + const char *name); +void stream_set_remote_ip(struct stream *, uint32_t remote_ip); +void stream_set_remote_port(struct stream *, uint16_t remote_port); +void stream_set_local_ip(struct stream *, uint32_t local_ip); +void stream_set_local_port(struct stream *, uint16_t local_port); +static inline void stream_assert_class(const struct stream *stream, + const struct stream_class *class) +{ + assert(stream->class == class); +} + +struct stream_class { + /* Prefix for connection names, e.g. "tcp", "ssl", "unix". */ + const char *name; + + /* Attempts to connect to a peer. 'name' is the full connection name + * provided by the user, e.g. "tcp:1.2.3.4". This name is useful for error + * messages but must not be modified. + * + * 'suffix' is a copy of 'name' following the colon and may be modified. + * + * Returns 0 if successful, otherwise a positive errno value. If + * successful, stores a pointer to the new connection in '*streamp'. + * + * The open function must not block waiting for a connection to complete. + * If the connection cannot be completed immediately, it should return + * EAGAIN (not EINPROGRESS, as returned by the connect system call) and + * continue the connection in the background. */ + int (*open)(const char *name, char *suffix, struct stream **streamp); + + /* Closes 'stream' and frees associated memory. */ + void (*close)(struct stream *stream); + + /* Tries to complete the connection on 'stream'. If 'stream''s connection + * is complete, returns 0 if the connection was successful or a positive + * errno value if it failed. If the connection is still in progress, + * returns EAGAIN. + * + * The connect function must not block waiting for the connection to + * complete; instead, it should return EAGAIN immediately. */ + int (*connect)(struct stream *stream); + + /* Tries to receive up to 'n' bytes from 'stream' into 'buffer', and + * returns: + * + * - If successful, the number of bytes received (between 1 and 'n'). + * + * - On error, a negative errno value. + * + * - 0, if the connection has been closed in the normal fashion. + * + * The recv function will not be passed a zero 'n'. + * + * The recv function must not block waiting for data to arrive. If no data + * have been received, it should return -EAGAIN immediately. */ + ssize_t (*recv)(struct stream *stream, void *buffer, size_t n); + + /* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns: + * + * - If successful, the number of bytes sent (between 1 and 'n'). + * + * - On error, a negative errno value. + * + * - Never returns 0. + * + * The send function will not be passed a zero 'n'. + * + * The send function must not block. If no bytes can be immediately + * accepted for transmission, it should return -EAGAIN immediately. */ + ssize_t (*send)(struct stream *stream, const void *buffer, size_t n); + + /* Arranges for the poll loop to wake up when 'stream' is ready to take an + * action of the given 'type'. */ + void (*wait)(struct stream *stream, enum stream_wait_type type); +}; + +/* Passive listener for incoming stream connections. + * + * This structure should be treated as opaque by stream implementations. */ +struct pstream { + struct pstream_class *class; + char *name; +}; + +void pstream_init(struct pstream *, struct pstream_class *, const char *name); +static inline void pstream_assert_class(const struct pstream *pstream, + const struct pstream_class *class) +{ + assert(pstream->class == class); +} + +struct pstream_class { + /* Prefix for connection names, e.g. "ptcp", "pssl", "punix". */ + const char *name; + + /* Attempts to start listening for stream connections. 'name' is the full + * connection name provided by the user, e.g. "ptcp:1234". This name is + * useful for error messages but must not be modified. + * + * 'suffix' is a copy of 'name' following the colon and may be modified. + * + * Returns 0 if successful, otherwise a positive errno value. If + * successful, stores a pointer to the new connection in '*pstreamp'. + * + * The listen function must not block. If the connection cannot be + * completed immediately, it should return EAGAIN (not EINPROGRESS, as + * returned by the connect system call) and continue the connection in the + * background. */ + int (*listen)(const char *name, char *suffix, struct pstream **pstreamp); + + /* Closes 'pstream' and frees associated memory. */ + void (*close)(struct pstream *pstream); + + /* Tries to accept a new connection on 'pstream'. If successful, stores + * the new connection in '*new_streamp' and returns 0. Otherwise, returns + * a positive errno value. + * + * The accept function must not block waiting for a connection. If no + * connection is ready to be accepted, it should return EAGAIN. */ + int (*accept)(struct pstream *pstream, struct stream **new_streamp); + + /* Arranges for the poll loop to wake up when a connection is ready to be + * accepted on 'pstream'. */ + void (*wait)(struct pstream *pstream); +}; + +/* Active and passive stream classes. */ +extern struct stream_class tcp_stream_class; +extern struct pstream_class ptcp_pstream_class; +extern struct stream_class unix_stream_class; +extern struct pstream_class punix_pstream_class; + +#endif /* stream-provider.h */ diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c new file mode 100644 index 00000000..ecd96865 --- /dev/null +++ b/lib/stream-tcp.c @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2008, 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "stream.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include "packets.h" +#include "socket-util.h" +#include "util.h" +#include "stream-provider.h" +#include "stream-fd.h" + +#include "vlog.h" +#define THIS_MODULE VLM_stream_tcp + +/* Active TCP. */ + +static int +new_tcp_stream(const char *name, int fd, int connect_status, + const struct sockaddr_in *remote, struct stream **streamp) +{ + struct sockaddr_in local; + socklen_t local_len = sizeof local; + int on = 1; + int retval; + + /* Get the local IP and port information */ + retval = getsockname(fd, (struct sockaddr *)&local, &local_len); + if (retval) { + memset(&local, 0, sizeof local); + } + + retval = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on); + if (retval) { + VLOG_ERR("%s: setsockopt(TCP_NODELAY): %s", name, strerror(errno)); + close(fd); + return errno; + } + + retval = new_fd_stream(name, fd, connect_status, NULL, streamp); + if (!retval) { + struct stream *stream = *streamp; + stream_set_remote_ip(stream, remote->sin_addr.s_addr); + stream_set_remote_port(stream, remote->sin_port); + stream_set_local_ip(stream, local.sin_addr.s_addr); + stream_set_local_port(stream, local.sin_port); + } + return retval; +} + +static int +tcp_open(const char *name, char *suffix, struct stream **streamp) +{ + struct sockaddr_in sin; + int fd, error; + + error = tcp_open_active(suffix, 0, &sin, &fd); + if (fd >= 0) { + return new_tcp_stream(name, fd, error, &sin, streamp); + } else { + VLOG_ERR("%s: connect: %s", name, strerror(error)); + return error; + } +} + +struct stream_class tcp_stream_class = { + "tcp", /* name */ + tcp_open, /* open */ + NULL, /* close */ + NULL, /* connect */ + NULL, /* recv */ + NULL, /* send */ + NULL, /* wait */ +}; + +/* Passive TCP. */ + +static int ptcp_accept(int fd, const struct sockaddr *sa, size_t sa_len, + struct stream **streamp); + +static int +ptcp_open(const char *name UNUSED, char *suffix, struct pstream **pstreamp) +{ + int fd; + + fd = tcp_open_passive(suffix, 0); + if (fd < 0) { + return -fd; + } else { + return new_fd_pstream("ptcp", fd, ptcp_accept, NULL, pstreamp); + } +} + +static int +ptcp_accept(int fd, const struct sockaddr *sa, size_t sa_len, + struct stream **streamp) +{ + const struct sockaddr_in *sin = (const struct sockaddr_in *) sa; + char name[128]; + + if (sa_len == sizeof(struct sockaddr_in) && sin->sin_family == AF_INET) { + sprintf(name, "tcp:"IP_FMT, IP_ARGS(&sin->sin_addr)); + sprintf(strchr(name, '\0'), ":%"PRIu16, ntohs(sin->sin_port)); + } else { + strcpy(name, "tcp"); + } + return new_tcp_stream(name, fd, 0, sin, streamp); +} + +struct pstream_class ptcp_pstream_class = { + "ptcp", + ptcp_open, + NULL, + NULL, + NULL +}; + diff --git a/lib/stream-unix.c b/lib/stream-unix.c new file mode 100644 index 00000000..a5dfd55b --- /dev/null +++ b/lib/stream-unix.c @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2008, 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "stream.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "packets.h" +#include "poll-loop.h" +#include "socket-util.h" +#include "util.h" +#include "stream-provider.h" +#include "stream-fd.h" + +#include "vlog.h" +#define THIS_MODULE VLM_stream_unix + +/* Active UNIX socket. */ + +/* Number of unix sockets created so far, to ensure binding path uniqueness. */ +static int n_unix_sockets; + +static int +unix_open(const char *name, char *suffix, struct stream **streamp) +{ + const char *connect_path = suffix; + char *bind_path; + int fd; + + bind_path = xasprintf("/tmp/stream-unix.%ld.%d", + (long int) getpid(), n_unix_sockets++); + fd = make_unix_socket(SOCK_STREAM, true, false, bind_path, connect_path); + if (fd < 0) { + VLOG_ERR("%s: connection to %s failed: %s", + bind_path, connect_path, strerror(-fd)); + free(bind_path); + return -fd; + } + + return new_fd_stream(name, fd, check_connection_completion(fd), + bind_path, streamp); +} + +struct stream_class unix_stream_class = { + "unix", /* name */ + unix_open, /* open */ + NULL, /* close */ + NULL, /* connect */ + NULL, /* recv */ + NULL, /* send */ + NULL, /* wait */ +}; + +/* Passive UNIX socket. */ + +static int punix_accept(int fd, const struct sockaddr *sa, size_t sa_len, + struct stream **streamp); + +static int +punix_open(const char *name UNUSED, char *suffix, struct pstream **pstreamp) +{ + int fd, error; + + fd = make_unix_socket(SOCK_STREAM, true, true, suffix, NULL); + if (fd < 0) { + VLOG_ERR("%s: binding failed: %s", suffix, strerror(errno)); + return errno; + } + + error = set_nonblocking(fd); + if (error) { + close(fd); + return error; + } + + if (listen(fd, 10) < 0) { + error = errno; + VLOG_ERR("%s: listen: %s", name, strerror(error)); + close(fd); + return error; + } + + return new_fd_pstream("punix", fd, punix_accept, + xstrdup(suffix), pstreamp); +} + +static int +punix_accept(int fd, const struct sockaddr *sa, size_t sa_len, + struct stream **streamp) +{ + const struct sockaddr_un *sun = (const struct sockaddr_un *) sa; + int name_len = get_unix_name_len(sa_len); + char name[128]; + + if (name_len > 0) { + snprintf(name, sizeof name, "unix:%.*s", name_len, sun->sun_path); + } else { + strcpy(name, "unix"); + } + return new_fd_stream(name, fd, 0, NULL, streamp); +} + +struct pstream_class punix_pstream_class = { + "punix", + punix_open, + NULL, + NULL, + NULL +}; + diff --git a/lib/stream.c b/lib/stream.c new file mode 100644 index 00000000..1a0ea7ba --- /dev/null +++ b/lib/stream.c @@ -0,0 +1,490 @@ +/* + * Copyright (c) 2008, 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "stream-provider.h" +#include +#include +#include +#include +#include +#include +#include +#include "coverage.h" +#include "dynamic-string.h" +#include "flow.h" +#include "ofp-print.h" +#include "ofpbuf.h" +#include "openflow/nicira-ext.h" +#include "openflow/openflow.h" +#include "packets.h" +#include "poll-loop.h" +#include "random.h" +#include "util.h" + +#define THIS_MODULE VLM_stream +#include "vlog.h" + +/* State of an active stream.*/ +enum stream_state { + SCS_CONNECTING, /* Underlying stream is not connected. */ + SCS_CONNECTED, /* Connection established. */ + SCS_DISCONNECTED /* Connection failed or connection closed. */ +}; + +static struct stream_class *stream_classes[] = { + &tcp_stream_class, + &unix_stream_class, +}; + +static struct pstream_class *pstream_classes[] = { + &ptcp_pstream_class, + &punix_pstream_class, +}; + +/* Check the validity of the stream class structures. */ +static void +check_stream_classes(void) +{ +#ifndef NDEBUG + size_t i; + + for (i = 0; i < ARRAY_SIZE(stream_classes); i++) { + struct stream_class *class = stream_classes[i]; + assert(class->name != NULL); + assert(class->open != NULL); + if (class->close || class->recv || class->send || class->wait) { + assert(class->close != NULL); + assert(class->recv != NULL); + assert(class->send != NULL); + assert(class->wait != NULL); + } else { + /* This class delegates to another one. */ + } + } + + for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) { + struct pstream_class *class = pstream_classes[i]; + assert(class->name != NULL); + assert(class->listen != NULL); + if (class->close || class->accept || class->wait) { + assert(class->close != NULL); + assert(class->accept != NULL); + assert(class->wait != NULL); + } else { + /* This class delegates to another one. */ + } + } +#endif +} + +/* Prints information on active (if 'active') and passive (if 'passive') + * connection methods supported by the stream. */ +void +stream_usage(const char *name, bool active, bool passive) +{ + /* Really this should be implemented via callbacks into the stream + * providers, but that seems too heavy-weight to bother with at the + * moment. */ + + printf("\n"); + if (active) { + printf("Active %s connection methods:\n", name); + printf(" tcp:IP:PORT " + "PORT at remote IP\n"); + printf(" unix:FILE " + "Unix domain socket named FILE\n"); + } + + if (passive) { + printf("Passive %s connection methods:\n", name); + printf(" ptcp:PORT[:IP] " + "listen to TCP PORT on IP\n"); + printf(" punix:FILE " + "listen on Unix domain socket FILE\n"); + } +} + +/* Attempts to connect a stream to a remote peer. 'name' is a connection name + * in the form "TYPE:ARGS", where TYPE is an active stream class's name and + * ARGS are stream class-specific. + * + * Returns 0 if successful, otherwise a positive errno value. If successful, + * stores a pointer to the new connection in '*streamp', otherwise a null + * pointer. */ +int +stream_open(const char *name, struct stream **streamp) +{ + size_t prefix_len; + size_t i; + + COVERAGE_INC(stream_open); + check_stream_classes(); + + *streamp = NULL; + prefix_len = strcspn(name, ":"); + if (prefix_len == strlen(name)) { + return EAFNOSUPPORT; + } + for (i = 0; i < ARRAY_SIZE(stream_classes); i++) { + struct stream_class *class = stream_classes[i]; + if (strlen(class->name) == prefix_len + && !memcmp(class->name, name, prefix_len)) { + struct stream *stream; + char *suffix_copy = xstrdup(name + prefix_len + 1); + int retval = class->open(name, suffix_copy, &stream); + free(suffix_copy); + if (!retval) { + assert(stream->state != SCS_CONNECTING + || stream->class->connect); + *streamp = stream; + } + return retval; + } + } + return EAFNOSUPPORT; +} + +int +stream_open_block(const char *name, struct stream **streamp) +{ + struct stream *stream; + int error; + + error = stream_open(name, &stream); + while (error == EAGAIN) { + stream_connect_wait(stream); + poll_block(); + error = stream_connect(stream); + assert(error != EINPROGRESS); + } + if (error) { + stream_close(stream); + *streamp = NULL; + } else { + *streamp = stream; + } + return error; +} + +/* Closes 'stream'. */ +void +stream_close(struct stream *stream) +{ + if (stream != NULL) { + char *name = stream->name; + (stream->class->close)(stream); + free(name); + } +} + +/* Returns the name of 'stream', that is, the string passed to + * stream_open(). */ +const char * +stream_get_name(const struct stream *stream) +{ + return stream ? stream->name : "(null)"; +} + +/* Returns the IP address of the peer, or 0 if the peer is not connected over + * an IP-based protocol or if its IP address is not yet known. */ +uint32_t +stream_get_remote_ip(const struct stream *stream) +{ + return stream->remote_ip; +} + +/* Returns the transport port of the peer, or 0 if the connection does not + * contain a port or if the port is not yet known. */ +uint16_t +stream_get_remote_port(const struct stream *stream) +{ + return stream->remote_port; +} + +/* Returns the IP address used to connect to the peer, or 0 if the connection + * is not an IP-based protocol or if its IP address is not yet known. */ +uint32_t +stream_get_local_ip(const struct stream *stream) +{ + return stream->local_ip; +} + +/* Returns the transport port used to connect to the peer, or 0 if the + * connection does not contain a port or if the port is not yet known. */ +uint16_t +stream_get_local_port(const struct stream *stream) +{ + return stream->local_port; +} + +static void +scs_connecting(struct stream *stream) +{ + int retval = (stream->class->connect)(stream); + assert(retval != EINPROGRESS); + if (!retval) { + stream->state = SCS_CONNECTED; + } else if (retval != EAGAIN) { + stream->state = SCS_DISCONNECTED; + stream->error = retval; + } +} + +/* Tries to complete the connection on 'stream', which must be an active + * stream. If 'stream''s connection is complete, returns 0 if the connection + * was successful or a positive errno value if it failed. If the + * connection is still in progress, returns EAGAIN. */ +int +stream_connect(struct stream *stream) +{ + enum stream_state last_state; + + do { + last_state = stream->state; + switch (stream->state) { + case SCS_CONNECTING: + scs_connecting(stream); + break; + + case SCS_CONNECTED: + return 0; + + case SCS_DISCONNECTED: + return stream->error; + + default: + NOT_REACHED(); + } + } while (stream->state != last_state); + + return EAGAIN; +} + +/* Tries to receive up to 'n' bytes from 'stream' into 'buffer', and returns: + * + * - If successful, the number of bytes received (between 1 and 'n'). + * + * - On error, a negative errno value. + * + * - 0, if the connection has been closed in the normal fashion, or if 'n' + * is zero. + * + * The recv function will not block waiting for a packet to arrive. If no + * data have been received, it returns -EAGAIN immediately. */ +int +stream_recv(struct stream *stream, void *buffer, size_t n) +{ + int retval = stream_connect(stream); + return (retval ? -retval + : n == 0 ? 0 + : (stream->class->recv)(stream, buffer, n)); +} + +/* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns: + * + * - If successful, the number of bytes sent (between 1 and 'n'). 0 is + * only a valid return value if 'n' is 0. + * + * - On error, a negative errno value. + * + * The send function will not block. If no bytes can be immediately accepted + * for transmission, it returns -EAGAIN immediately. */ +int +stream_send(struct stream *stream, const void *buffer, size_t n) +{ + int retval = stream_connect(stream); + return (retval ? -retval + : n == 0 ? 0 + : (stream->class->send)(stream, buffer, n)); +} + +void +stream_wait(struct stream *stream, enum stream_wait_type wait) +{ + assert(wait == STREAM_CONNECT || wait == STREAM_RECV + || wait == STREAM_SEND); + + switch (stream->state) { + case SCS_CONNECTING: + wait = STREAM_CONNECT; + break; + + case SCS_DISCONNECTED: + poll_immediate_wake(); + return; + } + (stream->class->wait)(stream, wait); +} + +void +stream_connect_wait(struct stream *stream) +{ + stream_wait(stream, STREAM_CONNECT); +} + +void +stream_recv_wait(struct stream *stream) +{ + stream_wait(stream, STREAM_RECV); +} + +void +stream_send_wait(struct stream *stream) +{ + stream_wait(stream, STREAM_SEND); +} + +/* Attempts to start listening for remote stream connections. 'name' is a + * connection name in the form "TYPE:ARGS", where TYPE is an passive stream + * class's name and ARGS are stream class-specific. + * + * Returns 0 if successful, otherwise a positive errno value. If successful, + * stores a pointer to the new connection in '*pstreamp', otherwise a null + * pointer. */ +int +pstream_open(const char *name, struct pstream **pstreamp) +{ + size_t prefix_len; + size_t i; + + check_stream_classes(); + + *pstreamp = NULL; + prefix_len = strcspn(name, ":"); + if (prefix_len == strlen(name)) { + return EAFNOSUPPORT; + } + for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) { + struct pstream_class *class = pstream_classes[i]; + if (strlen(class->name) == prefix_len + && !memcmp(class->name, name, prefix_len)) { + char *suffix_copy = xstrdup(name + prefix_len + 1); + int retval = class->listen(name, suffix_copy, pstreamp); + free(suffix_copy); + if (retval) { + *pstreamp = NULL; + } + return retval; + } + } + return EAFNOSUPPORT; +} + +/* Returns the name that was used to open 'pstream'. The caller must not + * modify or free the name. */ +const char * +pstream_get_name(const struct pstream *pstream) +{ + return pstream->name; +} + +/* Closes 'pstream'. */ +void +pstream_close(struct pstream *pstream) +{ + if (pstream != NULL) { + char *name = pstream->name; + (pstream->class->close)(pstream); + free(name); + } +} + +/* Tries to accept a new connection on 'pstream'. If successful, stores the + * new connection in '*new_stream' and returns 0. Otherwise, returns a + * positive errno value. + * + * pstream_accept() will not block waiting for a connection. If no connection + * is ready to be accepted, it returns EAGAIN immediately. */ +int +pstream_accept(struct pstream *pstream, struct stream **new_stream) +{ + int retval = (pstream->class->accept)(pstream, new_stream); + if (retval) { + *new_stream = NULL; + } else { + assert((*new_stream)->state != SCS_CONNECTING + || (*new_stream)->class->connect); + } + return retval; +} + +void +pstream_wait(struct pstream *pstream) +{ + (pstream->class->wait)(pstream); +} + +/* Initializes 'stream' as a new stream named 'name', implemented via 'class'. + * The initial connection status, supplied as 'connect_status', is interpreted + * as follows: + * + * - 0: 'stream' is connected. Its 'send' and 'recv' functions may be + * called in the normal fashion. + * + * - EAGAIN: 'stream' is trying to complete a connection. Its 'connect' + * function should be called to complete the connection. + * + * - Other positive errno values indicate that the connection failed with + * the specified error. + * + * After calling this function, stream_close() must be used to destroy + * 'stream', otherwise resources will be leaked. + * + * The caller retains ownership of 'name'. */ +void +stream_init(struct stream *stream, struct stream_class *class, + int connect_status, const char *name) +{ + stream->class = class; + stream->state = (connect_status == EAGAIN ? SCS_CONNECTING + : !connect_status ? SCS_CONNECTED + : SCS_DISCONNECTED); + stream->error = connect_status; + stream->name = xstrdup(name); +} + +void +stream_set_remote_ip(struct stream *stream, uint32_t ip) +{ + stream->remote_ip = ip; +} + +void +stream_set_remote_port(struct stream *stream, uint16_t port) +{ + stream->remote_port = port; +} + +void +stream_set_local_ip(struct stream *stream, uint32_t ip) +{ + stream->local_ip = ip; +} + +void +stream_set_local_port(struct stream *stream, uint16_t port) +{ + stream->local_port = port; +} + +void +pstream_init(struct pstream *pstream, struct pstream_class *class, + const char *name) +{ + pstream->class = class; + pstream->name = xstrdup(name); +} diff --git a/lib/stream.h b/lib/stream.h new file mode 100644 index 00000000..76c9e6c6 --- /dev/null +++ b/lib/stream.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef STREAM_H +#define STREAM_H 1 + +#include +#include +#include +#include + +#include "flow.h" + +struct pstream; +struct stream; + +void stream_usage(const char *name, bool active, bool passive); + +/* Bidirectional byte streams. */ +int stream_open(const char *name, struct stream **); +int stream_open_block(const char *name, struct stream **); +void stream_close(struct stream *); +const char *stream_get_name(const struct stream *); +uint32_t stream_get_remote_ip(const struct stream *); +uint16_t stream_get_remote_port(const struct stream *); +uint32_t stream_get_local_ip(const struct stream *); +uint16_t stream_get_local_port(const struct stream *); +int stream_connect(struct stream *); +int stream_recv(struct stream *, void *buffer, size_t n); +int stream_send(struct stream *, const void *buffer, size_t n); + +enum stream_wait_type { + STREAM_CONNECT, + STREAM_RECV, + STREAM_SEND +}; +void stream_wait(struct stream *, enum stream_wait_type); +void stream_connect_wait(struct stream *); +void stream_recv_wait(struct stream *); +void stream_send_wait(struct stream *); + +/* Passive streams: listeners for incoming stream connections. */ +int pstream_open(const char *name, struct pstream **); +const char *pstream_get_name(const struct pstream *); +void pstream_close(struct pstream *); +int pstream_accept(struct pstream *, struct stream **); +void pstream_wait(struct pstream *); + +#endif /* stream.h */ diff --git a/lib/vlog-modules.def b/lib/vlog-modules.def index 59ab045c..da345df4 100644 --- a/lib/vlog-modules.def +++ b/lib/vlog-modules.def @@ -62,6 +62,10 @@ VLOG_MODULE(process) VLOG_MODULE(rconn) VLOG_MODULE(rtnetlink) VLOG_MODULE(stp) +VLOG_MODULE(stream_fd) +VLOG_MODULE(stream_tcp) +VLOG_MODULE(stream_unix) +VLOG_MODULE(stream) VLOG_MODULE(stats) VLOG_MODULE(status) VLOG_MODULE(svec)