ofproto-dpif: Batch interacting with the dpif on flow miss operations.
authorBen Pfaff <blp@nicira.com>
Fri, 14 Oct 2011 20:55:32 +0000 (13:55 -0700)
committerBen Pfaff <blp@nicira.com>
Fri, 14 Oct 2011 21:08:45 +0000 (14:08 -0700)
This improves "ovs-benchmark rate" performance in my testing by about 24%.

A quick experiment shows that there may still be some headroom for batching
flow deletions on facet expiration, up to perhaps 10% additional
improvement.

ofproto/ofproto-dpif.c

index d2cc36f91816e72d35879dfec027e8d03b6c066f..ae76e98516e8e3cdef65d8c3650efaee7ee7a5a3 100644 (file)
@@ -400,7 +400,12 @@ static void update_learning_table(struct ofproto_dpif *,
 static bool is_admissible(struct ofproto_dpif *, const struct flow *,
                           bool have_packet, tag_type *, int *vlanp,
                           struct ofbundle **in_bundlep);
+
+/* Upcalls. */
+#define FLOW_MISS_MAX_BATCH 50
 static void handle_upcall(struct ofproto_dpif *, struct dpif_upcall *);
+static void handle_miss_upcalls(struct ofproto_dpif *,
+                                struct dpif_upcall *, size_t n);
 
 /* Flow expiration. */
 static int expire(struct ofproto_dpif *);
@@ -569,8 +574,10 @@ static int
 run(struct ofproto *ofproto_)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+    struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
     struct ofport_dpif *ofport;
     struct ofbundle *bundle;
+    size_t n_misses;
     int i;
 
     if (!clogged) {
@@ -578,22 +585,29 @@ run(struct ofproto *ofproto_)
     }
     dpif_run(ofproto->dpif);
 
-    for (i = 0; i < 50; i++) {
-        struct dpif_upcall packet;
+    n_misses = 0;
+    for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
+        struct dpif_upcall *upcall = &misses[n_misses];
         int error;
 
-        error = dpif_recv(ofproto->dpif, &packet);
+        error = dpif_recv(ofproto->dpif, upcall);
         if (error) {
-            if (error == ENODEV) {
-                /* Datapath destroyed. */
+            if (error == ENODEV && n_misses == 0) {
                 return error;
             }
             break;
         }
 
-        handle_upcall(ofproto, &packet);
+        if (upcall->type == DPIF_UC_MISS) {
+            /* Handle it later. */
+            n_misses++;
+        } else {
+            handle_upcall(ofproto, upcall);
+        }
     }
 
+    handle_miss_upcalls(ofproto, misses, n_misses);
+
     if (timer_expired(&ofproto->next_expiration)) {
         int delay = expire(ofproto);
         timer_set_duration(&ofproto->next_expiration, delay);
@@ -1719,6 +1733,28 @@ port_is_lacp_current(const struct ofport *ofport_)
 \f
 /* Upcall handling. */
 
+/* Flow miss batching.
+ *
+ * Some dpifs implement operations faster when you hand them off in a batch.
+ * To allow batching, "struct flow_miss" queues the dpif-related work needed
+ * for a given flow.  Each "struct flow_miss" corresponds to sending one or
+ * more packets, plus possibly installing the flow in the dpif.
+ *
+ * So far we only batch the operations that affect flow setup time the most.
+ * It's possible to batch more than that, but the benefit might be minimal. */
+struct flow_miss {
+    struct hmap_node hmap_node;
+    struct flow flow;
+    const struct nlattr *key;
+    size_t key_len;
+    struct list packets;
+};
+
+struct flow_miss_op {
+    union dpif_op dpif_op;
+    struct facet *facet;
+};
+
 /* Sends an OFPT_PACKET_IN message for 'packet' of type OFPR_NO_MATCH to each
  * OpenFlow controller as necessary according to their individual
  * configurations.
@@ -1792,72 +1828,206 @@ process_special(struct ofproto_dpif *ofproto, const struct flow *flow,
     return false;
 }
 
-static void
-handle_miss_upcall(struct ofproto_dpif *ofproto, struct dpif_upcall *upcall)
+static struct flow_miss *
+flow_miss_create(struct hmap *todo, const struct flow *flow,
+                 const struct nlattr *key, size_t key_len)
 {
-    struct facet *facet;
-    struct flow flow;
+    uint32_t hash = flow_hash(flow, 0);
+    struct flow_miss *miss;
 
-    /* Obtain in_port and tun_id, at least. */
-    odp_flow_key_to_flow(upcall->key, upcall->key_len, &flow);
+    HMAP_FOR_EACH_WITH_HASH (miss, hmap_node, hash, todo) {
+        if (flow_equal(&miss->flow, flow)) {
+            return miss;
+        }
+    }
 
-    /* Set header pointers in 'flow'. */
-    flow_extract(upcall->packet, flow.tun_id, flow.in_port, &flow);
+    miss = xmalloc(sizeof *miss);
+    hmap_insert(todo, &miss->hmap_node, hash);
+    miss->flow = *flow;
+    miss->key = key;
+    miss->key_len = key_len;
+    list_init(&miss->packets);
+    return miss;
+}
 
-    /* Handle 802.1ag and LACP. */
-    if (process_special(ofproto, &flow, upcall->packet)) {
-        ofpbuf_delete(upcall->packet);
-        ofproto->n_matches++;
-        return;
-    }
+static void
+handle_flow_miss(struct ofproto_dpif *ofproto, struct flow_miss *miss,
+                 struct flow_miss_op *ops, size_t *n_ops)
+{
+    const struct flow *flow = &miss->flow;
+    struct ofpbuf *packet, *next_packet;
+    struct facet *facet;
 
-    facet = facet_lookup_valid(ofproto, &flow);
+    facet = facet_lookup_valid(ofproto, flow);
     if (!facet) {
-        struct rule_dpif *rule = rule_dpif_lookup(ofproto, &flow, 0);
+        struct rule_dpif *rule;
+
+        rule = rule_dpif_lookup(ofproto, flow, 0);
         if (!rule) {
             /* Don't send a packet-in if OFPPC_NO_PACKET_IN asserted. */
-            struct ofport_dpif *port = get_ofp_port(ofproto, flow.in_port);
+            struct ofport_dpif *port = get_ofp_port(ofproto, flow->in_port);
             if (port) {
                 if (port->up.opp.config & htonl(OFPPC_NO_PACKET_IN)) {
                     COVERAGE_INC(ofproto_dpif_no_packet_in);
                     /* XXX install 'drop' flow entry */
-                    ofpbuf_delete(upcall->packet);
                     return;
                 }
             } else {
                 VLOG_WARN_RL(&rl, "packet-in on unknown port %"PRIu16,
-                             flow.in_port);
+                             flow->in_port);
+            }
+
+            LIST_FOR_EACH_SAFE (packet, next_packet, list_node,
+                                &miss->packets) {
+                list_remove(&packet->list_node);
+                send_packet_in_miss(ofproto, packet, flow, false);
             }
 
-            send_packet_in_miss(ofproto, upcall->packet, &flow, false);
             return;
         }
 
-        facet = facet_create(rule, &flow);
-        facet_make_actions(ofproto, facet, upcall->packet);
-    } else if (!facet->may_install) {
-        /* The facet is not installable, that is, we need to process every
-         * packet, so process the current packet's actions into 'facet'. */
-        facet_make_actions(ofproto, facet, upcall->packet);
+        facet = facet_create(rule, flow);
     }
 
-    if (facet->rule->up.cr.priority == FAIL_OPEN_PRIORITY) {
-        /*
-         * Extra-special case for fail-open mode.
-         *
-         * We are in fail-open mode and the packet matched the fail-open rule,
-         * but we are connected to a controller too.  We should send the packet
-         * up to the controller in the hope that it will try to set up a flow
-         * and thereby allow us to exit fail-open.
-         *
-         * See the top-level comment in fail-open.c for more information.
-         */
-        send_packet_in_miss(ofproto, upcall->packet, &flow, true);
+    LIST_FOR_EACH_SAFE (packet, next_packet, list_node, &miss->packets) {
+        list_remove(&packet->list_node);
+        ofproto->n_matches++;
+
+        if (facet->rule->up.cr.priority == FAIL_OPEN_PRIORITY) {
+            /*
+             * Extra-special case for fail-open mode.
+             *
+             * We are in fail-open mode and the packet matched the fail-open
+             * rule, but we are connected to a controller too.  We should send
+             * the packet up to the controller in the hope that it will try to
+             * set up a flow and thereby allow us to exit fail-open.
+             *
+             * See the top-level comment in fail-open.c for more information.
+             */
+            send_packet_in_miss(ofproto, packet, flow, true);
+        }
+
+        if (!facet->may_install) {
+            facet_make_actions(ofproto, facet, packet);
+        }
+        if (!execute_controller_action(ofproto, &facet->flow,
+                                       facet->actions, facet->actions_len,
+                                       packet)) {
+            struct flow_miss_op *op = &ops[(*n_ops)++];
+            struct dpif_execute *execute = &op->dpif_op.execute;
+
+            op->facet = facet;
+            execute->type = DPIF_OP_EXECUTE;
+            execute->key = miss->key;
+            execute->key_len = miss->key_len;
+            execute->actions
+                = (facet->may_install
+                   ? facet->actions
+                   : xmemdup(facet->actions, facet->actions_len));
+            execute->actions_len = facet->actions_len;
+            execute->packet = packet;
+        }
     }
 
-    facet_execute(ofproto, facet, upcall->packet);
-    facet_install(ofproto, facet, false);
-    ofproto->n_matches++;
+    if (facet->may_install) {
+        struct flow_miss_op *op = &ops[(*n_ops)++];
+        struct dpif_flow_put *put = &op->dpif_op.flow_put;
+
+        op->facet = facet;
+        put->type = DPIF_OP_FLOW_PUT;
+        put->flags = DPIF_FP_CREATE | DPIF_FP_MODIFY;
+        put->key = miss->key;
+        put->key_len = miss->key_len;
+        put->actions = facet->actions;
+        put->actions_len = facet->actions_len;
+        put->stats = NULL;
+    }
+}
+
+static void
+handle_miss_upcalls(struct ofproto_dpif *ofproto, struct dpif_upcall *upcalls,
+                    size_t n_upcalls)
+{
+    struct dpif_upcall *upcall;
+    struct flow_miss *miss, *next_miss;
+    struct flow_miss_op flow_miss_ops[FLOW_MISS_MAX_BATCH * 2];
+    union dpif_op *dpif_ops[FLOW_MISS_MAX_BATCH * 2];
+    struct hmap todo;
+    size_t n_ops;
+    size_t i;
+
+    if (!n_upcalls) {
+        return;
+    }
+
+    /* Construct the to-do list.
+     *
+     * This just amounts to extracting the flow from each packet and sticking
+     * the packets that have the same flow in the same "flow_miss" structure so
+     * that we can process them together. */
+    hmap_init(&todo);
+    for (upcall = upcalls; upcall < &upcalls[n_upcalls]; upcall++) {
+        struct flow_miss *miss;
+        struct flow flow;
+
+        /* Obtain in_port and tun_id, at least, then set 'flow''s header
+         * pointers. */
+        odp_flow_key_to_flow(upcall->key, upcall->key_len, &flow);
+        flow_extract(upcall->packet, flow.tun_id, flow.in_port, &flow);
+
+        /* Handle 802.1ag and LACP specially. */
+        if (process_special(ofproto, &flow, upcall->packet)) {
+            ofpbuf_delete(upcall->packet);
+            ofproto->n_matches++;
+            continue;
+        }
+
+        /* Add other packets to a to-do list. */
+        miss = flow_miss_create(&todo, &flow, upcall->key, upcall->key_len);
+        list_push_back(&miss->packets, &upcall->packet->list_node);
+    }
+
+    /* Process each element in the to-do list, constructing the set of
+     * operations to batch. */
+    n_ops = 0;
+    HMAP_FOR_EACH_SAFE (miss, next_miss, hmap_node, &todo) {
+        handle_flow_miss(ofproto, miss, flow_miss_ops, &n_ops);
+        ofpbuf_list_delete(&miss->packets);
+        hmap_remove(&todo, &miss->hmap_node);
+        free(miss);
+    }
+    assert(n_ops <= ARRAY_SIZE(flow_miss_ops));
+    hmap_destroy(&todo);
+
+    /* Execute batch. */
+    for (i = 0; i < n_ops; i++) {
+        dpif_ops[i] = &flow_miss_ops[i].dpif_op;
+    }
+    dpif_operate(ofproto->dpif, dpif_ops, n_ops);
+
+    /* Free memory and update facets. */
+    for (i = 0; i < n_ops; i++) {
+        struct flow_miss_op *op = &flow_miss_ops[i];
+        struct dpif_execute *execute;
+        struct dpif_flow_put *put;
+
+        switch (op->dpif_op.type) {
+        case DPIF_OP_EXECUTE:
+            execute = &op->dpif_op.execute;
+            if (op->facet->actions != execute->actions) {
+                free((struct nlattr *) execute->actions);
+            }
+            ofpbuf_delete((struct ofpbuf *) execute->packet);
+            break;
+
+        case DPIF_OP_FLOW_PUT:
+            put = &op->dpif_op.flow_put;
+            if (!put->error) {
+                op->facet->installed = true;
+            }
+            break;
+        }
+    }
 }
 
 static void
@@ -1895,8 +2065,8 @@ handle_upcall(struct ofproto_dpif *ofproto, struct dpif_upcall *upcall)
         break;
 
     case DPIF_UC_MISS:
-        handle_miss_upcall(ofproto, upcall);
-        break;
+        /* The caller handles these. */
+        NOT_REACHED();
 
     case DPIF_N_UC_TYPES:
     default: