stream: New library for bidirectional streams (e.g. TCP, SSL, Unix sockets).
authorBen Pfaff <blp@nicira.com>
Wed, 4 Nov 2009 23:02:32 +0000 (15:02 -0800)
committerBen Pfaff <blp@nicira.com>
Wed, 4 Nov 2009 23:24:40 +0000 (15:24 -0800)
This code is heavily based on the vconn code.  Eventually we should make
the stream-based vconns (currently that's all of them) a wrapper around
streams, but I haven't done that yet.

SSL is not implemented yet.

lib/automake.mk
lib/stream-fd.c [new file with mode: 0644]
lib/stream-fd.h [new file with mode: 0644]
lib/stream-provider.h [new file with mode: 0644]
lib/stream-tcp.c [new file with mode: 0644]
lib/stream-unix.c [new file with mode: 0644]
lib/stream.c [new file with mode: 0644]
lib/stream.h [new file with mode: 0644]
lib/vlog-modules.def

index 1d02dfd4948d4d34f78aaa673112c8bddc52da44..fd791c2815bb5ecd9a9c78a3b6b91ecf3de16cde 100644 (file)
@@ -102,6 +102,13 @@ lib_libopenvswitch_a_SOURCES = \
        lib/socket-util.h \
        lib/stp.c \
        lib/stp.h \
+       lib/stream-fd.c \
+       lib/stream-fd.h \
+       lib/stream-provider.h \
+       lib/stream-tcp.c \
+       lib/stream-unix.c \
+       lib/stream.c \
+       lib/stream.h \
        lib/svec.c \
        lib/svec.h \
        lib/tag.c \
@@ -198,6 +205,7 @@ COVERAGE_FILES = \
        lib/process.c \
        lib/rconn.c \
        lib/rtnetlink.c \
+       lib/stream.c \
        lib/timeval.c \
        lib/unixctl.c \
        lib/util.c \
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
new file mode 100644 (file)
index 0000000..46aa8e7
--- /dev/null
@@ -0,0 +1,254 @@
+/*
+ * Copyright (c) 2008, 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "stream-fd.h"
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include "fatal-signal.h"
+#include "leak-checker.h"
+#include "poll-loop.h"
+#include "socket-util.h"
+#include "util.h"
+#include "stream-provider.h"
+#include "stream.h"
+
+#include "vlog.h"
+#define THIS_MODULE VLM_stream_fd
+
+/* Active file descriptor stream. */
+
+struct stream_fd
+{
+    struct stream stream;
+    int fd;
+    char *unlink_path;
+};
+
+static struct stream_class stream_fd_class;
+
+static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
+
+static void maybe_unlink_and_free(char *path);
+
+/* Creates a new stream named 'name' that will send and receive data on 'fd'
+ * and stores a pointer to the stream in '*streamp'.  Initial connection status
+ * 'connect_status' is interpreted as described for stream_init().
+ *
+ * When '*streamp' is closed, then 'unlink_path' (if nonnull) will be passed to
+ * fatal_signal_unlink_file_now() and then freed with free().
+ *
+ * Returns 0 if successful, otherwise a positive errno value.  (The current
+ * implementation never fails.) */
+int
+new_fd_stream(const char *name, int fd, int connect_status,
+              char *unlink_path, struct stream **streamp)
+{
+    struct stream_fd *s;
+
+    s = xmalloc(sizeof *s);
+    stream_init(&s->stream, &stream_fd_class, connect_status, name);
+    s->fd = fd;
+    s->unlink_path = unlink_path;
+    *streamp = &s->stream;
+    return 0;
+}
+
+static struct stream_fd *
+stream_fd_cast(struct stream *stream)
+{
+    stream_assert_class(stream, &stream_fd_class);
+    return CONTAINER_OF(stream, struct stream_fd, stream);
+}
+
+static void
+fd_close(struct stream *stream)
+{
+    struct stream_fd *s = stream_fd_cast(stream);
+    close(s->fd);
+    maybe_unlink_and_free(s->unlink_path);
+    free(s);
+}
+
+static int
+fd_connect(struct stream *stream)
+{
+    struct stream_fd *s = stream_fd_cast(stream);
+    return check_connection_completion(s->fd);
+}
+
+static ssize_t
+fd_recv(struct stream *stream, void *buffer, size_t n)
+{
+    struct stream_fd *s = stream_fd_cast(stream);
+    ssize_t retval = read(s->fd, buffer, n);
+    return retval >= 0 ? retval : -errno;
+}
+
+static ssize_t
+fd_send(struct stream *stream, const void *buffer, size_t n)
+{
+    struct stream_fd *s = stream_fd_cast(stream);
+    ssize_t retval = write(s->fd, buffer, n);
+    return (retval > 0 ? retval
+            : retval == 0 ? -EAGAIN
+            : -errno);
+}
+
+static void
+fd_wait(struct stream *stream, enum stream_wait_type wait)
+{
+    struct stream_fd *s = stream_fd_cast(stream);
+    switch (wait) {
+    case STREAM_CONNECT:
+    case STREAM_SEND:
+        poll_fd_wait(s->fd, POLLOUT);
+        break;
+
+    case STREAM_RECV:
+        poll_fd_wait(s->fd, POLLIN);
+        break;
+
+    default:
+        NOT_REACHED();
+    }
+}
+
+static struct stream_class stream_fd_class = {
+    "fd",                       /* name */
+    NULL,                       /* open */
+    fd_close,                   /* close */
+    fd_connect,                 /* connect */
+    fd_recv,                    /* recv */
+    fd_send,                    /* send */
+    fd_wait,                    /* wait */
+};
+\f
+/* Passive file descriptor stream. */
+
+struct fd_pstream
+{
+    struct pstream pstream;
+    int fd;
+    int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
+                     struct stream **);
+    char *unlink_path;
+};
+
+static struct pstream_class fd_pstream_class;
+
+static struct fd_pstream *
+fd_pstream_cast(struct pstream *pstream)
+{
+    pstream_assert_class(pstream, &fd_pstream_class);
+    return CONTAINER_OF(pstream, struct fd_pstream, pstream);
+}
+
+/* Creates a new pstream named 'name' that will accept new socket connections
+ * on 'fd' and stores a pointer to the stream in '*pstreamp'.
+ *
+ * When a connection has been accepted, 'accept_cb' will be called with the new
+ * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'.
+ * accept_cb must return 0 if the connection is successful, in which case it
+ * must initialize '*streamp' to the new stream, or a positive errno value on
+ * error.  In either case accept_cb takes ownership of the 'fd' passed in.
+ *
+ * When '*pstreamp' is closed, then 'unlink_path' (if nonnull) will be passed
+ * to fatal_signal_unlink_file_now() and freed with free().
+ *
+ * Returns 0 if successful, otherwise a positive errno value.  (The current
+ * implementation never fails.) */
+int
+new_fd_pstream(const char *name, int fd,
+               int (*accept_cb)(int fd, const struct sockaddr *sa,
+                                size_t sa_len, struct stream **streamp),
+               char *unlink_path, struct pstream **pstreamp)
+{
+    struct fd_pstream *ps = xmalloc(sizeof *ps);
+    pstream_init(&ps->pstream, &fd_pstream_class, name);
+    ps->fd = fd;
+    ps->accept_cb = accept_cb;
+    ps->unlink_path = unlink_path;
+    *pstreamp = &ps->pstream;
+    return 0;
+}
+
+static void
+pfd_close(struct pstream *pstream)
+{
+    struct fd_pstream *ps = fd_pstream_cast(pstream);
+    close(ps->fd);
+    maybe_unlink_and_free(ps->unlink_path);
+    free(ps);
+}
+
+static int
+pfd_accept(struct pstream *pstream, struct stream **new_streamp)
+{
+    struct fd_pstream *ps = fd_pstream_cast(pstream);
+    struct sockaddr_storage ss;
+    socklen_t ss_len = sizeof ss;
+    int new_fd;
+    int retval;
+
+    new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len);
+    if (new_fd < 0) {
+        int retval = errno;
+        if (retval != EAGAIN) {
+            VLOG_DBG_RL(&rl, "accept: %s", strerror(retval));
+        }
+        return retval;
+    }
+
+    retval = set_nonblocking(new_fd);
+    if (retval) {
+        close(new_fd);
+        return retval;
+    }
+
+    return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len,
+                         new_streamp);
+}
+
+static void
+pfd_wait(struct pstream *pstream)
+{
+    struct fd_pstream *ps = fd_pstream_cast(pstream);
+    poll_fd_wait(ps->fd, POLLIN);
+}
+
+static struct pstream_class fd_pstream_class = {
+    "pstream",
+    NULL,
+    pfd_close,
+    pfd_accept,
+    pfd_wait
+};
+\f
+/* Helper functions. */
+static void
+maybe_unlink_and_free(char *path)
+{
+    if (path) {
+        fatal_signal_unlink_file_now(path);
+        free(path);
+    }
+}
diff --git a/lib/stream-fd.h b/lib/stream-fd.h
new file mode 100644 (file)
index 0000000..d2a34eb
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2008, 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef STREAM_FD_H
+#define STREAM_FD_H 1
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+struct stream;
+struct pstream;
+struct sockaddr;
+
+int new_fd_stream(const char *name, int fd, int connect_status,
+                      char *unlink_path, struct stream **streamp);
+int new_fd_pstream(const char *name, int fd,
+                   int (*accept_cb)(int fd, const struct sockaddr *,
+                                    size_t sa_len, struct stream **),
+                   char *unlink_path,
+                   struct pstream **pstreamp);
+
+#endif /* stream-fd.h */
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
new file mode 100644 (file)
index 0000000..6beaab7
--- /dev/null
@@ -0,0 +1,173 @@
+/*
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef STREAM_PROVIDER_H
+#define STREAM_PROVIDER_H 1
+
+#include <assert.h>
+#include <sys/types.h>
+#include "stream.h"
+
+/* Active stream connection. */
+
+/* Active stream connection.
+ *
+ * This structure should be treated as opaque by implementation. */
+struct stream {
+    struct stream_class *class;
+    int state;
+    int error;
+    uint32_t remote_ip;
+    uint16_t remote_port;
+    uint32_t local_ip;
+    uint16_t local_port;
+    char *name;
+};
+
+void stream_init(struct stream *, struct stream_class *, int connect_status,
+                 const char *name);
+void stream_set_remote_ip(struct stream *, uint32_t remote_ip);
+void stream_set_remote_port(struct stream *, uint16_t remote_port);
+void stream_set_local_ip(struct stream *, uint32_t local_ip);
+void stream_set_local_port(struct stream *, uint16_t local_port);
+static inline void stream_assert_class(const struct stream *stream,
+                                       const struct stream_class *class)
+{
+    assert(stream->class == class);
+}
+
+struct stream_class {
+    /* Prefix for connection names, e.g. "tcp", "ssl", "unix". */
+    const char *name;
+
+    /* Attempts to connect to a peer.  'name' is the full connection name
+     * provided by the user, e.g. "tcp:1.2.3.4".  This name is useful for error
+     * messages but must not be modified.
+     *
+     * 'suffix' is a copy of 'name' following the colon and may be modified.
+     *
+     * Returns 0 if successful, otherwise a positive errno value.  If
+     * successful, stores a pointer to the new connection in '*streamp'.
+     *
+     * The open function must not block waiting for a connection to complete.
+     * If the connection cannot be completed immediately, it should return
+     * EAGAIN (not EINPROGRESS, as returned by the connect system call) and
+     * continue the connection in the background. */
+    int (*open)(const char *name, char *suffix, struct stream **streamp);
+
+    /* Closes 'stream' and frees associated memory. */
+    void (*close)(struct stream *stream);
+
+    /* Tries to complete the connection on 'stream'.  If 'stream''s connection
+     * is complete, returns 0 if the connection was successful or a positive
+     * errno value if it failed.  If the connection is still in progress,
+     * returns EAGAIN.
+     *
+     * The connect function must not block waiting for the connection to
+     * complete; instead, it should return EAGAIN immediately. */
+    int (*connect)(struct stream *stream);
+
+    /* Tries to receive up to 'n' bytes from 'stream' into 'buffer', and
+     * returns:
+     *
+     *     - If successful, the number of bytes received (between 1 and 'n').
+     *
+     *     - On error, a negative errno value.
+     *
+     *     - 0, if the connection has been closed in the normal fashion.
+     *
+     * The recv function will not be passed a zero 'n'.
+     *
+     * The recv function must not block waiting for data to arrive.  If no data
+     * have been received, it should return -EAGAIN immediately. */
+    ssize_t (*recv)(struct stream *stream, void *buffer, size_t n);
+
+    /* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns:
+     *
+     *     - If successful, the number of bytes sent (between 1 and 'n').
+     *
+     *     - On error, a negative errno value.
+     *
+     *     - Never returns 0.
+     *
+     * The send function will not be passed a zero 'n'.
+     *
+     * The send function must not block.  If no bytes can be immediately
+     * accepted for transmission, it should return -EAGAIN immediately. */
+    ssize_t (*send)(struct stream *stream, const void *buffer, size_t n);
+
+    /* Arranges for the poll loop to wake up when 'stream' is ready to take an
+     * action of the given 'type'. */
+    void (*wait)(struct stream *stream, enum stream_wait_type type);
+};
+\f
+/* Passive listener for incoming stream connections.
+ *
+ * This structure should be treated as opaque by stream implementations. */
+struct pstream {
+    struct pstream_class *class;
+    char *name;
+};
+
+void pstream_init(struct pstream *, struct pstream_class *, const char *name);
+static inline void pstream_assert_class(const struct pstream *pstream,
+                                        const struct pstream_class *class)
+{
+    assert(pstream->class == class);
+}
+
+struct pstream_class {
+    /* Prefix for connection names, e.g. "ptcp", "pssl", "punix". */
+    const char *name;
+
+    /* Attempts to start listening for stream connections.  'name' is the full
+     * connection name provided by the user, e.g. "ptcp:1234".  This name is
+     * useful for error messages but must not be modified.
+     *
+     * 'suffix' is a copy of 'name' following the colon and may be modified.
+     *
+     * Returns 0 if successful, otherwise a positive errno value.  If
+     * successful, stores a pointer to the new connection in '*pstreamp'.
+     *
+     * The listen function must not block.  If the connection cannot be
+     * completed immediately, it should return EAGAIN (not EINPROGRESS, as
+     * returned by the connect system call) and continue the connection in the
+     * background. */
+    int (*listen)(const char *name, char *suffix, struct pstream **pstreamp);
+
+    /* Closes 'pstream' and frees associated memory. */
+    void (*close)(struct pstream *pstream);
+
+    /* Tries to accept a new connection on 'pstream'.  If successful, stores
+     * the new connection in '*new_streamp' and returns 0.  Otherwise, returns
+     * a positive errno value.
+     *
+     * The accept function must not block waiting for a connection.  If no
+     * connection is ready to be accepted, it should return EAGAIN. */
+    int (*accept)(struct pstream *pstream, struct stream **new_streamp);
+
+    /* Arranges for the poll loop to wake up when a connection is ready to be
+     * accepted on 'pstream'. */
+    void (*wait)(struct pstream *pstream);
+};
+
+/* Active and passive stream classes. */
+extern struct stream_class tcp_stream_class;
+extern struct pstream_class ptcp_pstream_class;
+extern struct stream_class unix_stream_class;
+extern struct pstream_class punix_pstream_class;
+
+#endif /* stream-provider.h */
diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
new file mode 100644 (file)
index 0000000..ecd9686
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2008, 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "stream.h"
+#include <errno.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include "packets.h"
+#include "socket-util.h"
+#include "util.h"
+#include "stream-provider.h"
+#include "stream-fd.h"
+
+#include "vlog.h"
+#define THIS_MODULE VLM_stream_tcp
+
+/* Active TCP. */
+
+static int
+new_tcp_stream(const char *name, int fd, int connect_status,
+              const struct sockaddr_in *remote, struct stream **streamp)
+{
+    struct sockaddr_in local;
+    socklen_t local_len = sizeof local;
+    int on = 1;
+    int retval;
+
+    /* Get the local IP and port information */
+    retval = getsockname(fd, (struct sockaddr *)&local, &local_len);
+    if (retval) {
+        memset(&local, 0, sizeof local);
+    }
+
+    retval = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
+    if (retval) {
+        VLOG_ERR("%s: setsockopt(TCP_NODELAY): %s", name, strerror(errno));
+        close(fd);
+        return errno;
+    }
+
+    retval = new_fd_stream(name, fd, connect_status, NULL, streamp);
+    if (!retval) {
+        struct stream *stream = *streamp;
+        stream_set_remote_ip(stream, remote->sin_addr.s_addr);
+        stream_set_remote_port(stream, remote->sin_port);
+        stream_set_local_ip(stream, local.sin_addr.s_addr);
+        stream_set_local_port(stream, local.sin_port);
+    }
+    return retval;
+}
+
+static int
+tcp_open(const char *name, char *suffix, struct stream **streamp)
+{
+    struct sockaddr_in sin;
+    int fd, error;
+
+    error = tcp_open_active(suffix, 0, &sin, &fd);
+    if (fd >= 0) {
+        return new_tcp_stream(name, fd, error, &sin, streamp);
+    } else {
+        VLOG_ERR("%s: connect: %s", name, strerror(error));
+        return error;
+    }
+}
+
+struct stream_class tcp_stream_class = {
+    "tcp",                      /* name */
+    tcp_open,                   /* open */
+    NULL,                       /* close */
+    NULL,                       /* connect */
+    NULL,                       /* recv */
+    NULL,                       /* send */
+    NULL,                       /* wait */
+};
+\f
+/* Passive TCP. */
+
+static int ptcp_accept(int fd, const struct sockaddr *sa, size_t sa_len,
+                       struct stream **streamp);
+
+static int
+ptcp_open(const char *name UNUSED, char *suffix, struct pstream **pstreamp)
+{
+    int fd;
+
+    fd = tcp_open_passive(suffix, 0);
+    if (fd < 0) {
+        return -fd;
+    } else {
+        return new_fd_pstream("ptcp", fd, ptcp_accept, NULL, pstreamp);
+    }
+}
+
+static int
+ptcp_accept(int fd, const struct sockaddr *sa, size_t sa_len,
+            struct stream **streamp)
+{
+    const struct sockaddr_in *sin = (const struct sockaddr_in *) sa;
+    char name[128];
+
+    if (sa_len == sizeof(struct sockaddr_in) && sin->sin_family == AF_INET) {
+        sprintf(name, "tcp:"IP_FMT, IP_ARGS(&sin->sin_addr));
+        sprintf(strchr(name, '\0'), ":%"PRIu16, ntohs(sin->sin_port));
+    } else {
+        strcpy(name, "tcp");
+    }
+    return new_tcp_stream(name, fd, 0, sin, streamp);
+}
+
+struct pstream_class ptcp_pstream_class = {
+    "ptcp",
+    ptcp_open,
+    NULL,
+    NULL,
+    NULL
+};
+
diff --git a/lib/stream-unix.c b/lib/stream-unix.c
new file mode 100644 (file)
index 0000000..a5dfd55
--- /dev/null
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2008, 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "stream.h"
+#include <assert.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <netdb.h>
+#include <poll.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include "packets.h"
+#include "poll-loop.h"
+#include "socket-util.h"
+#include "util.h"
+#include "stream-provider.h"
+#include "stream-fd.h"
+
+#include "vlog.h"
+#define THIS_MODULE VLM_stream_unix
+
+/* Active UNIX socket. */
+
+/* Number of unix sockets created so far, to ensure binding path uniqueness. */
+static int n_unix_sockets;
+
+static int
+unix_open(const char *name, char *suffix, struct stream **streamp)
+{
+    const char *connect_path = suffix;
+    char *bind_path;
+    int fd;
+
+    bind_path = xasprintf("/tmp/stream-unix.%ld.%d",
+                          (long int) getpid(), n_unix_sockets++);
+    fd = make_unix_socket(SOCK_STREAM, true, false, bind_path, connect_path);
+    if (fd < 0) {
+        VLOG_ERR("%s: connection to %s failed: %s",
+                 bind_path, connect_path, strerror(-fd));
+        free(bind_path);
+        return -fd;
+    }
+
+    return new_fd_stream(name, fd, check_connection_completion(fd),
+                         bind_path, streamp);
+}
+
+struct stream_class unix_stream_class = {
+    "unix",                     /* name */
+    unix_open,                  /* open */
+    NULL,                       /* close */
+    NULL,                       /* connect */
+    NULL,                       /* recv */
+    NULL,                       /* send */
+    NULL,                       /* wait */
+};
+\f
+/* Passive UNIX socket. */
+
+static int punix_accept(int fd, const struct sockaddr *sa, size_t sa_len,
+                        struct stream **streamp);
+
+static int
+punix_open(const char *name UNUSED, char *suffix, struct pstream **pstreamp)
+{
+    int fd, error;
+
+    fd = make_unix_socket(SOCK_STREAM, true, true, suffix, NULL);
+    if (fd < 0) {
+        VLOG_ERR("%s: binding failed: %s", suffix, strerror(errno));
+        return errno;
+    }
+
+    error = set_nonblocking(fd);
+    if (error) {
+        close(fd);
+        return error;
+    }
+
+    if (listen(fd, 10) < 0) {
+        error = errno;
+        VLOG_ERR("%s: listen: %s", name, strerror(error));
+        close(fd);
+        return error;
+    }
+
+    return new_fd_pstream("punix", fd, punix_accept,
+                          xstrdup(suffix), pstreamp);
+}
+
+static int
+punix_accept(int fd, const struct sockaddr *sa, size_t sa_len,
+             struct stream **streamp)
+{
+    const struct sockaddr_un *sun = (const struct sockaddr_un *) sa;
+    int name_len = get_unix_name_len(sa_len);
+    char name[128];
+
+    if (name_len > 0) {
+        snprintf(name, sizeof name, "unix:%.*s", name_len, sun->sun_path);
+    } else {
+        strcpy(name, "unix");
+    }
+    return new_fd_stream(name, fd, 0, NULL, streamp);
+}
+
+struct pstream_class punix_pstream_class = {
+    "punix",
+    punix_open,
+    NULL,
+    NULL,
+    NULL
+};
+
diff --git a/lib/stream.c b/lib/stream.c
new file mode 100644 (file)
index 0000000..1a0ea7b
--- /dev/null
@@ -0,0 +1,490 @@
+/*
+ * Copyright (c) 2008, 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "stream-provider.h"
+#include <assert.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <netinet/in.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+#include "coverage.h"
+#include "dynamic-string.h"
+#include "flow.h"
+#include "ofp-print.h"
+#include "ofpbuf.h"
+#include "openflow/nicira-ext.h"
+#include "openflow/openflow.h"
+#include "packets.h"
+#include "poll-loop.h"
+#include "random.h"
+#include "util.h"
+
+#define THIS_MODULE VLM_stream
+#include "vlog.h"
+
+/* State of an active stream.*/
+enum stream_state {
+    SCS_CONNECTING,             /* Underlying stream is not connected. */
+    SCS_CONNECTED,              /* Connection established. */
+    SCS_DISCONNECTED            /* Connection failed or connection closed. */
+};
+
+static struct stream_class *stream_classes[] = {
+    &tcp_stream_class,
+    &unix_stream_class,
+};
+
+static struct pstream_class *pstream_classes[] = {
+    &ptcp_pstream_class,
+    &punix_pstream_class,
+};
+
+/* Check the validity of the stream class structures. */
+static void
+check_stream_classes(void)
+{
+#ifndef NDEBUG
+    size_t i;
+
+    for (i = 0; i < ARRAY_SIZE(stream_classes); i++) {
+        struct stream_class *class = stream_classes[i];
+        assert(class->name != NULL);
+        assert(class->open != NULL);
+        if (class->close || class->recv || class->send || class->wait) {
+            assert(class->close != NULL);
+            assert(class->recv != NULL);
+            assert(class->send != NULL);
+            assert(class->wait != NULL);
+        } else {
+            /* This class delegates to another one. */
+        }
+    }
+
+    for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) {
+        struct pstream_class *class = pstream_classes[i];
+        assert(class->name != NULL);
+        assert(class->listen != NULL);
+        if (class->close || class->accept || class->wait) {
+            assert(class->close != NULL);
+            assert(class->accept != NULL);
+            assert(class->wait != NULL);
+        } else {
+            /* This class delegates to another one. */
+        }
+    }
+#endif
+}
+
+/* Prints information on active (if 'active') and passive (if 'passive')
+ * connection methods supported by the stream. */
+void
+stream_usage(const char *name, bool active, bool passive)
+{
+    /* Really this should be implemented via callbacks into the stream
+     * providers, but that seems too heavy-weight to bother with at the
+     * moment. */
+
+    printf("\n");
+    if (active) {
+        printf("Active %s connection methods:\n", name);
+        printf("  tcp:IP:PORT             "
+               "PORT at remote IP\n");
+        printf("  unix:FILE               "
+               "Unix domain socket named FILE\n");
+    }
+
+    if (passive) {
+        printf("Passive %s connection methods:\n", name);
+        printf("  ptcp:PORT[:IP]          "
+               "listen to TCP PORT on IP\n");
+        printf("  punix:FILE              "
+               "listen on Unix domain socket FILE\n");
+    }
+}
+
+/* Attempts to connect a stream to a remote peer.  'name' is a connection name
+ * in the form "TYPE:ARGS", where TYPE is an active stream class's name and
+ * ARGS are stream class-specific.
+ *
+ * Returns 0 if successful, otherwise a positive errno value.  If successful,
+ * stores a pointer to the new connection in '*streamp', otherwise a null
+ * pointer.  */
+int
+stream_open(const char *name, struct stream **streamp)
+{
+    size_t prefix_len;
+    size_t i;
+
+    COVERAGE_INC(stream_open);
+    check_stream_classes();
+
+    *streamp = NULL;
+    prefix_len = strcspn(name, ":");
+    if (prefix_len == strlen(name)) {
+        return EAFNOSUPPORT;
+    }
+    for (i = 0; i < ARRAY_SIZE(stream_classes); i++) {
+        struct stream_class *class = stream_classes[i];
+        if (strlen(class->name) == prefix_len
+            && !memcmp(class->name, name, prefix_len)) {
+            struct stream *stream;
+            char *suffix_copy = xstrdup(name + prefix_len + 1);
+            int retval = class->open(name, suffix_copy, &stream);
+            free(suffix_copy);
+            if (!retval) {
+                assert(stream->state != SCS_CONNECTING
+                       || stream->class->connect);
+                *streamp = stream;
+            }
+            return retval;
+        }
+    }
+    return EAFNOSUPPORT;
+}
+
+int
+stream_open_block(const char *name, struct stream **streamp)
+{
+    struct stream *stream;
+    int error;
+
+    error = stream_open(name, &stream);
+    while (error == EAGAIN) {
+        stream_connect_wait(stream);
+        poll_block();
+        error = stream_connect(stream);
+        assert(error != EINPROGRESS);
+    }
+    if (error) {
+        stream_close(stream);
+        *streamp = NULL;
+    } else {
+        *streamp = stream;
+    }
+    return error;
+}
+
+/* Closes 'stream'. */
+void
+stream_close(struct stream *stream)
+{
+    if (stream != NULL) {
+        char *name = stream->name;
+        (stream->class->close)(stream);
+        free(name);
+    }
+}
+
+/* Returns the name of 'stream', that is, the string passed to
+ * stream_open(). */
+const char *
+stream_get_name(const struct stream *stream)
+{
+    return stream ? stream->name : "(null)";
+}
+
+/* Returns the IP address of the peer, or 0 if the peer is not connected over
+ * an IP-based protocol or if its IP address is not yet known. */
+uint32_t
+stream_get_remote_ip(const struct stream *stream)
+{
+    return stream->remote_ip;
+}
+
+/* Returns the transport port of the peer, or 0 if the connection does not
+ * contain a port or if the port is not yet known. */
+uint16_t
+stream_get_remote_port(const struct stream *stream)
+{
+    return stream->remote_port;
+}
+
+/* Returns the IP address used to connect to the peer, or 0 if the connection
+ * is not an IP-based protocol or if its IP address is not yet known. */
+uint32_t
+stream_get_local_ip(const struct stream *stream)
+{
+    return stream->local_ip;
+}
+
+/* Returns the transport port used to connect to the peer, or 0 if the
+ * connection does not contain a port or if the port is not yet known. */
+uint16_t
+stream_get_local_port(const struct stream *stream)
+{
+    return stream->local_port;
+}
+
+static void
+scs_connecting(struct stream *stream)
+{
+    int retval = (stream->class->connect)(stream);
+    assert(retval != EINPROGRESS);
+    if (!retval) {
+        stream->state = SCS_CONNECTED;
+    } else if (retval != EAGAIN) {
+        stream->state = SCS_DISCONNECTED;
+        stream->error = retval;
+    }
+}
+
+/* Tries to complete the connection on 'stream', which must be an active
+ * stream.  If 'stream''s connection is complete, returns 0 if the connection
+ * was successful or a positive errno value if it failed.  If the
+ * connection is still in progress, returns EAGAIN. */
+int
+stream_connect(struct stream *stream)
+{
+    enum stream_state last_state;
+
+    do {
+        last_state = stream->state;
+        switch (stream->state) {
+        case SCS_CONNECTING:
+            scs_connecting(stream);
+            break;
+
+        case SCS_CONNECTED:
+            return 0;
+
+        case SCS_DISCONNECTED:
+            return stream->error;
+
+        default:
+            NOT_REACHED();
+        }
+    } while (stream->state != last_state);
+
+    return EAGAIN;
+}
+
+/* Tries to receive up to 'n' bytes from 'stream' into 'buffer', and returns:
+ *
+ *     - If successful, the number of bytes received (between 1 and 'n').
+ *
+ *     - On error, a negative errno value.
+ *
+ *     - 0, if the connection has been closed in the normal fashion, or if 'n'
+ *       is zero.
+ *
+ * The recv function will not block waiting for a packet to arrive.  If no
+ * data have been received, it returns -EAGAIN immediately. */
+int
+stream_recv(struct stream *stream, void *buffer, size_t n)
+{
+    int retval = stream_connect(stream);
+    return (retval ? -retval
+            : n == 0 ? 0
+            : (stream->class->recv)(stream, buffer, n));
+}
+
+/* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns:
+ *
+ *     - If successful, the number of bytes sent (between 1 and 'n').  0 is
+ *       only a valid return value if 'n' is 0.
+ *
+ *     - On error, a negative errno value.
+ *
+ * The send function will not block.  If no bytes can be immediately accepted
+ * for transmission, it returns -EAGAIN immediately. */
+int
+stream_send(struct stream *stream, const void *buffer, size_t n)
+{
+    int retval = stream_connect(stream);
+    return (retval ? -retval
+            : n == 0 ? 0
+            : (stream->class->send)(stream, buffer, n));
+}
+
+void
+stream_wait(struct stream *stream, enum stream_wait_type wait)
+{
+    assert(wait == STREAM_CONNECT || wait == STREAM_RECV
+           || wait == STREAM_SEND);
+
+    switch (stream->state) {
+    case SCS_CONNECTING:
+        wait = STREAM_CONNECT;
+        break;
+
+    case SCS_DISCONNECTED:
+        poll_immediate_wake();
+        return;
+    }
+    (stream->class->wait)(stream, wait);
+}
+
+void
+stream_connect_wait(struct stream *stream)
+{
+    stream_wait(stream, STREAM_CONNECT);
+}
+
+void
+stream_recv_wait(struct stream *stream)
+{
+    stream_wait(stream, STREAM_RECV);
+}
+
+void
+stream_send_wait(struct stream *stream)
+{
+    stream_wait(stream, STREAM_SEND);
+}
+
+/* Attempts to start listening for remote stream connections.  'name' is a
+ * connection name in the form "TYPE:ARGS", where TYPE is an passive stream
+ * class's name and ARGS are stream class-specific.
+ *
+ * Returns 0 if successful, otherwise a positive errno value.  If successful,
+ * stores a pointer to the new connection in '*pstreamp', otherwise a null
+ * pointer.  */
+int
+pstream_open(const char *name, struct pstream **pstreamp)
+{
+    size_t prefix_len;
+    size_t i;
+
+    check_stream_classes();
+
+    *pstreamp = NULL;
+    prefix_len = strcspn(name, ":");
+    if (prefix_len == strlen(name)) {
+        return EAFNOSUPPORT;
+    }
+    for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) {
+        struct pstream_class *class = pstream_classes[i];
+        if (strlen(class->name) == prefix_len
+            && !memcmp(class->name, name, prefix_len)) {
+            char *suffix_copy = xstrdup(name + prefix_len + 1);
+            int retval = class->listen(name, suffix_copy, pstreamp);
+            free(suffix_copy);
+            if (retval) {
+                *pstreamp = NULL;
+            }
+            return retval;
+        }
+    }
+    return EAFNOSUPPORT;
+}
+
+/* Returns the name that was used to open 'pstream'.  The caller must not
+ * modify or free the name. */
+const char *
+pstream_get_name(const struct pstream *pstream)
+{
+    return pstream->name;
+}
+
+/* Closes 'pstream'. */
+void
+pstream_close(struct pstream *pstream)
+{
+    if (pstream != NULL) {
+        char *name = pstream->name;
+        (pstream->class->close)(pstream);
+        free(name);
+    }
+}
+
+/* Tries to accept a new connection on 'pstream'.  If successful, stores the
+ * new connection in '*new_stream' and returns 0.  Otherwise, returns a
+ * positive errno value.
+ *
+ * pstream_accept() will not block waiting for a connection.  If no connection
+ * is ready to be accepted, it returns EAGAIN immediately. */
+int
+pstream_accept(struct pstream *pstream, struct stream **new_stream)
+{
+    int retval = (pstream->class->accept)(pstream, new_stream);
+    if (retval) {
+        *new_stream = NULL;
+    } else {
+        assert((*new_stream)->state != SCS_CONNECTING
+               || (*new_stream)->class->connect);
+    }
+    return retval;
+}
+
+void
+pstream_wait(struct pstream *pstream)
+{
+    (pstream->class->wait)(pstream);
+}
+\f
+/* Initializes 'stream' as a new stream named 'name', implemented via 'class'.
+ * The initial connection status, supplied as 'connect_status', is interpreted
+ * as follows:
+ *
+ *      - 0: 'stream' is connected.  Its 'send' and 'recv' functions may be
+ *        called in the normal fashion.
+ *
+ *      - EAGAIN: 'stream' is trying to complete a connection.  Its 'connect'
+ *        function should be called to complete the connection.
+ *
+ *      - Other positive errno values indicate that the connection failed with
+ *        the specified error.
+ *
+ * After calling this function, stream_close() must be used to destroy
+ * 'stream', otherwise resources will be leaked.
+ *
+ * The caller retains ownership of 'name'. */
+void
+stream_init(struct stream *stream, struct stream_class *class,
+            int connect_status, const char *name)
+{
+    stream->class = class;
+    stream->state = (connect_status == EAGAIN ? SCS_CONNECTING
+                    : !connect_status ? SCS_CONNECTED
+                    : SCS_DISCONNECTED);
+    stream->error = connect_status;
+    stream->name = xstrdup(name);
+}
+
+void
+stream_set_remote_ip(struct stream *stream, uint32_t ip)
+{
+    stream->remote_ip = ip;
+}
+
+void
+stream_set_remote_port(struct stream *stream, uint16_t port)
+{
+    stream->remote_port = port;
+}
+
+void
+stream_set_local_ip(struct stream *stream, uint32_t ip)
+{
+    stream->local_ip = ip;
+}
+
+void
+stream_set_local_port(struct stream *stream, uint16_t port)
+{
+    stream->local_port = port;
+}
+
+void
+pstream_init(struct pstream *pstream, struct pstream_class *class,
+            const char *name)
+{
+    pstream->class = class;
+    pstream->name = xstrdup(name);
+}
diff --git a/lib/stream.h b/lib/stream.h
new file mode 100644 (file)
index 0000000..76c9e6c
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef STREAM_H
+#define STREAM_H 1
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include "flow.h"
+
+struct pstream;
+struct stream;
+
+void stream_usage(const char *name, bool active, bool passive);
+
+/* Bidirectional byte streams. */
+int stream_open(const char *name, struct stream **);
+int stream_open_block(const char *name, struct stream **);
+void stream_close(struct stream *);
+const char *stream_get_name(const struct stream *);
+uint32_t stream_get_remote_ip(const struct stream *);
+uint16_t stream_get_remote_port(const struct stream *);
+uint32_t stream_get_local_ip(const struct stream *);
+uint16_t stream_get_local_port(const struct stream *);
+int stream_connect(struct stream *);
+int stream_recv(struct stream *, void *buffer, size_t n);
+int stream_send(struct stream *, const void *buffer, size_t n);
+
+enum stream_wait_type {
+    STREAM_CONNECT,
+    STREAM_RECV,
+    STREAM_SEND
+};
+void stream_wait(struct stream *, enum stream_wait_type);
+void stream_connect_wait(struct stream *);
+void stream_recv_wait(struct stream *);
+void stream_send_wait(struct stream *);
+
+/* Passive streams: listeners for incoming stream connections. */
+int pstream_open(const char *name, struct pstream **);
+const char *pstream_get_name(const struct pstream *);
+void pstream_close(struct pstream *);
+int pstream_accept(struct pstream *, struct stream **);
+void pstream_wait(struct pstream *);
+
+#endif /* stream.h */
index 59ab045c76b11ea852ea161eb8efe346ddeb4ac5..da345df428fb864f1fde3872dd103497fd121b64 100644 (file)
@@ -62,6 +62,10 @@ VLOG_MODULE(process)
 VLOG_MODULE(rconn)
 VLOG_MODULE(rtnetlink)
 VLOG_MODULE(stp)
+VLOG_MODULE(stream_fd)
+VLOG_MODULE(stream_tcp)
+VLOG_MODULE(stream_unix)
+VLOG_MODULE(stream)
 VLOG_MODULE(stats)
 VLOG_MODULE(status)
 VLOG_MODULE(svec)