From cc75061af782bb7e99d40e3e00a8eb90b2cbbc51 Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Fri, 14 Oct 2011 13:55:00 -0700 Subject: [PATCH] netlink-socket: New function nl_sock_transact_multiple(). This will be used in an upcoming commit. --- lib/netlink-socket.c | 300 +++++++++++++++++++++++++++++++++++-------- lib/netlink-socket.h | 14 ++ 2 files changed, 257 insertions(+), 57 deletions(-) diff --git a/lib/netlink-socket.c b/lib/netlink-socket.c index 160edd23..f117a6a2 100644 --- a/lib/netlink-socket.c +++ b/lib/netlink-socket.c @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "coverage.h" #include "dynamic-string.h" @@ -32,6 +33,7 @@ #include "poll-loop.h" #include "socket-util.h" #include "stress.h" +#include "util.h" #include "vlog.h" VLOG_DEFINE_THIS_MODULE(netlink_socket); @@ -63,8 +65,19 @@ struct nl_sock uint32_t pid; int protocol; struct nl_dump *dump; + unsigned int rcvbuf; /* Receive buffer size (SO_RCVBUF). */ }; +/* Compile-time limit on iovecs, so that we can allocate a maximum-size array + * of iovecs on the stack. */ +#define MAX_IOVS 128 + +/* Maximum number of iovecs that may be passed to sendmsg, capped at a + * minimum of _XOPEN_IOV_MAX (16) and a maximum of MAX_IOVS. + * + * Initialized by nl_sock_create(). */ +static int max_iovs; + static int alloc_pid(uint32_t *); static void free_pid(uint32_t); static int nl_sock_cow__(struct nl_sock *); @@ -79,6 +92,23 @@ nl_sock_create(int protocol, struct nl_sock **sockp) struct sockaddr_nl local, remote; int retval = 0; + if (!max_iovs) { + int save_errno = errno; + errno = 0; + + max_iovs = sysconf(_SC_UIO_MAXIOV); + if (max_iovs < _XOPEN_IOV_MAX) { + if (max_iovs == -1 && errno) { + VLOG_WARN("sysconf(_SC_UIO_MAXIOV): %s", strerror(errno)); + } + max_iovs = _XOPEN_IOV_MAX; + } else if (max_iovs > MAX_IOVS) { + max_iovs = MAX_IOVS; + } + + errno = save_errno; + } + *sockp = NULL; sock = malloc(sizeof *sock); if (sock == NULL) { @@ -93,6 +123,13 @@ nl_sock_create(int protocol, struct nl_sock **sockp) sock->protocol = protocol; sock->dump = NULL; + retval = get_socket_rcvbuf(sock->fd); + if (retval < 0) { + retval = -retval; + goto error; + } + sock->rcvbuf = retval; + retval = alloc_pid(&sock->pid); if (retval) { goto error; @@ -354,6 +391,202 @@ nl_sock_recv(struct nl_sock *sock, struct ofpbuf **bufp, bool wait) return nl_sock_recv__(sock, bufp, wait); } +static int +find_nl_transaction_by_seq(struct nl_transaction **transactions, size_t n, + uint32_t seq) +{ + int i; + + for (i = 0; i < n; i++) { + struct nl_transaction *t = transactions[i]; + + if (seq == nl_msg_nlmsghdr(t->request)->nlmsg_seq) { + return i; + } + } + + return -1; +} + +static void +nl_sock_record_errors__(struct nl_transaction **transactions, size_t n, + int error) +{ + size_t i; + + for (i = 0; i < n; i++) { + transactions[i]->error = error; + transactions[i]->reply = NULL; + } +} + +static int +nl_sock_transact_multiple__(struct nl_sock *sock, + struct nl_transaction **transactions, size_t n, + size_t *done) +{ + struct iovec iovs[MAX_IOVS]; + struct msghdr msg; + int error; + int i; + + *done = 0; + for (i = 0; i < n; i++) { + struct ofpbuf *request = transactions[i]->request; + struct nlmsghdr *nlmsg = nl_msg_nlmsghdr(request); + + nlmsg->nlmsg_len = request->size; + nlmsg->nlmsg_pid = sock->pid; + if (i == n - 1) { + /* Ensure that we get a reply even if the final request doesn't + * ordinarily call for one. */ + nlmsg->nlmsg_flags |= NLM_F_ACK; + } + + iovs[i].iov_base = request->data; + iovs[i].iov_len = request->size; + } + + memset(&msg, 0, sizeof msg); + msg.msg_iov = iovs; + msg.msg_iovlen = n; + do { + error = sendmsg(sock->fd, &msg, 0) < 0 ? errno : 0; + } while (error == EINTR); + + for (i = 0; i < n; i++) { + struct ofpbuf *request = transactions[i]->request; + + log_nlmsg(__func__, error, request->data, request->size, + sock->protocol); + } + if (!error) { + COVERAGE_ADD(netlink_sent, n); + } + + if (error) { + return error; + } + + while (n > 0) { + struct ofpbuf *reply; + + error = nl_sock_recv__(sock, &reply, true); + if (error) { + return error; + } + + i = find_nl_transaction_by_seq(transactions, n, + nl_msg_nlmsghdr(reply)->nlmsg_seq); + if (i < 0) { + VLOG_DBG_RL(&rl, "ignoring unexpected seq %#"PRIx32, + nl_msg_nlmsghdr(reply)->nlmsg_seq); + ofpbuf_delete(reply); + continue; + } + + nl_sock_record_errors__(transactions, i, 0); + if (nl_msg_nlmsgerr(reply, &error)) { + transactions[i]->reply = NULL; + transactions[i]->error = error; + if (error) { + VLOG_DBG_RL(&rl, "received NAK error=%d (%s)", + error, strerror(error)); + } + ofpbuf_delete(reply); + } else { + transactions[i]->reply = reply; + transactions[i]->error = 0; + } + + *done += i + 1; + transactions += i + 1; + n -= i + 1; + } + + return 0; +} + +/* Sends the 'request' member of the 'n' transactions in 'transactions' to the + * kernel, in order, and waits for responses to all of them. Fills in the + * 'error' member of each transaction with 0 if it was successful, otherwise + * with a positive errno value. 'reply' will be NULL on error or if the + * transaction was successful but had no reply beyond an indication of success. + * For a successful transaction that did have a more detailed reply, 'reply' + * will be set to the reply message. + * + * The caller is responsible for destroying each request and reply, and the + * transactions array itself. + * + * Before sending each message, this function will finalize nlmsg_len in each + * 'request' to match the ofpbuf's size, and set nlmsg_pid to 'sock''s pid. + * NLM_F_ACK will be added to some requests' nlmsg_flags. + * + * Bare Netlink is an unreliable transport protocol. This function layers + * reliable delivery and reply semantics on top of bare Netlink. See + * nl_sock_transact() for some caveats. + */ +void +nl_sock_transact_multiple(struct nl_sock *sock, + struct nl_transaction **transactions, size_t n) +{ + int max_batch_count; + int error; + + if (!n) { + return; + } + + error = nl_sock_cow__(sock); + if (error) { + nl_sock_record_errors__(transactions, n, error); + return; + } + + /* In theory, every request could have a 64 kB reply. But the default and + * maximum socket rcvbuf size with typical Dom0 memory sizes both tend to + * be a bit below 128 kB, so that would only allow a single message in a + * "batch". So we assume that replies average (at most) 4 kB, which allows + * a good deal of batching. + * + * In practice, most of the requests that we batch either have no reply at + * all or a brief reply. */ + max_batch_count = MAX(sock->rcvbuf / 4096, 1); + max_batch_count = MIN(max_batch_count, max_iovs); + + while (n > 0) { + size_t count, bytes; + size_t done; + + /* Batch up to 'max_batch_count' transactions. But cap it at about a + * page of requests total because big skbuffs are expensive to + * allocate in the kernel. */ +#if defined(PAGESIZE) + enum { MAX_BATCH_BYTES = MAX(1, PAGESIZE - 512) }; +#else + enum { MAX_BATCH_BYTES = 4096 - 512 }; +#endif + bytes = transactions[0]->request->size; + for (count = 1; count < n && count < max_batch_count; count++) { + if (bytes + transactions[count]->request->size > MAX_BATCH_BYTES) { + break; + } + bytes += transactions[count]->request->size; + } + + error = nl_sock_transact_multiple__(sock, transactions, count, &done); + transactions += done; + n -= done; + + if (error == ENOBUFS) { + VLOG_DBG_RL(&rl, "receive buffer overflow, resending request"); + } else if (error) { + VLOG_ERR_RL(&rl, "transaction error (%s)", strerror(error)); + nl_sock_record_errors__(transactions, n, error); + } + } +} + /* Sends 'request' to the kernel via 'sock' and waits for a response. If * successful, returns 0. On failure, returns a positive errno value. * @@ -395,68 +628,21 @@ nl_sock_recv(struct nl_sock *sock, struct ofpbuf **bufp, bool wait) * needs to be idempotent. */ int -nl_sock_transact(struct nl_sock *sock, - const struct ofpbuf *request, struct ofpbuf **replyp) +nl_sock_transact(struct nl_sock *sock, const struct ofpbuf *request, + struct ofpbuf **replyp) { - uint32_t seq = nl_msg_nlmsghdr(request)->nlmsg_seq; - struct nlmsghdr *nlmsghdr; - struct ofpbuf *reply; - int retval; + struct nl_transaction *transactionp; + struct nl_transaction transaction; + transaction.request = (struct ofpbuf *) request; + transactionp = &transaction; + nl_sock_transact_multiple(sock, &transactionp, 1); if (replyp) { - *replyp = NULL; - } - - /* Ensure that we get a reply even if this message doesn't ordinarily call - * for one. */ - nl_msg_nlmsghdr(request)->nlmsg_flags |= NLM_F_ACK; - -send: - retval = nl_sock_send(sock, request, true); - if (retval) { - return retval; - } - -recv: - retval = nl_sock_recv(sock, &reply, true); - if (retval) { - if (retval == ENOBUFS) { - COVERAGE_INC(netlink_overflow); - VLOG_DBG_RL(&rl, "receive buffer overflow, resending request"); - goto send; - } else { - return retval; - } - } - nlmsghdr = nl_msg_nlmsghdr(reply); - if (seq != nlmsghdr->nlmsg_seq) { - VLOG_DBG_RL(&rl, "ignoring seq %#"PRIx32" != expected %#"PRIx32, - nl_msg_nlmsghdr(reply)->nlmsg_seq, seq); - ofpbuf_delete(reply); - goto recv; - } - - /* If the reply is an error, discard the reply and return the error code. - * - * Except: if the reply is just an acknowledgement (error code of 0), and - * the caller is interested in the reply (replyp != NULL), pass the reply - * up to the caller. Otherwise the caller will get a return value of 0 - * and null '*replyp', which makes unwary callers likely to segfault. */ - if (nl_msg_nlmsgerr(reply, &retval) && (retval || !replyp)) { - ofpbuf_delete(reply); - if (retval) { - VLOG_DBG_RL(&rl, "received NAK error=%d (%s)", - retval, strerror(retval)); - } - return retval != EAGAIN ? retval : EPROTO; - } - - if (replyp) { - *replyp = reply; + *replyp = transaction.reply; } else { - ofpbuf_delete(reply); + ofpbuf_delete(transaction.reply); } - return 0; + return transaction.error; } /* Drain all the messages currently in 'sock''s receive queue. */ diff --git a/lib/netlink-socket.h b/lib/netlink-socket.h index d789f412..7e01acbf 100644 --- a/lib/netlink-socket.h +++ b/lib/netlink-socket.h @@ -35,6 +35,7 @@ #include #include #include +#include "list.h" struct ofpbuf; struct nl_sock; @@ -63,6 +64,19 @@ short int nl_sock_woke(const struct nl_sock *); uint32_t nl_sock_pid(const struct nl_sock *); +/* Batching transactions. */ +struct nl_transaction { + /* Filled in by client. */ + struct ofpbuf *request; /* Request to send. */ + + /* Filled in by nl_sock_transact_batch(). */ + struct ofpbuf *reply; /* Reply (NULL if reply was an error code). */ + int error; /* Positive errno value, 0 if no error. */ +}; + +void nl_sock_transact_multiple(struct nl_sock *, + struct nl_transaction **, size_t n); + /* Table dumping. */ struct nl_dump { struct nl_sock *sock; /* Socket being dumped. */ -- 2.30.2