From: Ben Pfaff Date: Fri, 6 Apr 2012 23:23:28 +0000 (-0700) Subject: dpif: Make caller of dpif_recv() provide buffer space. X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=90a7c55e56bca82a0b7a05ed068d054b5c8a7584;p=openvswitch dpif: Make caller of dpif_recv() provide buffer space. This improves performance under heavy flow setup loads. Signed-off-by: Ben Pfaff --- diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c index a2908cfe..932b36f0 100644 --- a/lib/dpif-linux.c +++ b/lib/dpif-linux.c @@ -1089,7 +1089,8 @@ parse_odp_packet(struct ofpbuf *buf, struct dpif_upcall *upcall, } static int -dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall) +dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall, + struct ofpbuf *buf) { struct dpif_linux *dpif = dpif_linux_cast(dpif_); int read_tries = 0; @@ -1123,7 +1124,6 @@ dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall) dpif->ready_mask &= ~(1u << indx); for (;;) { - struct ofpbuf *buf; int dp_ifindex; int error; @@ -1131,10 +1131,8 @@ dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall) return EAGAIN; } - buf = ofpbuf_new(2048); error = nl_sock_recv(upcall_sock, buf, false); if (error) { - ofpbuf_delete(buf); if (error == EAGAIN) { break; } @@ -1145,8 +1143,6 @@ dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall) if (!error && dp_ifindex == dpif->dp_ifindex) { return 0; } - - ofpbuf_delete(buf); if (error) { return error; } diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index a9077227..a33fe23d 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -935,7 +935,8 @@ find_nonempty_queue(struct dpif *dpif) } static int -dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall) +dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall, + struct ofpbuf *buf) { struct dp_netdev_queue *q = find_nonempty_queue(dpif); if (q) { @@ -943,6 +944,9 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall) *upcall = *u; free(u); + ofpbuf_uninit(buf); + *buf = *u->packet; + return 0; } else { return EAGAIN; diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h index 75c65bba..6338f50a 100644 --- a/lib/dpif-provider.h +++ b/lib/dpif-provider.h @@ -307,21 +307,19 @@ struct dpif_class { uint32_t *priority); /* Polls for an upcall from 'dpif'. If successful, stores the upcall into - * '*upcall'. Should only be called if 'recv_set' has been used to enable - * receiving packets from 'dpif'. + * '*upcall', using 'buf' for storage. Should only be called if 'recv_set' + * has been used to enable receiving packets from 'dpif'. * - * The caller takes ownership of the data that 'upcall' points to. - * 'upcall->key' and 'upcall->actions' (if nonnull) point into data owned - * by 'upcall->packet', so their memory cannot be freed separately. (This - * is hardly a great way to do things but it works out OK for the dpif - * providers that exist so far.) - * - * For greatest efficiency, 'upcall->packet' should have at least - * offsetof(struct ofp_packet_in, data) bytes of headroom. + * The implementation should point 'upcall->packet' and 'upcall->key' into + * data in the caller-provided 'buf'. If necessary to make room, the + * implementation may expand the data in 'buf'. (This is hardly a great + * way to do things but it works out OK for the dpif providers that exist + * so far.) * * This function must not block. If no upcall is pending when it is * called, it should return EAGAIN without blocking. */ - int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall); + int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall, + struct ofpbuf *buf); /* Arranges for the poll loop to wake up when 'dpif' has a message queued * to be received with the recv member function. */ diff --git a/lib/dpif.c b/lib/dpif.c index e000e13d..a1bd59b0 100644 --- a/lib/dpif.c +++ b/lib/dpif.c @@ -1069,21 +1069,20 @@ dpif_recv_set(struct dpif *dpif, bool enable) } /* Polls for an upcall from 'dpif'. If successful, stores the upcall into - * '*upcall'. Should only be called if dpif_recv_set() has been used to enable - * receiving packets on 'dpif'. + * '*upcall', using 'buf' for storage. Should only be called if + * dpif_recv_set() has been used to enable receiving packets on 'dpif'. * - * The caller takes ownership of the data that 'upcall' points to. - * 'upcall->key' and 'upcall->actions' (if nonnull) point into data owned by - * 'upcall->packet', so their memory cannot be freed separately. (This is + * 'upcall->packet' and 'upcall->key' point into data in the caller-provided + * 'buf', so their memory cannot be freed separately from 'buf'. (This is * hardly a great way to do things but it works out OK for the dpif providers * and clients that exist so far.) * * Returns 0 if successful, otherwise a positive errno value. Returns EAGAIN * if no upcall is immediately available. */ int -dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall) +dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf *buf) { - int error = dpif->dpif_class->recv(dpif, upcall); + int error = dpif->dpif_class->recv(dpif, upcall, buf); if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) { struct ds flow; char *packet; diff --git a/lib/dpif.h b/lib/dpif.h index 768934b1..bdd4fee6 100644 --- a/lib/dpif.h +++ b/lib/dpif.h @@ -251,7 +251,7 @@ struct dpif_upcall { }; int dpif_recv_set(struct dpif *, bool enable); -int dpif_recv(struct dpif *, struct dpif_upcall *); +int dpif_recv(struct dpif *, struct dpif_upcall *, struct ofpbuf *); void dpif_recv_purge(struct dpif *); void dpif_recv_wait(struct dpif *); diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c index 978b5ed5..62634315 100644 --- a/ofproto/ofproto-dpif.c +++ b/ofproto/ofproto-dpif.c @@ -2783,7 +2783,6 @@ handle_miss_upcalls(struct ofproto_dpif *ofproto, struct dpif_upcall *upcalls, &flow, &initial_tci, upcall->packet); if (fitness == ODP_FIT_ERROR) { - ofpbuf_delete(upcall->packet); continue; } flow_extract(upcall->packet, flow.skb_priority, flow.tun_id, @@ -2793,7 +2792,6 @@ handle_miss_upcalls(struct ofproto_dpif *ofproto, struct dpif_upcall *upcalls, if (process_special(ofproto, &flow, upcall->packet)) { ofproto_update_local_port_stats(&ofproto->up, 0, upcall->packet->size); - ofpbuf_delete(upcall->packet); ofproto->n_matches++; continue; } @@ -2842,7 +2840,6 @@ handle_miss_upcalls(struct ofproto_dpif *ofproto, struct dpif_upcall *upcalls, } } HMAP_FOR_EACH_SAFE (miss, next_miss, hmap_node, &todo) { - ofpbuf_list_delete(&miss->packets); hmap_remove(&todo, &miss->hmap_node); free(miss); } @@ -2864,7 +2861,6 @@ handle_userspace_upcall(struct ofproto_dpif *ofproto, upcall->key_len, &flow, &initial_tci, upcall->packet); if (fitness == ODP_FIT_ERROR) { - ofpbuf_delete(upcall->packet); return; } @@ -2876,31 +2872,39 @@ handle_userspace_upcall(struct ofproto_dpif *ofproto, } else { VLOG_WARN_RL(&rl, "invalid user cookie : 0x%"PRIx64, upcall->userdata); } - ofpbuf_delete(upcall->packet); } static int handle_upcalls(struct ofproto_dpif *ofproto, unsigned int max_batch) { struct dpif_upcall misses[FLOW_MISS_MAX_BATCH]; + struct ofpbuf miss_bufs[FLOW_MISS_MAX_BATCH]; + uint64_t miss_buf_stubs[FLOW_MISS_MAX_BATCH][4096 / 8]; + int n_processed; int n_misses; int i; - assert (max_batch <= FLOW_MISS_MAX_BATCH); + assert(max_batch <= FLOW_MISS_MAX_BATCH); + n_processed = 0; n_misses = 0; - for (i = 0; i < max_batch; i++) { + for (n_processed = 0; n_processed < max_batch; n_processed++) { struct dpif_upcall *upcall = &misses[n_misses]; + struct ofpbuf *buf = &miss_bufs[n_misses]; int error; - error = dpif_recv(ofproto->dpif, upcall); + ofpbuf_use_stub(buf, miss_buf_stubs[n_misses], + sizeof miss_buf_stubs[n_misses]); + error = dpif_recv(ofproto->dpif, upcall, buf); if (error) { + ofpbuf_uninit(buf); break; } switch (upcall->type) { case DPIF_UC_ACTION: handle_userspace_upcall(ofproto, upcall); + ofpbuf_uninit(buf); break; case DPIF_UC_MISS: @@ -2917,8 +2921,11 @@ handle_upcalls(struct ofproto_dpif *ofproto, unsigned int max_batch) } handle_miss_upcalls(ofproto, misses, n_misses); + for (i = 0; i < n_misses; i++) { + ofpbuf_uninit(&miss_bufs[i]); + } - return i; + return n_processed; } /* Flow expiration. */