struct ofbundle *);
/* Upcalls. */
#define FLOW_MISS_MAX_BATCH 50
-
-static void handle_upcall(struct ofproto_dpif *, struct dpif_upcall *);
-static void handle_miss_upcalls(struct ofproto_dpif *,
- struct dpif_upcall *, size_t n);
+static int handle_upcalls(struct ofproto_dpif *, unsigned int max_batch);
/* Flow expiration. */
static int expire(struct ofproto_dpif *);
run(struct ofproto *ofproto_)
{
struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
- struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
struct ofport_dpif *ofport;
struct ofbundle *bundle;
- size_t n_misses;
- int i;
+ unsigned int work;
if (!clogged) {
complete_operations(ofproto);
}
dpif_run(ofproto->dpif);
- n_misses = 0;
- for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
- struct dpif_upcall *upcall = &misses[n_misses];
- int error;
-
- error = dpif_recv(ofproto->dpif, upcall);
- if (error) {
- if (error == ENODEV && n_misses == 0) {
- return error;
- }
+ /* Handle one or more batches of upcalls, until there's nothing left to do
+ * or until we do a fixed total amount of work.
+ *
+ * We do work in batches because it can be much cheaper to set up a number
+ * of flows and fire off their patches all at once. We do multiple batches
+ * because in some cases handling a packet can cause another packet to be
+ * queued almost immediately as part of the return flow. Both
+ * optimizations can make major improvements on some benchmarks and
+ * presumably for real traffic as well. */
+ work = 0;
+ while (work < FLOW_MISS_MAX_BATCH) {
+ int retval = handle_upcalls(ofproto, FLOW_MISS_MAX_BATCH - work);
+ if (retval < 0) {
+ return -retval;
+ } else if (!retval) {
break;
- }
-
- if (upcall->type == DPIF_UC_MISS) {
- /* Handle it later. */
- n_misses++;
} else {
- handle_upcall(ofproto, upcall);
+ work += retval;
}
}
- handle_miss_upcalls(ofproto, misses, n_misses);
-
if (timer_expired(&ofproto->next_expiration)) {
int delay = expire(ofproto);
timer_set_duration(&ofproto->next_expiration, delay);
}
}
-static void
-handle_upcall(struct ofproto_dpif *ofproto, struct dpif_upcall *upcall)
+static int
+handle_upcalls(struct ofproto_dpif *ofproto, unsigned int max_batch)
{
- switch (upcall->type) {
- case DPIF_UC_ACTION:
- handle_userspace_upcall(ofproto, upcall);
- break;
+ struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
+ int n_misses;
+ int i;
- case DPIF_UC_MISS:
- /* The caller handles these. */
- NOT_REACHED();
+ assert (max_batch <= FLOW_MISS_MAX_BATCH);
- case DPIF_N_UC_TYPES:
- default:
- VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32, upcall->type);
- break;
+ n_misses = 0;
+ for (i = 0; i < max_batch; i++) {
+ struct dpif_upcall *upcall = &misses[n_misses];
+ int error;
+
+ error = dpif_recv(ofproto->dpif, upcall);
+ if (error) {
+ if (error == ENODEV && n_misses == 0) {
+ return -ENODEV;
+ }
+ break;
+ }
+
+ switch (upcall->type) {
+ case DPIF_UC_ACTION:
+ handle_userspace_upcall(ofproto, upcall);
+ break;
+
+ case DPIF_UC_MISS:
+ /* Handle it later. */
+ n_misses++;
+ break;
+
+ case DPIF_N_UC_TYPES:
+ default:
+ VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32,
+ upcall->type);
+ break;
+ }
}
+
+ handle_miss_upcalls(ofproto, misses, n_misses);
+
+ return i;
}
\f
/* Flow expiration. */