lib/socket-util.h \
lib/stp.c \
lib/stp.h \
+ lib/stream-fd.c \
+ lib/stream-fd.h \
+ lib/stream-provider.h \
+ lib/stream-tcp.c \
+ lib/stream-unix.c \
+ lib/stream.c \
+ lib/stream.h \
lib/svec.c \
lib/svec.h \
lib/tag.c \
lib/process.c \
lib/rconn.c \
lib/rtnetlink.c \
+ lib/stream.c \
lib/timeval.c \
lib/unixctl.c \
lib/util.c \
--- /dev/null
+/*
+ * Copyright (c) 2008, 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "stream-fd.h"
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include "fatal-signal.h"
+#include "leak-checker.h"
+#include "poll-loop.h"
+#include "socket-util.h"
+#include "util.h"
+#include "stream-provider.h"
+#include "stream.h"
+
+#include "vlog.h"
+#define THIS_MODULE VLM_stream_fd
+
+/* Active file descriptor stream. */
+
+struct stream_fd
+{
+ struct stream stream;
+ int fd;
+ char *unlink_path;
+};
+
+static struct stream_class stream_fd_class;
+
+static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
+
+static void maybe_unlink_and_free(char *path);
+
+/* Creates a new stream named 'name' that will send and receive data on 'fd'
+ * and stores a pointer to the stream in '*streamp'. Initial connection status
+ * 'connect_status' is interpreted as described for stream_init().
+ *
+ * When '*streamp' is closed, then 'unlink_path' (if nonnull) will be passed to
+ * fatal_signal_unlink_file_now() and then freed with free().
+ *
+ * Returns 0 if successful, otherwise a positive errno value. (The current
+ * implementation never fails.) */
+int
+new_fd_stream(const char *name, int fd, int connect_status,
+ char *unlink_path, struct stream **streamp)
+{
+ struct stream_fd *s;
+
+ s = xmalloc(sizeof *s);
+ stream_init(&s->stream, &stream_fd_class, connect_status, name);
+ s->fd = fd;
+ s->unlink_path = unlink_path;
+ *streamp = &s->stream;
+ return 0;
+}
+
+static struct stream_fd *
+stream_fd_cast(struct stream *stream)
+{
+ stream_assert_class(stream, &stream_fd_class);
+ return CONTAINER_OF(stream, struct stream_fd, stream);
+}
+
+static void
+fd_close(struct stream *stream)
+{
+ struct stream_fd *s = stream_fd_cast(stream);
+ close(s->fd);
+ maybe_unlink_and_free(s->unlink_path);
+ free(s);
+}
+
+static int
+fd_connect(struct stream *stream)
+{
+ struct stream_fd *s = stream_fd_cast(stream);
+ return check_connection_completion(s->fd);
+}
+
+static ssize_t
+fd_recv(struct stream *stream, void *buffer, size_t n)
+{
+ struct stream_fd *s = stream_fd_cast(stream);
+ ssize_t retval = read(s->fd, buffer, n);
+ return retval >= 0 ? retval : -errno;
+}
+
+static ssize_t
+fd_send(struct stream *stream, const void *buffer, size_t n)
+{
+ struct stream_fd *s = stream_fd_cast(stream);
+ ssize_t retval = write(s->fd, buffer, n);
+ return (retval > 0 ? retval
+ : retval == 0 ? -EAGAIN
+ : -errno);
+}
+
+static void
+fd_wait(struct stream *stream, enum stream_wait_type wait)
+{
+ struct stream_fd *s = stream_fd_cast(stream);
+ switch (wait) {
+ case STREAM_CONNECT:
+ case STREAM_SEND:
+ poll_fd_wait(s->fd, POLLOUT);
+ break;
+
+ case STREAM_RECV:
+ poll_fd_wait(s->fd, POLLIN);
+ break;
+
+ default:
+ NOT_REACHED();
+ }
+}
+
+static struct stream_class stream_fd_class = {
+ "fd", /* name */
+ NULL, /* open */
+ fd_close, /* close */
+ fd_connect, /* connect */
+ fd_recv, /* recv */
+ fd_send, /* send */
+ fd_wait, /* wait */
+};
+\f
+/* Passive file descriptor stream. */
+
+struct fd_pstream
+{
+ struct pstream pstream;
+ int fd;
+ int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
+ struct stream **);
+ char *unlink_path;
+};
+
+static struct pstream_class fd_pstream_class;
+
+static struct fd_pstream *
+fd_pstream_cast(struct pstream *pstream)
+{
+ pstream_assert_class(pstream, &fd_pstream_class);
+ return CONTAINER_OF(pstream, struct fd_pstream, pstream);
+}
+
+/* Creates a new pstream named 'name' that will accept new socket connections
+ * on 'fd' and stores a pointer to the stream in '*pstreamp'.
+ *
+ * When a connection has been accepted, 'accept_cb' will be called with the new
+ * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'.
+ * accept_cb must return 0 if the connection is successful, in which case it
+ * must initialize '*streamp' to the new stream, or a positive errno value on
+ * error. In either case accept_cb takes ownership of the 'fd' passed in.
+ *
+ * When '*pstreamp' is closed, then 'unlink_path' (if nonnull) will be passed
+ * to fatal_signal_unlink_file_now() and freed with free().
+ *
+ * Returns 0 if successful, otherwise a positive errno value. (The current
+ * implementation never fails.) */
+int
+new_fd_pstream(const char *name, int fd,
+ int (*accept_cb)(int fd, const struct sockaddr *sa,
+ size_t sa_len, struct stream **streamp),
+ char *unlink_path, struct pstream **pstreamp)
+{
+ struct fd_pstream *ps = xmalloc(sizeof *ps);
+ pstream_init(&ps->pstream, &fd_pstream_class, name);
+ ps->fd = fd;
+ ps->accept_cb = accept_cb;
+ ps->unlink_path = unlink_path;
+ *pstreamp = &ps->pstream;
+ return 0;
+}
+
+static void
+pfd_close(struct pstream *pstream)
+{
+ struct fd_pstream *ps = fd_pstream_cast(pstream);
+ close(ps->fd);
+ maybe_unlink_and_free(ps->unlink_path);
+ free(ps);
+}
+
+static int
+pfd_accept(struct pstream *pstream, struct stream **new_streamp)
+{
+ struct fd_pstream *ps = fd_pstream_cast(pstream);
+ struct sockaddr_storage ss;
+ socklen_t ss_len = sizeof ss;
+ int new_fd;
+ int retval;
+
+ new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len);
+ if (new_fd < 0) {
+ int retval = errno;
+ if (retval != EAGAIN) {
+ VLOG_DBG_RL(&rl, "accept: %s", strerror(retval));
+ }
+ return retval;
+ }
+
+ retval = set_nonblocking(new_fd);
+ if (retval) {
+ close(new_fd);
+ return retval;
+ }
+
+ return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len,
+ new_streamp);
+}
+
+static void
+pfd_wait(struct pstream *pstream)
+{
+ struct fd_pstream *ps = fd_pstream_cast(pstream);
+ poll_fd_wait(ps->fd, POLLIN);
+}
+
+static struct pstream_class fd_pstream_class = {
+ "pstream",
+ NULL,
+ pfd_close,
+ pfd_accept,
+ pfd_wait
+};
+\f
+/* Helper functions. */
+static void
+maybe_unlink_and_free(char *path)
+{
+ if (path) {
+ fatal_signal_unlink_file_now(path);
+ free(path);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2008, 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef STREAM_FD_H
+#define STREAM_FD_H 1
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+struct stream;
+struct pstream;
+struct sockaddr;
+
+int new_fd_stream(const char *name, int fd, int connect_status,
+ char *unlink_path, struct stream **streamp);
+int new_fd_pstream(const char *name, int fd,
+ int (*accept_cb)(int fd, const struct sockaddr *,
+ size_t sa_len, struct stream **),
+ char *unlink_path,
+ struct pstream **pstreamp);
+
+#endif /* stream-fd.h */
--- /dev/null
+/*
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef STREAM_PROVIDER_H
+#define STREAM_PROVIDER_H 1
+
+#include <assert.h>
+#include <sys/types.h>
+#include "stream.h"
+
+/* Active stream connection. */
+
+/* Active stream connection.
+ *
+ * This structure should be treated as opaque by implementation. */
+struct stream {
+ struct stream_class *class;
+ int state;
+ int error;
+ uint32_t remote_ip;
+ uint16_t remote_port;
+ uint32_t local_ip;
+ uint16_t local_port;
+ char *name;
+};
+
+void stream_init(struct stream *, struct stream_class *, int connect_status,
+ const char *name);
+void stream_set_remote_ip(struct stream *, uint32_t remote_ip);
+void stream_set_remote_port(struct stream *, uint16_t remote_port);
+void stream_set_local_ip(struct stream *, uint32_t local_ip);
+void stream_set_local_port(struct stream *, uint16_t local_port);
+static inline void stream_assert_class(const struct stream *stream,
+ const struct stream_class *class)
+{
+ assert(stream->class == class);
+}
+
+struct stream_class {
+ /* Prefix for connection names, e.g. "tcp", "ssl", "unix". */
+ const char *name;
+
+ /* Attempts to connect to a peer. 'name' is the full connection name
+ * provided by the user, e.g. "tcp:1.2.3.4". This name is useful for error
+ * messages but must not be modified.
+ *
+ * 'suffix' is a copy of 'name' following the colon and may be modified.
+ *
+ * Returns 0 if successful, otherwise a positive errno value. If
+ * successful, stores a pointer to the new connection in '*streamp'.
+ *
+ * The open function must not block waiting for a connection to complete.
+ * If the connection cannot be completed immediately, it should return
+ * EAGAIN (not EINPROGRESS, as returned by the connect system call) and
+ * continue the connection in the background. */
+ int (*open)(const char *name, char *suffix, struct stream **streamp);
+
+ /* Closes 'stream' and frees associated memory. */
+ void (*close)(struct stream *stream);
+
+ /* Tries to complete the connection on 'stream'. If 'stream''s connection
+ * is complete, returns 0 if the connection was successful or a positive
+ * errno value if it failed. If the connection is still in progress,
+ * returns EAGAIN.
+ *
+ * The connect function must not block waiting for the connection to
+ * complete; instead, it should return EAGAIN immediately. */
+ int (*connect)(struct stream *stream);
+
+ /* Tries to receive up to 'n' bytes from 'stream' into 'buffer', and
+ * returns:
+ *
+ * - If successful, the number of bytes received (between 1 and 'n').
+ *
+ * - On error, a negative errno value.
+ *
+ * - 0, if the connection has been closed in the normal fashion.
+ *
+ * The recv function will not be passed a zero 'n'.
+ *
+ * The recv function must not block waiting for data to arrive. If no data
+ * have been received, it should return -EAGAIN immediately. */
+ ssize_t (*recv)(struct stream *stream, void *buffer, size_t n);
+
+ /* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns:
+ *
+ * - If successful, the number of bytes sent (between 1 and 'n').
+ *
+ * - On error, a negative errno value.
+ *
+ * - Never returns 0.
+ *
+ * The send function will not be passed a zero 'n'.
+ *
+ * The send function must not block. If no bytes can be immediately
+ * accepted for transmission, it should return -EAGAIN immediately. */
+ ssize_t (*send)(struct stream *stream, const void *buffer, size_t n);
+
+ /* Arranges for the poll loop to wake up when 'stream' is ready to take an
+ * action of the given 'type'. */
+ void (*wait)(struct stream *stream, enum stream_wait_type type);
+};
+\f
+/* Passive listener for incoming stream connections.
+ *
+ * This structure should be treated as opaque by stream implementations. */
+struct pstream {
+ struct pstream_class *class;
+ char *name;
+};
+
+void pstream_init(struct pstream *, struct pstream_class *, const char *name);
+static inline void pstream_assert_class(const struct pstream *pstream,
+ const struct pstream_class *class)
+{
+ assert(pstream->class == class);
+}
+
+struct pstream_class {
+ /* Prefix for connection names, e.g. "ptcp", "pssl", "punix". */
+ const char *name;
+
+ /* Attempts to start listening for stream connections. 'name' is the full
+ * connection name provided by the user, e.g. "ptcp:1234". This name is
+ * useful for error messages but must not be modified.
+ *
+ * 'suffix' is a copy of 'name' following the colon and may be modified.
+ *
+ * Returns 0 if successful, otherwise a positive errno value. If
+ * successful, stores a pointer to the new connection in '*pstreamp'.
+ *
+ * The listen function must not block. If the connection cannot be
+ * completed immediately, it should return EAGAIN (not EINPROGRESS, as
+ * returned by the connect system call) and continue the connection in the
+ * background. */
+ int (*listen)(const char *name, char *suffix, struct pstream **pstreamp);
+
+ /* Closes 'pstream' and frees associated memory. */
+ void (*close)(struct pstream *pstream);
+
+ /* Tries to accept a new connection on 'pstream'. If successful, stores
+ * the new connection in '*new_streamp' and returns 0. Otherwise, returns
+ * a positive errno value.
+ *
+ * The accept function must not block waiting for a connection. If no
+ * connection is ready to be accepted, it should return EAGAIN. */
+ int (*accept)(struct pstream *pstream, struct stream **new_streamp);
+
+ /* Arranges for the poll loop to wake up when a connection is ready to be
+ * accepted on 'pstream'. */
+ void (*wait)(struct pstream *pstream);
+};
+
+/* Active and passive stream classes. */
+extern struct stream_class tcp_stream_class;
+extern struct pstream_class ptcp_pstream_class;
+extern struct stream_class unix_stream_class;
+extern struct pstream_class punix_pstream_class;
+
+#endif /* stream-provider.h */
--- /dev/null
+/*
+ * Copyright (c) 2008, 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "stream.h"
+#include <errno.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include "packets.h"
+#include "socket-util.h"
+#include "util.h"
+#include "stream-provider.h"
+#include "stream-fd.h"
+
+#include "vlog.h"
+#define THIS_MODULE VLM_stream_tcp
+
+/* Active TCP. */
+
+static int
+new_tcp_stream(const char *name, int fd, int connect_status,
+ const struct sockaddr_in *remote, struct stream **streamp)
+{
+ struct sockaddr_in local;
+ socklen_t local_len = sizeof local;
+ int on = 1;
+ int retval;
+
+ /* Get the local IP and port information */
+ retval = getsockname(fd, (struct sockaddr *)&local, &local_len);
+ if (retval) {
+ memset(&local, 0, sizeof local);
+ }
+
+ retval = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
+ if (retval) {
+ VLOG_ERR("%s: setsockopt(TCP_NODELAY): %s", name, strerror(errno));
+ close(fd);
+ return errno;
+ }
+
+ retval = new_fd_stream(name, fd, connect_status, NULL, streamp);
+ if (!retval) {
+ struct stream *stream = *streamp;
+ stream_set_remote_ip(stream, remote->sin_addr.s_addr);
+ stream_set_remote_port(stream, remote->sin_port);
+ stream_set_local_ip(stream, local.sin_addr.s_addr);
+ stream_set_local_port(stream, local.sin_port);
+ }
+ return retval;
+}
+
+static int
+tcp_open(const char *name, char *suffix, struct stream **streamp)
+{
+ struct sockaddr_in sin;
+ int fd, error;
+
+ error = tcp_open_active(suffix, 0, &sin, &fd);
+ if (fd >= 0) {
+ return new_tcp_stream(name, fd, error, &sin, streamp);
+ } else {
+ VLOG_ERR("%s: connect: %s", name, strerror(error));
+ return error;
+ }
+}
+
+struct stream_class tcp_stream_class = {
+ "tcp", /* name */
+ tcp_open, /* open */
+ NULL, /* close */
+ NULL, /* connect */
+ NULL, /* recv */
+ NULL, /* send */
+ NULL, /* wait */
+};
+\f
+/* Passive TCP. */
+
+static int ptcp_accept(int fd, const struct sockaddr *sa, size_t sa_len,
+ struct stream **streamp);
+
+static int
+ptcp_open(const char *name UNUSED, char *suffix, struct pstream **pstreamp)
+{
+ int fd;
+
+ fd = tcp_open_passive(suffix, 0);
+ if (fd < 0) {
+ return -fd;
+ } else {
+ return new_fd_pstream("ptcp", fd, ptcp_accept, NULL, pstreamp);
+ }
+}
+
+static int
+ptcp_accept(int fd, const struct sockaddr *sa, size_t sa_len,
+ struct stream **streamp)
+{
+ const struct sockaddr_in *sin = (const struct sockaddr_in *) sa;
+ char name[128];
+
+ if (sa_len == sizeof(struct sockaddr_in) && sin->sin_family == AF_INET) {
+ sprintf(name, "tcp:"IP_FMT, IP_ARGS(&sin->sin_addr));
+ sprintf(strchr(name, '\0'), ":%"PRIu16, ntohs(sin->sin_port));
+ } else {
+ strcpy(name, "tcp");
+ }
+ return new_tcp_stream(name, fd, 0, sin, streamp);
+}
+
+struct pstream_class ptcp_pstream_class = {
+ "ptcp",
+ ptcp_open,
+ NULL,
+ NULL,
+ NULL
+};
+
--- /dev/null
+/*
+ * Copyright (c) 2008, 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "stream.h"
+#include <assert.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <netdb.h>
+#include <poll.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include "packets.h"
+#include "poll-loop.h"
+#include "socket-util.h"
+#include "util.h"
+#include "stream-provider.h"
+#include "stream-fd.h"
+
+#include "vlog.h"
+#define THIS_MODULE VLM_stream_unix
+
+/* Active UNIX socket. */
+
+/* Number of unix sockets created so far, to ensure binding path uniqueness. */
+static int n_unix_sockets;
+
+static int
+unix_open(const char *name, char *suffix, struct stream **streamp)
+{
+ const char *connect_path = suffix;
+ char *bind_path;
+ int fd;
+
+ bind_path = xasprintf("/tmp/stream-unix.%ld.%d",
+ (long int) getpid(), n_unix_sockets++);
+ fd = make_unix_socket(SOCK_STREAM, true, false, bind_path, connect_path);
+ if (fd < 0) {
+ VLOG_ERR("%s: connection to %s failed: %s",
+ bind_path, connect_path, strerror(-fd));
+ free(bind_path);
+ return -fd;
+ }
+
+ return new_fd_stream(name, fd, check_connection_completion(fd),
+ bind_path, streamp);
+}
+
+struct stream_class unix_stream_class = {
+ "unix", /* name */
+ unix_open, /* open */
+ NULL, /* close */
+ NULL, /* connect */
+ NULL, /* recv */
+ NULL, /* send */
+ NULL, /* wait */
+};
+\f
+/* Passive UNIX socket. */
+
+static int punix_accept(int fd, const struct sockaddr *sa, size_t sa_len,
+ struct stream **streamp);
+
+static int
+punix_open(const char *name UNUSED, char *suffix, struct pstream **pstreamp)
+{
+ int fd, error;
+
+ fd = make_unix_socket(SOCK_STREAM, true, true, suffix, NULL);
+ if (fd < 0) {
+ VLOG_ERR("%s: binding failed: %s", suffix, strerror(errno));
+ return errno;
+ }
+
+ error = set_nonblocking(fd);
+ if (error) {
+ close(fd);
+ return error;
+ }
+
+ if (listen(fd, 10) < 0) {
+ error = errno;
+ VLOG_ERR("%s: listen: %s", name, strerror(error));
+ close(fd);
+ return error;
+ }
+
+ return new_fd_pstream("punix", fd, punix_accept,
+ xstrdup(suffix), pstreamp);
+}
+
+static int
+punix_accept(int fd, const struct sockaddr *sa, size_t sa_len,
+ struct stream **streamp)
+{
+ const struct sockaddr_un *sun = (const struct sockaddr_un *) sa;
+ int name_len = get_unix_name_len(sa_len);
+ char name[128];
+
+ if (name_len > 0) {
+ snprintf(name, sizeof name, "unix:%.*s", name_len, sun->sun_path);
+ } else {
+ strcpy(name, "unix");
+ }
+ return new_fd_stream(name, fd, 0, NULL, streamp);
+}
+
+struct pstream_class punix_pstream_class = {
+ "punix",
+ punix_open,
+ NULL,
+ NULL,
+ NULL
+};
+
--- /dev/null
+/*
+ * Copyright (c) 2008, 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "stream-provider.h"
+#include <assert.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <netinet/in.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+#include "coverage.h"
+#include "dynamic-string.h"
+#include "flow.h"
+#include "ofp-print.h"
+#include "ofpbuf.h"
+#include "openflow/nicira-ext.h"
+#include "openflow/openflow.h"
+#include "packets.h"
+#include "poll-loop.h"
+#include "random.h"
+#include "util.h"
+
+#define THIS_MODULE VLM_stream
+#include "vlog.h"
+
+/* State of an active stream.*/
+enum stream_state {
+ SCS_CONNECTING, /* Underlying stream is not connected. */
+ SCS_CONNECTED, /* Connection established. */
+ SCS_DISCONNECTED /* Connection failed or connection closed. */
+};
+
+static struct stream_class *stream_classes[] = {
+ &tcp_stream_class,
+ &unix_stream_class,
+};
+
+static struct pstream_class *pstream_classes[] = {
+ &ptcp_pstream_class,
+ &punix_pstream_class,
+};
+
+/* Check the validity of the stream class structures. */
+static void
+check_stream_classes(void)
+{
+#ifndef NDEBUG
+ size_t i;
+
+ for (i = 0; i < ARRAY_SIZE(stream_classes); i++) {
+ struct stream_class *class = stream_classes[i];
+ assert(class->name != NULL);
+ assert(class->open != NULL);
+ if (class->close || class->recv || class->send || class->wait) {
+ assert(class->close != NULL);
+ assert(class->recv != NULL);
+ assert(class->send != NULL);
+ assert(class->wait != NULL);
+ } else {
+ /* This class delegates to another one. */
+ }
+ }
+
+ for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) {
+ struct pstream_class *class = pstream_classes[i];
+ assert(class->name != NULL);
+ assert(class->listen != NULL);
+ if (class->close || class->accept || class->wait) {
+ assert(class->close != NULL);
+ assert(class->accept != NULL);
+ assert(class->wait != NULL);
+ } else {
+ /* This class delegates to another one. */
+ }
+ }
+#endif
+}
+
+/* Prints information on active (if 'active') and passive (if 'passive')
+ * connection methods supported by the stream. */
+void
+stream_usage(const char *name, bool active, bool passive)
+{
+ /* Really this should be implemented via callbacks into the stream
+ * providers, but that seems too heavy-weight to bother with at the
+ * moment. */
+
+ printf("\n");
+ if (active) {
+ printf("Active %s connection methods:\n", name);
+ printf(" tcp:IP:PORT "
+ "PORT at remote IP\n");
+ printf(" unix:FILE "
+ "Unix domain socket named FILE\n");
+ }
+
+ if (passive) {
+ printf("Passive %s connection methods:\n", name);
+ printf(" ptcp:PORT[:IP] "
+ "listen to TCP PORT on IP\n");
+ printf(" punix:FILE "
+ "listen on Unix domain socket FILE\n");
+ }
+}
+
+/* Attempts to connect a stream to a remote peer. 'name' is a connection name
+ * in the form "TYPE:ARGS", where TYPE is an active stream class's name and
+ * ARGS are stream class-specific.
+ *
+ * Returns 0 if successful, otherwise a positive errno value. If successful,
+ * stores a pointer to the new connection in '*streamp', otherwise a null
+ * pointer. */
+int
+stream_open(const char *name, struct stream **streamp)
+{
+ size_t prefix_len;
+ size_t i;
+
+ COVERAGE_INC(stream_open);
+ check_stream_classes();
+
+ *streamp = NULL;
+ prefix_len = strcspn(name, ":");
+ if (prefix_len == strlen(name)) {
+ return EAFNOSUPPORT;
+ }
+ for (i = 0; i < ARRAY_SIZE(stream_classes); i++) {
+ struct stream_class *class = stream_classes[i];
+ if (strlen(class->name) == prefix_len
+ && !memcmp(class->name, name, prefix_len)) {
+ struct stream *stream;
+ char *suffix_copy = xstrdup(name + prefix_len + 1);
+ int retval = class->open(name, suffix_copy, &stream);
+ free(suffix_copy);
+ if (!retval) {
+ assert(stream->state != SCS_CONNECTING
+ || stream->class->connect);
+ *streamp = stream;
+ }
+ return retval;
+ }
+ }
+ return EAFNOSUPPORT;
+}
+
+int
+stream_open_block(const char *name, struct stream **streamp)
+{
+ struct stream *stream;
+ int error;
+
+ error = stream_open(name, &stream);
+ while (error == EAGAIN) {
+ stream_connect_wait(stream);
+ poll_block();
+ error = stream_connect(stream);
+ assert(error != EINPROGRESS);
+ }
+ if (error) {
+ stream_close(stream);
+ *streamp = NULL;
+ } else {
+ *streamp = stream;
+ }
+ return error;
+}
+
+/* Closes 'stream'. */
+void
+stream_close(struct stream *stream)
+{
+ if (stream != NULL) {
+ char *name = stream->name;
+ (stream->class->close)(stream);
+ free(name);
+ }
+}
+
+/* Returns the name of 'stream', that is, the string passed to
+ * stream_open(). */
+const char *
+stream_get_name(const struct stream *stream)
+{
+ return stream ? stream->name : "(null)";
+}
+
+/* Returns the IP address of the peer, or 0 if the peer is not connected over
+ * an IP-based protocol or if its IP address is not yet known. */
+uint32_t
+stream_get_remote_ip(const struct stream *stream)
+{
+ return stream->remote_ip;
+}
+
+/* Returns the transport port of the peer, or 0 if the connection does not
+ * contain a port or if the port is not yet known. */
+uint16_t
+stream_get_remote_port(const struct stream *stream)
+{
+ return stream->remote_port;
+}
+
+/* Returns the IP address used to connect to the peer, or 0 if the connection
+ * is not an IP-based protocol or if its IP address is not yet known. */
+uint32_t
+stream_get_local_ip(const struct stream *stream)
+{
+ return stream->local_ip;
+}
+
+/* Returns the transport port used to connect to the peer, or 0 if the
+ * connection does not contain a port or if the port is not yet known. */
+uint16_t
+stream_get_local_port(const struct stream *stream)
+{
+ return stream->local_port;
+}
+
+static void
+scs_connecting(struct stream *stream)
+{
+ int retval = (stream->class->connect)(stream);
+ assert(retval != EINPROGRESS);
+ if (!retval) {
+ stream->state = SCS_CONNECTED;
+ } else if (retval != EAGAIN) {
+ stream->state = SCS_DISCONNECTED;
+ stream->error = retval;
+ }
+}
+
+/* Tries to complete the connection on 'stream', which must be an active
+ * stream. If 'stream''s connection is complete, returns 0 if the connection
+ * was successful or a positive errno value if it failed. If the
+ * connection is still in progress, returns EAGAIN. */
+int
+stream_connect(struct stream *stream)
+{
+ enum stream_state last_state;
+
+ do {
+ last_state = stream->state;
+ switch (stream->state) {
+ case SCS_CONNECTING:
+ scs_connecting(stream);
+ break;
+
+ case SCS_CONNECTED:
+ return 0;
+
+ case SCS_DISCONNECTED:
+ return stream->error;
+
+ default:
+ NOT_REACHED();
+ }
+ } while (stream->state != last_state);
+
+ return EAGAIN;
+}
+
+/* Tries to receive up to 'n' bytes from 'stream' into 'buffer', and returns:
+ *
+ * - If successful, the number of bytes received (between 1 and 'n').
+ *
+ * - On error, a negative errno value.
+ *
+ * - 0, if the connection has been closed in the normal fashion, or if 'n'
+ * is zero.
+ *
+ * The recv function will not block waiting for a packet to arrive. If no
+ * data have been received, it returns -EAGAIN immediately. */
+int
+stream_recv(struct stream *stream, void *buffer, size_t n)
+{
+ int retval = stream_connect(stream);
+ return (retval ? -retval
+ : n == 0 ? 0
+ : (stream->class->recv)(stream, buffer, n));
+}
+
+/* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns:
+ *
+ * - If successful, the number of bytes sent (between 1 and 'n'). 0 is
+ * only a valid return value if 'n' is 0.
+ *
+ * - On error, a negative errno value.
+ *
+ * The send function will not block. If no bytes can be immediately accepted
+ * for transmission, it returns -EAGAIN immediately. */
+int
+stream_send(struct stream *stream, const void *buffer, size_t n)
+{
+ int retval = stream_connect(stream);
+ return (retval ? -retval
+ : n == 0 ? 0
+ : (stream->class->send)(stream, buffer, n));
+}
+
+void
+stream_wait(struct stream *stream, enum stream_wait_type wait)
+{
+ assert(wait == STREAM_CONNECT || wait == STREAM_RECV
+ || wait == STREAM_SEND);
+
+ switch (stream->state) {
+ case SCS_CONNECTING:
+ wait = STREAM_CONNECT;
+ break;
+
+ case SCS_DISCONNECTED:
+ poll_immediate_wake();
+ return;
+ }
+ (stream->class->wait)(stream, wait);
+}
+
+void
+stream_connect_wait(struct stream *stream)
+{
+ stream_wait(stream, STREAM_CONNECT);
+}
+
+void
+stream_recv_wait(struct stream *stream)
+{
+ stream_wait(stream, STREAM_RECV);
+}
+
+void
+stream_send_wait(struct stream *stream)
+{
+ stream_wait(stream, STREAM_SEND);
+}
+
+/* Attempts to start listening for remote stream connections. 'name' is a
+ * connection name in the form "TYPE:ARGS", where TYPE is an passive stream
+ * class's name and ARGS are stream class-specific.
+ *
+ * Returns 0 if successful, otherwise a positive errno value. If successful,
+ * stores a pointer to the new connection in '*pstreamp', otherwise a null
+ * pointer. */
+int
+pstream_open(const char *name, struct pstream **pstreamp)
+{
+ size_t prefix_len;
+ size_t i;
+
+ check_stream_classes();
+
+ *pstreamp = NULL;
+ prefix_len = strcspn(name, ":");
+ if (prefix_len == strlen(name)) {
+ return EAFNOSUPPORT;
+ }
+ for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) {
+ struct pstream_class *class = pstream_classes[i];
+ if (strlen(class->name) == prefix_len
+ && !memcmp(class->name, name, prefix_len)) {
+ char *suffix_copy = xstrdup(name + prefix_len + 1);
+ int retval = class->listen(name, suffix_copy, pstreamp);
+ free(suffix_copy);
+ if (retval) {
+ *pstreamp = NULL;
+ }
+ return retval;
+ }
+ }
+ return EAFNOSUPPORT;
+}
+
+/* Returns the name that was used to open 'pstream'. The caller must not
+ * modify or free the name. */
+const char *
+pstream_get_name(const struct pstream *pstream)
+{
+ return pstream->name;
+}
+
+/* Closes 'pstream'. */
+void
+pstream_close(struct pstream *pstream)
+{
+ if (pstream != NULL) {
+ char *name = pstream->name;
+ (pstream->class->close)(pstream);
+ free(name);
+ }
+}
+
+/* Tries to accept a new connection on 'pstream'. If successful, stores the
+ * new connection in '*new_stream' and returns 0. Otherwise, returns a
+ * positive errno value.
+ *
+ * pstream_accept() will not block waiting for a connection. If no connection
+ * is ready to be accepted, it returns EAGAIN immediately. */
+int
+pstream_accept(struct pstream *pstream, struct stream **new_stream)
+{
+ int retval = (pstream->class->accept)(pstream, new_stream);
+ if (retval) {
+ *new_stream = NULL;
+ } else {
+ assert((*new_stream)->state != SCS_CONNECTING
+ || (*new_stream)->class->connect);
+ }
+ return retval;
+}
+
+void
+pstream_wait(struct pstream *pstream)
+{
+ (pstream->class->wait)(pstream);
+}
+\f
+/* Initializes 'stream' as a new stream named 'name', implemented via 'class'.
+ * The initial connection status, supplied as 'connect_status', is interpreted
+ * as follows:
+ *
+ * - 0: 'stream' is connected. Its 'send' and 'recv' functions may be
+ * called in the normal fashion.
+ *
+ * - EAGAIN: 'stream' is trying to complete a connection. Its 'connect'
+ * function should be called to complete the connection.
+ *
+ * - Other positive errno values indicate that the connection failed with
+ * the specified error.
+ *
+ * After calling this function, stream_close() must be used to destroy
+ * 'stream', otherwise resources will be leaked.
+ *
+ * The caller retains ownership of 'name'. */
+void
+stream_init(struct stream *stream, struct stream_class *class,
+ int connect_status, const char *name)
+{
+ stream->class = class;
+ stream->state = (connect_status == EAGAIN ? SCS_CONNECTING
+ : !connect_status ? SCS_CONNECTED
+ : SCS_DISCONNECTED);
+ stream->error = connect_status;
+ stream->name = xstrdup(name);
+}
+
+void
+stream_set_remote_ip(struct stream *stream, uint32_t ip)
+{
+ stream->remote_ip = ip;
+}
+
+void
+stream_set_remote_port(struct stream *stream, uint16_t port)
+{
+ stream->remote_port = port;
+}
+
+void
+stream_set_local_ip(struct stream *stream, uint32_t ip)
+{
+ stream->local_ip = ip;
+}
+
+void
+stream_set_local_port(struct stream *stream, uint16_t port)
+{
+ stream->local_port = port;
+}
+
+void
+pstream_init(struct pstream *pstream, struct pstream_class *class,
+ const char *name)
+{
+ pstream->class = class;
+ pstream->name = xstrdup(name);
+}
--- /dev/null
+/*
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef STREAM_H
+#define STREAM_H 1
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include "flow.h"
+
+struct pstream;
+struct stream;
+
+void stream_usage(const char *name, bool active, bool passive);
+
+/* Bidirectional byte streams. */
+int stream_open(const char *name, struct stream **);
+int stream_open_block(const char *name, struct stream **);
+void stream_close(struct stream *);
+const char *stream_get_name(const struct stream *);
+uint32_t stream_get_remote_ip(const struct stream *);
+uint16_t stream_get_remote_port(const struct stream *);
+uint32_t stream_get_local_ip(const struct stream *);
+uint16_t stream_get_local_port(const struct stream *);
+int stream_connect(struct stream *);
+int stream_recv(struct stream *, void *buffer, size_t n);
+int stream_send(struct stream *, const void *buffer, size_t n);
+
+enum stream_wait_type {
+ STREAM_CONNECT,
+ STREAM_RECV,
+ STREAM_SEND
+};
+void stream_wait(struct stream *, enum stream_wait_type);
+void stream_connect_wait(struct stream *);
+void stream_recv_wait(struct stream *);
+void stream_send_wait(struct stream *);
+
+/* Passive streams: listeners for incoming stream connections. */
+int pstream_open(const char *name, struct pstream **);
+const char *pstream_get_name(const struct pstream *);
+void pstream_close(struct pstream *);
+int pstream_accept(struct pstream *, struct stream **);
+void pstream_wait(struct pstream *);
+
+#endif /* stream.h */
VLOG_MODULE(rconn)
VLOG_MODULE(rtnetlink)
VLOG_MODULE(stp)
+VLOG_MODULE(stream_fd)
+VLOG_MODULE(stream_tcp)
+VLOG_MODULE(stream_unix)
+VLOG_MODULE(stream)
VLOG_MODULE(stats)
VLOG_MODULE(status)
VLOG_MODULE(svec)