--- /dev/null
+/* Copyright (c) 2012 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "worker.h"
+
+#include <assert.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include "command-line.h"
+#include "daemon.h"
+#include "ofpbuf.h"
+#include "poll-loop.h"
+#include "socket-util.h"
+#include "util.h"
+#include "vlog.h"
+
+VLOG_DEFINE_THIS_MODULE(worker);
+
+/* Header for an RPC request. */
+struct worker_request {
+ size_t request_len; /* Length of the payload in bytes. */
+ worker_request_func *request_cb; /* Function to call in worker process. */
+ worker_reply_func *reply_cb; /* Function to call in main process. */
+ void *reply_aux; /* Auxiliary data for 'reply_cb'. */
+};
+
+/* Header for an RPC reply. */
+struct worker_reply {
+ size_t reply_len; /* Length of the payload in bytes. */
+ worker_reply_func *reply_cb; /* Function to call in main process. */
+ void *reply_aux; /* Auxiliary data for 'reply_cb'. */
+};
+
+/* Receive buffer for a RPC request or reply. */
+struct rxbuf {
+ /* Header. */
+ struct ofpbuf header; /* Header data. */
+ int fds[SOUTIL_MAX_FDS]; /* File descriptors. */
+ size_t n_fds;
+
+ /* Payload. */
+ struct ofpbuf payload; /* Payload data. */
+};
+
+static int client_sock = -1;
+static struct rxbuf client_rx;
+
+static void rxbuf_init(struct rxbuf *);
+static void rxbuf_clear(struct rxbuf *);
+static int rxbuf_run(struct rxbuf *, int sock, size_t header_len);
+
+static struct iovec *prefix_iov(void *data, size_t len,
+ const struct iovec *iovs, size_t n_iovs);
+
+static void worker_broke(void);
+
+static void worker_main(int fd) NO_RETURN;
+
+/* Starts a worker process as a subprocess of the current process. Currently
+ * only a single worker process is supported, so this function may only be
+ * called once.
+ *
+ * The client should call worker_run() and worker_wait() from its main loop.
+ *
+ * Call this function between daemonize_start() and daemonize_complete(). */
+void
+worker_start(void)
+{
+ int work_fds[2];
+
+ assert(client_sock < 0);
+
+ /* Create non-blocking socket pair. */
+ xsocketpair(AF_UNIX, SOCK_STREAM, 0, work_fds);
+ xset_nonblocking(work_fds[0]);
+ xset_nonblocking(work_fds[1]);
+
+ if (!fork_and_clean_up()) {
+ /* In child (worker) process. */
+ daemonize_post_detach();
+ close(work_fds[0]);
+ worker_main(work_fds[1]);
+ NOT_REACHED();
+ }
+
+ /* In parent (main) process. */
+ close(work_fds[1]);
+ client_sock = work_fds[0];
+ rxbuf_init(&client_rx);
+}
+
+/* Returns true if this process has started a worker and the worker is not
+ * known to have malfunctioned. */
+bool
+worker_is_running(void)
+{
+ return client_sock >= 0;
+}
+
+/* If a worker process was started, processes RPC replies from it, calling the
+ * registered 'reply_cb' callbacks.
+ *
+ * If the worker process died or malfunctioned, aborts. */
+void
+worker_run(void)
+{
+ if (worker_is_running()) {
+ int error;
+
+ error = rxbuf_run(&client_rx, client_sock,
+ sizeof(struct worker_reply));
+ if (!error) {
+ struct worker_reply *reply = client_rx.header.data;
+ reply->reply_cb(&client_rx.payload, client_rx.fds,
+ client_rx.n_fds, reply->reply_aux);
+ rxbuf_clear(&client_rx);
+ } else if (error != EAGAIN) {
+ worker_broke();
+ VLOG_ABORT("receive from worker failed (%s)",
+ ovs_retval_to_string(error));
+ }
+ }
+}
+
+/* Causes the poll loop to wake up if we need to process RPC replies. */
+void
+worker_wait(void)
+{
+ if (worker_is_running()) {
+ poll_fd_wait(client_sock, POLLIN);
+ }
+}
+\f
+/* Interface for main process to interact with the worker. */
+
+/* Sends an RPC request to the worker process. The worker process will call
+ * 'request_cb' passing the 'size' (zero or more) bytes of data in 'data' as
+ * arguments as well as the 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors
+ * in 'fds'.
+ *
+ * If and only if 'reply_cb' is nonnull, 'request_cb' must call worker_reply()
+ * or worker_reply_iovec() with a reply. The main process will later call
+ * 'reply_cb' with the reply data (if any) and file descriptors (if any).
+ *
+ * 'request_cb' receives copies (as if by dup()) of the file descriptors in
+ * fds[]. 'request_cb' takes ownership of these copies, and the caller of
+ * worker_request() retains its ownership of the originals.
+ *
+ * This function may block until the RPC request has been sent (if the socket
+ * buffer fills up) but it does not wait for the reply (if any). If this
+ * function blocks, it may invoke reply callbacks for previous requests.
+ *
+ * The worker process executes RPC requests in strict order of submission and
+ * runs each request to completion before beginning the next request. The main
+ * process invokes reply callbacks in strict order of request submission. */
+void
+worker_request(const void *data, size_t size,
+ const int fds[], size_t n_fds,
+ worker_request_func *request_cb,
+ worker_reply_func *reply_cb, void *aux)
+{
+ if (size > 0) {
+ struct iovec iov;
+
+ iov.iov_base = (void *) data;
+ iov.iov_len = size;
+ worker_request_iovec(&iov, 1, fds, n_fds, request_cb, reply_cb, aux);
+ } else {
+ worker_request_iovec(NULL, 0, fds, n_fds, request_cb, reply_cb, aux);
+ }
+}
+
+static int
+worker_send_iovec(const struct iovec iovs[], size_t n_iovs,
+ const int fds[], size_t n_fds)
+{
+ size_t sent = 0;
+
+ for (;;) {
+ int error;
+
+ /* Try to send the rest of the request. */
+ error = send_iovec_and_fds_fully(client_sock, iovs, n_iovs,
+ fds, n_fds, sent, &sent);
+ if (error != EAGAIN) {
+ return error;
+ }
+
+ /* Process replies to avoid deadlock. */
+ worker_run();
+
+ poll_fd_wait(client_sock, POLLIN | POLLOUT);
+ poll_block();
+ }
+}
+
+/* Same as worker_request() except that the data to send is specified as an
+ * array of iovecs. */
+void
+worker_request_iovec(const struct iovec iovs[], size_t n_iovs,
+ const int fds[], size_t n_fds,
+ worker_request_func *request_cb,
+ worker_reply_func *reply_cb, void *aux)
+{
+ struct worker_request rq;
+ struct iovec *all_iovs;
+ int error;
+
+ assert(worker_is_running());
+
+ rq.request_len = iovec_len(iovs, n_iovs);
+ rq.request_cb = request_cb;
+ rq.reply_cb = reply_cb;
+ rq.reply_aux = aux;
+
+ all_iovs = prefix_iov(&rq, sizeof rq, iovs, n_iovs);
+ error = worker_send_iovec(all_iovs, n_iovs + 1, fds, n_fds);
+ if (error) {
+ worker_broke();
+ VLOG_ABORT("send failed (%s)", strerror(error));
+ }
+ free(all_iovs);
+}
+
+/* Closes the client socket, if any, so that worker_is_running() will return
+ * false.
+ *
+ * The client does this just before aborting if the worker process dies or
+ * malfunctions, to prevent the logging subsystem from trying to use the
+ * worker to log the failure. */
+static void
+worker_broke(void)
+{
+ if (client_sock >= 0) {
+ close(client_sock);
+ client_sock = -1;
+ }
+}
+\f
+/* Interfaces for RPC implementations (running in the worker process). */
+
+static int server_sock = -1;
+static bool expect_reply;
+static struct worker_request request;
+
+/* When a call to worker_request() or worker_request_iovec() provides a
+ * 'reply_cb' callback, the 'request_cb' implementation must call this function
+ * to send its reply. The main process will call 'reply_cb' passing the
+ * 'size' (zero or more) bytes of data in 'data' as arguments as well as the
+ * 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors in 'fds'.
+ *
+ * If a call to worker_request() or worker_request_iovec() provides no
+ * 'reply_cb' callback, the 'request_cb' implementation must not call this
+ * function.
+ *
+ * 'reply_cb' receives copies (as if by dup()) of the file descriptors in
+ * fds[]. 'reply_cb' takes ownership of these copies, and the caller of
+ * worker_reply() retains its ownership of the originals.
+ *
+ * This function blocks until the RPC reply has been sent (if the socket buffer
+ * fills up) but it does not wait for the main process to receive or to process
+ * the reply. */
+void
+worker_reply(const void *data, size_t size, const int fds[], size_t n_fds)
+{
+ if (size > 0) {
+ struct iovec iov;
+
+ iov.iov_base = (void *) data;
+ iov.iov_len = size;
+ worker_reply_iovec(&iov, 1, fds, n_fds);
+ } else {
+ worker_reply_iovec(NULL, 0, fds, n_fds);
+ }
+}
+
+/* Same as worker_reply() except that the data to send is specified as an array
+ * of iovecs. */
+void
+worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
+ const int fds[], size_t n_fds)
+{
+ struct worker_reply reply;
+ struct iovec *all_iovs;
+ int error;
+
+ assert(expect_reply);
+ expect_reply = false;
+
+ reply.reply_len = iovec_len(iovs, n_iovs);
+ reply.reply_cb = request.reply_cb;
+ reply.reply_aux = request.reply_aux;
+
+ all_iovs = prefix_iov(&reply, sizeof reply, iovs, n_iovs);
+
+ error = send_iovec_and_fds_fully_block(server_sock, all_iovs, n_iovs + 1,
+ fds, n_fds);
+ if (error == EPIPE) {
+ /* Parent probably died. Continue processing any RPCs still buffered,
+ * to avoid missing log messages. */
+ VLOG_INFO("send failed (%s)", strerror(error));
+ } else if (error) {
+ VLOG_ABORT("send failed (%s)", strerror(error));
+ }
+
+ free(all_iovs);
+}
+
+static void
+worker_main(int fd)
+{
+ struct rxbuf rx;
+
+ server_sock = fd;
+
+ subprogram_name = "worker";
+ proctitle_set("%s: worker process for pid %lu",
+ program_name, (unsigned long int) getppid());
+ VLOG_INFO("worker process started");
+
+ rxbuf_init(&rx);
+ for (;;) {
+ int error;
+
+ error = rxbuf_run(&rx, server_sock, sizeof(struct worker_request));
+ if (!error) {
+ request = *(struct worker_request *) rx.header.data;
+
+ expect_reply = request.reply_cb != NULL;
+ request.request_cb(&rx.payload, rx.fds, rx.n_fds);
+ assert(!expect_reply);
+
+ rxbuf_clear(&rx);
+ } else if (error == EOF && !rx.header.size) {
+ /* Main process closed the IPC socket. Exit cleanly. */
+ break;
+ } else if (error != EAGAIN) {
+ VLOG_ABORT("RPC receive failed (%s)", strerror(error));
+ }
+
+ poll_fd_wait(server_sock, POLLIN);
+ poll_block();
+ }
+
+ VLOG_INFO("worker process exiting");
+ exit(0);
+}
+\f
+static void
+rxbuf_init(struct rxbuf *rx)
+{
+ ofpbuf_init(&rx->header, 0);
+ rx->n_fds = 0;
+ ofpbuf_init(&rx->payload, 0);
+}
+
+static void
+rxbuf_clear(struct rxbuf *rx)
+{
+ ofpbuf_clear(&rx->header);
+ rx->n_fds = 0;
+ ofpbuf_clear(&rx->payload);
+}
+
+static int
+rxbuf_run(struct rxbuf *rx, int sock, size_t header_len)
+{
+ for (;;) {
+ if (!rx->header.size) {
+ int retval;
+
+ ofpbuf_clear(&rx->header);
+ ofpbuf_prealloc_tailroom(&rx->header, header_len);
+
+ retval = recv_data_and_fds(sock, rx->header.data, header_len,
+ rx->fds, &rx->n_fds);
+ if (retval <= 0) {
+ return retval ? -retval : EOF;
+ }
+ rx->header.size += retval;
+ } else if (rx->header.size < header_len) {
+ size_t bytes_read;
+ int error;
+
+ error = read_fully(sock, ofpbuf_tail(&rx->header),
+ header_len - rx->header.size, &bytes_read);
+ rx->header.size += bytes_read;
+ if (error) {
+ return error;
+ }
+ } else {
+ size_t payload_len = *(size_t *) rx->header.data;
+
+ if (rx->payload.size < payload_len) {
+ size_t left = payload_len - rx->payload.size;
+ size_t bytes_read;
+ int error;
+
+ ofpbuf_prealloc_tailroom(&rx->payload, left);
+ error = read_fully(sock, ofpbuf_tail(&rx->payload), left,
+ &bytes_read);
+ rx->payload.size += bytes_read;
+ if (error) {
+ return error;
+ }
+ } else {
+ return 0;
+ }
+ }
+ }
+
+ return EAGAIN;
+}
+
+static struct iovec *
+prefix_iov(void *data, size_t len, const struct iovec *iovs, size_t n_iovs)
+{
+ struct iovec *dst;
+
+ dst = xmalloc((n_iovs + 1) * sizeof *dst);
+ dst[0].iov_base = data;
+ dst[0].iov_len = len;
+ memcpy(dst + 1, iovs, n_iovs * sizeof *iovs);
+
+ return dst;
+}
--- /dev/null
+/* Copyright (c) 2012 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef WORKER_H
+#define WORKER_H 1
+
+/* Worker processes.
+ *
+ * Thes functions allow an OVS daemon to fork off a "worker process" to do
+ * tasks that may unavoidably block in the kernel. The worker executes remote
+ * procedure calls on behalf of the main process.
+ *
+ * Tasks that may unavoidably block in the kernel include writes to regular
+ * files, sends to Generic Netlink sockets (which as of this writing use a
+ * global lock), and other unusual operations.
+ *
+ * The worker functions *will* block if the finite buffer between a main
+ * process and its worker process fills up.
+ */
+
+#include <stdbool.h>
+#include <stddef.h>
+#include "compiler.h"
+
+struct iovec;
+struct ofpbuf;
+
+/* The main process calls this function to start a worker. */
+void worker_start(void);
+
+/* Interface for main process to interact with the worker. */
+typedef void worker_request_func(struct ofpbuf *request,
+ const int fds[], size_t n_fds);
+typedef void worker_reply_func(struct ofpbuf *reply,
+ const int fds[], size_t n_fds, void *aux);
+
+bool worker_is_running(void);
+void worker_run(void);
+void worker_wait(void);
+
+void worker_request(const void *data, size_t size,
+ const int fds[], size_t n_fds,
+ worker_request_func *request_cb,
+ worker_reply_func *reply_cb, void *aux);
+void worker_request_iovec(const struct iovec *iovs, size_t n_iovs,
+ const int fds[], size_t n_fds,
+ worker_request_func *request_cb,
+ worker_reply_func *reply_cb, void *aux);
+
+/* Interfaces for RPC implementations (running in the worker process). */
+void worker_reply(const void *data, size_t size,
+ const int fds[], size_t n_fds);
+void worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
+ const int fds[], size_t n_fds);
+
+#endif /* worker.h */