static bool is_admissible(struct ofproto_dpif *, const struct flow *,
bool have_packet, tag_type *, int *vlanp,
struct ofbundle **in_bundlep);
+
+/* Upcalls. */
+#define FLOW_MISS_MAX_BATCH 50
static void handle_upcall(struct ofproto_dpif *, struct dpif_upcall *);
+static void handle_miss_upcalls(struct ofproto_dpif *,
+ struct dpif_upcall *, size_t n);
/* Flow expiration. */
static int expire(struct ofproto_dpif *);
run(struct ofproto *ofproto_)
{
struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+ struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
struct ofport_dpif *ofport;
struct ofbundle *bundle;
+ size_t n_misses;
int i;
if (!clogged) {
}
dpif_run(ofproto->dpif);
- for (i = 0; i < 50; i++) {
- struct dpif_upcall packet;
+ n_misses = 0;
+ for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
+ struct dpif_upcall *upcall = &misses[n_misses];
int error;
- error = dpif_recv(ofproto->dpif, &packet);
+ error = dpif_recv(ofproto->dpif, upcall);
if (error) {
- if (error == ENODEV) {
- /* Datapath destroyed. */
+ if (error == ENODEV && n_misses == 0) {
return error;
}
break;
}
- handle_upcall(ofproto, &packet);
+ if (upcall->type == DPIF_UC_MISS) {
+ /* Handle it later. */
+ n_misses++;
+ } else {
+ handle_upcall(ofproto, upcall);
+ }
}
+ handle_miss_upcalls(ofproto, misses, n_misses);
+
if (timer_expired(&ofproto->next_expiration)) {
int delay = expire(ofproto);
timer_set_duration(&ofproto->next_expiration, delay);
\f
/* Upcall handling. */
+/* Flow miss batching.
+ *
+ * Some dpifs implement operations faster when you hand them off in a batch.
+ * To allow batching, "struct flow_miss" queues the dpif-related work needed
+ * for a given flow. Each "struct flow_miss" corresponds to sending one or
+ * more packets, plus possibly installing the flow in the dpif.
+ *
+ * So far we only batch the operations that affect flow setup time the most.
+ * It's possible to batch more than that, but the benefit might be minimal. */
+struct flow_miss {
+ struct hmap_node hmap_node;
+ struct flow flow;
+ const struct nlattr *key;
+ size_t key_len;
+ struct list packets;
+};
+
+struct flow_miss_op {
+ union dpif_op dpif_op;
+ struct facet *facet;
+};
+
/* Sends an OFPT_PACKET_IN message for 'packet' of type OFPR_NO_MATCH to each
* OpenFlow controller as necessary according to their individual
* configurations.
return false;
}
-static void
-handle_miss_upcall(struct ofproto_dpif *ofproto, struct dpif_upcall *upcall)
+static struct flow_miss *
+flow_miss_create(struct hmap *todo, const struct flow *flow,
+ const struct nlattr *key, size_t key_len)
{
- struct facet *facet;
- struct flow flow;
+ uint32_t hash = flow_hash(flow, 0);
+ struct flow_miss *miss;
- /* Obtain in_port and tun_id, at least. */
- odp_flow_key_to_flow(upcall->key, upcall->key_len, &flow);
+ HMAP_FOR_EACH_WITH_HASH (miss, hmap_node, hash, todo) {
+ if (flow_equal(&miss->flow, flow)) {
+ return miss;
+ }
+ }
- /* Set header pointers in 'flow'. */
- flow_extract(upcall->packet, flow.tun_id, flow.in_port, &flow);
+ miss = xmalloc(sizeof *miss);
+ hmap_insert(todo, &miss->hmap_node, hash);
+ miss->flow = *flow;
+ miss->key = key;
+ miss->key_len = key_len;
+ list_init(&miss->packets);
+ return miss;
+}
- /* Handle 802.1ag and LACP. */
- if (process_special(ofproto, &flow, upcall->packet)) {
- ofpbuf_delete(upcall->packet);
- ofproto->n_matches++;
- return;
- }
+static void
+handle_flow_miss(struct ofproto_dpif *ofproto, struct flow_miss *miss,
+ struct flow_miss_op *ops, size_t *n_ops)
+{
+ const struct flow *flow = &miss->flow;
+ struct ofpbuf *packet, *next_packet;
+ struct facet *facet;
- facet = facet_lookup_valid(ofproto, &flow);
+ facet = facet_lookup_valid(ofproto, flow);
if (!facet) {
- struct rule_dpif *rule = rule_dpif_lookup(ofproto, &flow, 0);
+ struct rule_dpif *rule;
+
+ rule = rule_dpif_lookup(ofproto, flow, 0);
if (!rule) {
/* Don't send a packet-in if OFPPC_NO_PACKET_IN asserted. */
- struct ofport_dpif *port = get_ofp_port(ofproto, flow.in_port);
+ struct ofport_dpif *port = get_ofp_port(ofproto, flow->in_port);
if (port) {
if (port->up.opp.config & htonl(OFPPC_NO_PACKET_IN)) {
COVERAGE_INC(ofproto_dpif_no_packet_in);
/* XXX install 'drop' flow entry */
- ofpbuf_delete(upcall->packet);
return;
}
} else {
VLOG_WARN_RL(&rl, "packet-in on unknown port %"PRIu16,
- flow.in_port);
+ flow->in_port);
+ }
+
+ LIST_FOR_EACH_SAFE (packet, next_packet, list_node,
+ &miss->packets) {
+ list_remove(&packet->list_node);
+ send_packet_in_miss(ofproto, packet, flow, false);
}
- send_packet_in_miss(ofproto, upcall->packet, &flow, false);
return;
}
- facet = facet_create(rule, &flow);
- facet_make_actions(ofproto, facet, upcall->packet);
- } else if (!facet->may_install) {
- /* The facet is not installable, that is, we need to process every
- * packet, so process the current packet's actions into 'facet'. */
- facet_make_actions(ofproto, facet, upcall->packet);
+ facet = facet_create(rule, flow);
}
- if (facet->rule->up.cr.priority == FAIL_OPEN_PRIORITY) {
- /*
- * Extra-special case for fail-open mode.
- *
- * We are in fail-open mode and the packet matched the fail-open rule,
- * but we are connected to a controller too. We should send the packet
- * up to the controller in the hope that it will try to set up a flow
- * and thereby allow us to exit fail-open.
- *
- * See the top-level comment in fail-open.c for more information.
- */
- send_packet_in_miss(ofproto, upcall->packet, &flow, true);
+ LIST_FOR_EACH_SAFE (packet, next_packet, list_node, &miss->packets) {
+ list_remove(&packet->list_node);
+ ofproto->n_matches++;
+
+ if (facet->rule->up.cr.priority == FAIL_OPEN_PRIORITY) {
+ /*
+ * Extra-special case for fail-open mode.
+ *
+ * We are in fail-open mode and the packet matched the fail-open
+ * rule, but we are connected to a controller too. We should send
+ * the packet up to the controller in the hope that it will try to
+ * set up a flow and thereby allow us to exit fail-open.
+ *
+ * See the top-level comment in fail-open.c for more information.
+ */
+ send_packet_in_miss(ofproto, packet, flow, true);
+ }
+
+ if (!facet->may_install) {
+ facet_make_actions(ofproto, facet, packet);
+ }
+ if (!execute_controller_action(ofproto, &facet->flow,
+ facet->actions, facet->actions_len,
+ packet)) {
+ struct flow_miss_op *op = &ops[(*n_ops)++];
+ struct dpif_execute *execute = &op->dpif_op.execute;
+
+ op->facet = facet;
+ execute->type = DPIF_OP_EXECUTE;
+ execute->key = miss->key;
+ execute->key_len = miss->key_len;
+ execute->actions
+ = (facet->may_install
+ ? facet->actions
+ : xmemdup(facet->actions, facet->actions_len));
+ execute->actions_len = facet->actions_len;
+ execute->packet = packet;
+ }
}
- facet_execute(ofproto, facet, upcall->packet);
- facet_install(ofproto, facet, false);
- ofproto->n_matches++;
+ if (facet->may_install) {
+ struct flow_miss_op *op = &ops[(*n_ops)++];
+ struct dpif_flow_put *put = &op->dpif_op.flow_put;
+
+ op->facet = facet;
+ put->type = DPIF_OP_FLOW_PUT;
+ put->flags = DPIF_FP_CREATE | DPIF_FP_MODIFY;
+ put->key = miss->key;
+ put->key_len = miss->key_len;
+ put->actions = facet->actions;
+ put->actions_len = facet->actions_len;
+ put->stats = NULL;
+ }
+}
+
+static void
+handle_miss_upcalls(struct ofproto_dpif *ofproto, struct dpif_upcall *upcalls,
+ size_t n_upcalls)
+{
+ struct dpif_upcall *upcall;
+ struct flow_miss *miss, *next_miss;
+ struct flow_miss_op flow_miss_ops[FLOW_MISS_MAX_BATCH * 2];
+ union dpif_op *dpif_ops[FLOW_MISS_MAX_BATCH * 2];
+ struct hmap todo;
+ size_t n_ops;
+ size_t i;
+
+ if (!n_upcalls) {
+ return;
+ }
+
+ /* Construct the to-do list.
+ *
+ * This just amounts to extracting the flow from each packet and sticking
+ * the packets that have the same flow in the same "flow_miss" structure so
+ * that we can process them together. */
+ hmap_init(&todo);
+ for (upcall = upcalls; upcall < &upcalls[n_upcalls]; upcall++) {
+ struct flow_miss *miss;
+ struct flow flow;
+
+ /* Obtain in_port and tun_id, at least, then set 'flow''s header
+ * pointers. */
+ odp_flow_key_to_flow(upcall->key, upcall->key_len, &flow);
+ flow_extract(upcall->packet, flow.tun_id, flow.in_port, &flow);
+
+ /* Handle 802.1ag and LACP specially. */
+ if (process_special(ofproto, &flow, upcall->packet)) {
+ ofpbuf_delete(upcall->packet);
+ ofproto->n_matches++;
+ continue;
+ }
+
+ /* Add other packets to a to-do list. */
+ miss = flow_miss_create(&todo, &flow, upcall->key, upcall->key_len);
+ list_push_back(&miss->packets, &upcall->packet->list_node);
+ }
+
+ /* Process each element in the to-do list, constructing the set of
+ * operations to batch. */
+ n_ops = 0;
+ HMAP_FOR_EACH_SAFE (miss, next_miss, hmap_node, &todo) {
+ handle_flow_miss(ofproto, miss, flow_miss_ops, &n_ops);
+ ofpbuf_list_delete(&miss->packets);
+ hmap_remove(&todo, &miss->hmap_node);
+ free(miss);
+ }
+ assert(n_ops <= ARRAY_SIZE(flow_miss_ops));
+ hmap_destroy(&todo);
+
+ /* Execute batch. */
+ for (i = 0; i < n_ops; i++) {
+ dpif_ops[i] = &flow_miss_ops[i].dpif_op;
+ }
+ dpif_operate(ofproto->dpif, dpif_ops, n_ops);
+
+ /* Free memory and update facets. */
+ for (i = 0; i < n_ops; i++) {
+ struct flow_miss_op *op = &flow_miss_ops[i];
+ struct dpif_execute *execute;
+ struct dpif_flow_put *put;
+
+ switch (op->dpif_op.type) {
+ case DPIF_OP_EXECUTE:
+ execute = &op->dpif_op.execute;
+ if (op->facet->actions != execute->actions) {
+ free((struct nlattr *) execute->actions);
+ }
+ ofpbuf_delete((struct ofpbuf *) execute->packet);
+ break;
+
+ case DPIF_OP_FLOW_PUT:
+ put = &op->dpif_op.flow_put;
+ if (!put->error) {
+ op->facet->installed = true;
+ }
+ break;
+ }
+ }
}
static void
break;
case DPIF_UC_MISS:
- handle_miss_upcall(ofproto, upcall);
- break;
+ /* The caller handles these. */
+ NOT_REACHED();
case DPIF_N_UC_TYPES:
default: