Fix "make dist" by adding include/linux/types.h to distribution.
[openvswitch] / ofproto / ofproto.c
index 4e49cb63cb70299a263e546e5cbec2969f99e0bf..07f7b5c64e841273f9726f6f875e72fee595b880 100644 (file)
 
 VLOG_DEFINE_THIS_MODULE(ofproto);
 
+COVERAGE_DEFINE(facet_changed_rule);
+COVERAGE_DEFINE(facet_revalidate);
+COVERAGE_DEFINE(odp_overflow);
+COVERAGE_DEFINE(ofproto_agg_request);
+COVERAGE_DEFINE(ofproto_costly_flags);
+COVERAGE_DEFINE(ofproto_ctlr_action);
+COVERAGE_DEFINE(ofproto_del_rule);
+COVERAGE_DEFINE(ofproto_error);
+COVERAGE_DEFINE(ofproto_expiration);
+COVERAGE_DEFINE(ofproto_expired);
+COVERAGE_DEFINE(ofproto_flows_req);
+COVERAGE_DEFINE(ofproto_flush);
+COVERAGE_DEFINE(ofproto_invalidated);
+COVERAGE_DEFINE(ofproto_no_packet_in);
+COVERAGE_DEFINE(ofproto_ofconn_stuck);
+COVERAGE_DEFINE(ofproto_ofp2odp);
+COVERAGE_DEFINE(ofproto_packet_in);
+COVERAGE_DEFINE(ofproto_packet_out);
+COVERAGE_DEFINE(ofproto_queue_req);
+COVERAGE_DEFINE(ofproto_recv_openflow);
+COVERAGE_DEFINE(ofproto_reinit_ports);
+COVERAGE_DEFINE(ofproto_unexpected_rule);
+COVERAGE_DEFINE(ofproto_uninstallable);
+COVERAGE_DEFINE(ofproto_update_port);
+
 #include "sflow_api.h"
 
 struct ofport {
@@ -230,7 +255,7 @@ struct ofconn {
     struct list node;           /* In struct ofproto's "all_conns" list. */
     struct rconn *rconn;        /* OpenFlow connection. */
     enum ofconn_type type;      /* Type. */
-    int flow_format;            /* One of NXFF_*. */
+    enum nx_flow_format flow_format; /* Currently selected flow format. */
 
     /* OFPT_PACKET_IN related data. */
     struct rconn_packet_counter *packet_in_counter; /* # queued on 'rconn'. */
@@ -305,6 +330,7 @@ struct ofproto {
     long long int next_in_band_update;
     struct sockaddr_in *extra_in_band_remotes;
     size_t n_extra_remotes;
+    int in_band_queue;
 
     /* Flow table. */
     struct classifier cls;
@@ -405,11 +431,14 @@ ofproto_create(const char *datapath, const char *datapath_type,
 
     /* Initialize submodules. */
     p->switch_status = switch_status_create(p);
-    p->in_band = NULL;
     p->fail_open = NULL;
     p->netflow = NULL;
     p->sflow = NULL;
 
+    /* Initialize in-band control. */
+    p->in_band = NULL;
+    p->in_band_queue = -1;
+
     /* Initialize flow table. */
     classifier_init(&p->cls);
     p->next_expiration = time_msec() + 1000;
@@ -600,6 +629,7 @@ update_in_band_remotes(struct ofproto *ofproto)
         if (ofproto->in_band) {
             in_band_set_remotes(ofproto->in_band, addrs, n_addrs);
         }
+        in_band_set_queue(ofproto->in_band, ofproto->in_band_queue);
         ofproto->next_in_band_update = time_msec() + 1000;
     } else {
         in_band_destroy(ofproto->in_band);
@@ -776,6 +806,18 @@ ofproto_set_extra_in_band_remotes(struct ofproto *ofproto,
     update_in_band_remotes(ofproto);
 }
 
+/* Sets the OpenFlow queue used by flows set up by in-band control on
+ * 'ofproto' to 'queue_id'.  If 'queue_id' is negative, then in-band control
+ * flows will use the default queue. */
+void
+ofproto_set_in_band_queue(struct ofproto *ofproto, int queue_id)
+{
+    if (queue_id != ofproto->in_band_queue) {
+        ofproto->in_band_queue = queue_id;
+        update_in_band_remotes(ofproto);
+    }
+}
+
 void
 ofproto_set_desc(struct ofproto *p,
                  const char *mfr_desc, const char *hw_desc,
@@ -1274,7 +1316,7 @@ int
 ofproto_port_del(struct ofproto *ofproto, uint16_t odp_port)
 {
     struct ofport *ofport = get_port(ofproto, odp_port);
-    const char *name = ofport ? (char *) ofport->opp.name : "<unknown>";
+    const char *name = ofport ? ofport->opp.name : "<unknown>";
     int error;
 
     error = dpif_port_del(ofproto->dpif, odp_port);
@@ -1352,19 +1394,12 @@ ofproto_delete_flow(struct ofproto *ofproto, const struct cls_rule *target)
     }
 }
 
-static void
-destroy_rule(struct cls_rule *rule_, void *ofproto_)
-{
-    struct rule *rule = rule_from_cls_rule(rule_);
-    struct ofproto *ofproto = ofproto_;
-
-    rule_remove(ofproto, rule);
-}
-
 void
 ofproto_flush_flows(struct ofproto *ofproto)
 {
     struct facet *facet, *next_facet;
+    struct rule *rule, *next_rule;
+    struct cls_cursor cursor;
 
     COVERAGE_INC(ofproto_flush);
 
@@ -1376,7 +1411,12 @@ ofproto_flush_flows(struct ofproto *ofproto)
         facet->installed = false;
         facet_remove(ofproto, facet);
     }
-    classifier_for_each(&ofproto->cls, destroy_rule, ofproto);
+
+    cls_cursor_init(&cursor, &ofproto->cls, NULL);
+    CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
+        rule_remove(ofproto, rule);
+    }
+
     dpif_flow_flush(ofproto->dpif);
     if (ofproto->in_band) {
         in_band_flushed(ofproto->in_band);
@@ -1399,7 +1439,7 @@ reinit_ports(struct ofproto *p)
 
     svec_init(&devnames);
     HMAP_FOR_EACH (ofport, hmap_node, &p->ports) {
-        svec_add (&devnames, (char *) ofport->opp.name);
+        svec_add (&devnames, ofport->opp.name);
     }
     dpif_port_list(p->dpif, &odp_ports, &n_odp_ports);
     for (i = 0; i < n_odp_ports; i++) {
@@ -1425,6 +1465,7 @@ make_ofport(const struct odp_port *odp_port)
 
     memset(&netdev_options, 0, sizeof netdev_options);
     netdev_options.name = odp_port->devname;
+    netdev_options.type = odp_port->type;
     netdev_options.ethertype = NETDEV_ETH_TYPE_NONE;
 
     error = netdev_open(&netdev_options, &netdev);
@@ -1481,7 +1522,7 @@ ofport_equal(const struct ofport *a_, const struct ofport *b_)
     BUILD_ASSERT_DECL(sizeof *a == 48); /* Detect ofp_phy_port changes. */
     return (a->port_no == b->port_no
             && !memcmp(a->hw_addr, b->hw_addr, sizeof a->hw_addr)
-            && !strcmp((char *) a->name, (char *) b->name)
+            && !strcmp(a->name, b->name)
             && a->state == b->state
             && a->config == b->config
             && a->curr == b->curr
@@ -1500,7 +1541,10 @@ send_port_status(struct ofproto *p, const struct ofport *ofport,
         struct ofp_port_status *ops;
         struct ofpbuf *b;
 
-        if (!ofconn_receives_async_msgs(ofconn)) {
+        /* Primary controllers, even slaves, should always get port status
+           updates.  Otherwise obey ofconn_receives_async_msgs(). */
+        if (ofconn->type != OFCONN_PRIMARY
+            && !ofconn_receives_async_msgs(ofconn)) {
             continue;
         }
 
@@ -1515,7 +1559,7 @@ send_port_status(struct ofproto *p, const struct ofport *ofport,
 static void
 ofport_install(struct ofproto *p, struct ofport *ofport)
 {
-    const char *netdev_name = (const char *) ofport->opp.name;
+    const char *netdev_name = ofport->opp.name;
 
     netdev_monitor_add(p->netdev_monitor, ofport->netdev);
     hmap_insert(&p->ports, &ofport->hmap_node, hash_int(ofport->odp_port, 0));
@@ -1531,7 +1575,7 @@ ofport_remove(struct ofproto *p, struct ofport *ofport)
     netdev_monitor_remove(p->netdev_monitor, ofport->netdev);
     hmap_remove(&p->ports, &ofport->hmap_node);
     shash_delete(&p->port_by_name,
-                 shash_find(&p->port_by_name, (char *) ofport->opp.name));
+                 shash_find(&p->port_by_name, ofport->opp.name));
     if (p->sflow) {
         ofproto_sflow_del_port(p->sflow, ofport->odp_port);
     }
@@ -1603,9 +1647,9 @@ update_port(struct ofproto *p, const char *devname)
         return;
     } else if (old_ofport && new_ofport) {
         /* Most of the 'config' bits are OpenFlow soft state, but
-         * OFPPC_PORT_DOWN is maintained the kernel.  So transfer the OpenFlow
-         * bits from old_ofport.  (make_ofport() only sets OFPPC_PORT_DOWN and
-         * leaves the other bits 0.)  */
+         * OFPPC_PORT_DOWN is maintained by the kernel.  So transfer the
+         * OpenFlow bits from old_ofport.  (make_ofport() only sets
+         * OFPPC_PORT_DOWN and leaves the other bits 0.)  */
         new_ofport->opp.config |= old_ofport->opp.config & ~OFPPC_PORT_DOWN;
 
         if (ofport_equal(old_ofport, new_ofport)) {
@@ -2435,15 +2479,14 @@ hton_ofp_phy_port(struct ofp_phy_port *opp)
 }
 
 static int
-handle_echo_request(struct ofconn *ofconn, struct ofp_header *oh)
+handle_echo_request(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    struct ofp_header *rq = oh;
-    queue_tx(make_echo_reply(rq), ofconn, ofconn->reply_counter);
+    queue_tx(make_echo_reply(oh), ofconn, ofconn->reply_counter);
     return 0;
 }
 
 static int
-handle_features_request(struct ofconn *ofconn, struct ofp_header *oh)
+handle_features_request(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct ofp_switch_features *osf;
     struct ofpbuf *buf;
@@ -2477,7 +2520,7 @@ handle_features_request(struct ofconn *ofconn, struct ofp_header *oh)
 }
 
 static int
-handle_get_config_request(struct ofconn *ofconn, struct ofp_header *oh)
+handle_get_config_request(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct ofpbuf *buf;
     struct ofp_switch_config *osc;
@@ -2498,16 +2541,9 @@ handle_get_config_request(struct ofconn *ofconn, struct ofp_header *oh)
 }
 
 static int
-handle_set_config(struct ofconn *ofconn, struct ofp_switch_config *osc)
+handle_set_config(struct ofconn *ofconn, const struct ofp_switch_config *osc)
 {
-    uint16_t flags;
-    int error;
-
-    error = check_ofp_message(&osc->header, OFPT_SET_CONFIG, sizeof *osc);
-    if (error) {
-        return error;
-    }
-    flags = ntohs(osc->flags);
+    uint16_t flags = ntohs(osc->flags);
 
     if (ofconn->type == OFCONN_PRIMARY && ofconn->role != NX_ROLE_SLAVE) {
         switch (flags & OFPC_FRAG_MASK) {
@@ -2770,16 +2806,12 @@ xlate_set_queue_action(struct action_xlate_ctx *ctx,
 static void
 xlate_set_dl_tci(struct action_xlate_ctx *ctx)
 {
-    ovs_be16 dl_vlan = ctx->flow.dl_vlan;
-    uint8_t dl_vlan_pcp = ctx->flow.dl_vlan_pcp;
-
-    if (dl_vlan == htons(OFP_VLAN_NONE)) {
+    ovs_be16 tci = ctx->flow.vlan_tci;
+    if (!(tci & htons(VLAN_CFI))) {
         odp_actions_add(ctx->out, ODPAT_STRIP_VLAN);
     } else {
         union odp_action *oa = odp_actions_add(ctx->out, ODPAT_SET_DL_TCI);
-        oa->dl_tci.tci = htons(ntohs(dl_vlan & htons(VLAN_VID_MASK))
-                               | (dl_vlan_pcp << VLAN_PCP_SHIFT)
-                               | VLAN_CFI);
+        oa->dl_tci.tci = tci & ~htons(VLAN_CFI);
     }
 }
 
@@ -2787,12 +2819,11 @@ static void
 xlate_reg_move_action(struct action_xlate_ctx *ctx,
                       const struct nx_action_reg_move *narm)
 {
-    ovs_be16 old_vlan = ctx->flow.dl_vlan;
-    uint8_t old_pcp = ctx->flow.dl_vlan_pcp;
+    ovs_be16 old_tci = ctx->flow.vlan_tci;
 
     nxm_execute_reg_move(narm, &ctx->flow);
 
-    if (ctx->flow.dl_vlan != old_vlan || ctx->flow.dl_vlan_pcp != old_pcp) {
+    if (ctx->flow.vlan_tci != old_tci) {
         xlate_set_dl_tci(ctx);
     }
 }
@@ -2842,6 +2873,9 @@ xlate_nicira_action(struct action_xlate_ctx *ctx,
     case NXAST_REG_LOAD:
         nxm_execute_reg_load((const struct nx_action_reg_load *) nah,
                              &ctx->flow);
+
+    case NXAST_NOTE:
+        /* Nothing to do. */
         break;
 
     /* If you add a new action here that modifies flow data, don't forget to
@@ -2879,18 +2913,20 @@ do_xlate_actions(const union ofp_action *in, size_t n_in,
             break;
 
         case OFPAT_SET_VLAN_VID:
-            ctx->flow.dl_vlan = ia->vlan_vid.vlan_vid;
+            ctx->flow.vlan_tci &= ~htons(VLAN_VID_MASK);
+            ctx->flow.vlan_tci |= ia->vlan_vid.vlan_vid | htons(VLAN_CFI);
             xlate_set_dl_tci(ctx);
             break;
 
         case OFPAT_SET_VLAN_PCP:
-            ctx->flow.dl_vlan_pcp = ia->vlan_pcp.vlan_pcp;
+            ctx->flow.vlan_tci &= ~htons(VLAN_PCP_MASK);
+            ctx->flow.vlan_tci |= htons(
+                (ia->vlan_pcp.vlan_pcp << VLAN_PCP_SHIFT) | VLAN_CFI);
             xlate_set_dl_tci(ctx);
             break;
 
         case OFPAT_STRIP_VLAN:
-            ctx->flow.dl_vlan = htons(OFP_VLAN_NONE);
-            ctx->flow.dl_vlan_pcp = 0;
+            ctx->flow.vlan_tci = htons(0);
             xlate_set_dl_tci(ctx);
             break;
 
@@ -3015,7 +3051,7 @@ reject_slave_controller(struct ofconn *ofconn, const const char *msg_type)
 }
 
 static int
-handle_packet_out(struct ofconn *ofconn, struct ofp_header *oh)
+handle_packet_out(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct ofproto *p = ofconn->ofproto;
     struct ofp_packet_out *opo;
@@ -3036,7 +3072,7 @@ handle_packet_out(struct ofconn *ofconn, struct ofp_header *oh)
     }
 
     /* Get ofp_packet_out. */
-    request.data = oh;
+    request.data = (void *) oh;
     request.size = ntohs(oh->length);
     opo = ofpbuf_try_pull(&request, offsetof(struct ofp_packet_out, actions));
     if (!opo) {
@@ -3110,10 +3146,10 @@ update_port_config(struct ofproto *p, struct ofport *port,
 }
 
 static int
-handle_port_mod(struct ofconn *ofconn, struct ofp_header *oh)
+handle_port_mod(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct ofproto *p = ofconn->ofproto;
-    const struct ofp_port_mod *opm;
+    const struct ofp_port_mod *opm = (const struct ofp_port_mod *) oh;
     struct ofport *port;
     int error;
 
@@ -3121,11 +3157,6 @@ handle_port_mod(struct ofconn *ofconn, struct ofp_header *oh)
     if (error) {
         return error;
     }
-    error = check_ofp_message(oh, OFPT_PORT_MOD, sizeof *opm);
-    if (error) {
-        return error;
-    }
-    opm = (struct ofp_port_mod *) oh;
 
     port = get_port(p, ofp_port_to_odp_port(ntohs(opm->port_no)));
     if (!port) {
@@ -3155,9 +3186,11 @@ make_ofp_stats_reply(ovs_be32 xid, ovs_be16 type, size_t body_len)
 }
 
 static struct ofpbuf *
-start_ofp_stats_reply(const struct ofp_stats_request *request, size_t body_len)
+start_ofp_stats_reply(const struct ofp_header *request, size_t body_len)
 {
-    return make_ofp_stats_reply(request->header.xid, request->type, body_len);
+    const struct ofp_stats_request *osr
+        = (const struct ofp_stats_request *) request;
+    return make_ofp_stats_reply(osr->header.xid, osr->type, body_len);
 }
 
 static void *
@@ -3213,7 +3246,7 @@ append_nxstats_reply(size_t nbytes, struct ofconn *ofconn,
 
 static int
 handle_desc_stats_request(struct ofconn *ofconn,
-                          struct ofp_stats_request *request)
+                          const struct ofp_header *request)
 {
     struct ofproto *p = ofconn->ofproto;
     struct ofp_desc_stats *ods;
@@ -3234,7 +3267,7 @@ handle_desc_stats_request(struct ofconn *ofconn,
 
 static int
 handle_table_stats_request(struct ofconn *ofconn,
-                           struct ofp_stats_request *request)
+                           const struct ofp_header *request)
 {
     struct ofproto *p = ofconn->ofproto;
     struct ofp_table_stats *ots;
@@ -3287,21 +3320,15 @@ append_port_stat(struct ofport *port, struct ofconn *ofconn,
 }
 
 static int
-handle_port_stats_request(struct ofconn *ofconn, struct ofp_stats_request *osr,
-                          size_t arg_size)
+handle_port_stats_request(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct ofproto *p = ofconn->ofproto;
-    struct ofp_port_stats_request *psr;
+    const struct ofp_port_stats_request *psr = ofputil_stats_body(oh);
     struct ofp_port_stats *ops;
     struct ofpbuf *msg;
     struct ofport *port;
 
-    if (arg_size != sizeof *psr) {
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
-    }
-    psr = (struct ofp_port_stats_request *) osr->body;
-
-    msg = start_ofp_stats_reply(osr, sizeof *ops * 16);
+    msg = start_ofp_stats_reply(oh, sizeof *ops * 16);
     if (psr->port_no != htons(OFPP_NONE)) {
         port = get_port(p, ofp_port_to_odp_port(ntohs(psr->port_no)));
         if (port) {
@@ -3317,12 +3344,6 @@ handle_port_stats_request(struct ofconn *ofconn, struct ofp_stats_request *osr,
     return 0;
 }
 
-struct flow_stats_cbdata {
-    struct ofconn *ofconn;
-    ovs_be16 out_port;
-    struct ofpbuf *msg;
-};
-
 /* Obtains statistic counters for 'rule' within 'p' and stores them into
  * '*packet_countp' and '*byte_countp'.  The returned statistics include
  * statistics for all of 'rule''s facets. */
@@ -3381,29 +3402,27 @@ calc_flow_duration(long long int start, ovs_be32 *sec, ovs_be32 *nsec)
 }
 
 static void
-flow_stats_cb(struct cls_rule *rule_, void *cbdata_)
+put_ofp_flow_stats(struct ofconn *ofconn, struct rule *rule,
+                   ovs_be16 out_port, struct ofpbuf **replyp)
 {
-    struct rule *rule = rule_from_cls_rule(rule_);
-    struct flow_stats_cbdata *cbdata = cbdata_;
     struct ofp_flow_stats *ofs;
     uint64_t packet_count, byte_count;
     size_t act_len, len;
 
-    if (rule_is_hidden(rule) || !rule_has_out_port(rule, cbdata->out_port)) {
+    if (rule_is_hidden(rule) || !rule_has_out_port(rule, out_port)) {
         return;
     }
 
     act_len = sizeof *rule->actions * rule->n_actions;
     len = offsetof(struct ofp_flow_stats, actions) + act_len;
 
-    query_stats(cbdata->ofconn->ofproto, rule, &packet_count, &byte_count);
+    query_stats(ofconn->ofproto, rule, &packet_count, &byte_count);
 
-    ofs = append_ofp_stats_reply(len, cbdata->ofconn, &cbdata->msg);
+    ofs = append_ofp_stats_reply(len, ofconn, replyp);
     ofs->length = htons(len);
     ofs->table_id = 0;
     ofs->pad = 0;
-    flow_to_match(&rule->cr.flow, rule->cr.wc.wildcards,
-                  cbdata->ofconn->flow_format, &ofs->match);
+    ofputil_cls_rule_to_match(&rule->cr, ofconn->flow_format, &ofs->match);
     calc_flow_duration(rule->created, &ofs->duration_sec, &ofs->duration_nsec);
     ofs->cookie = rule->flow_cookie;
     ofs->priority = htons(rule->cr.priority);
@@ -3424,54 +3443,52 @@ is_valid_table(uint8_t table_id)
 }
 
 static int
-handle_flow_stats_request(struct ofconn *ofconn,
-                          const struct ofp_stats_request *osr, size_t arg_size)
+handle_flow_stats_request(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    struct ofp_flow_stats_request *fsr;
-    struct flow_stats_cbdata cbdata;
-
-    if (arg_size != sizeof *fsr) {
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
-    }
-    fsr = (struct ofp_flow_stats_request *) osr->body;
+    const struct ofp_flow_stats_request *fsr = ofputil_stats_body(oh);
+    struct ofpbuf *reply;
 
     COVERAGE_INC(ofproto_flows_req);
-    cbdata.msg = start_ofp_stats_reply(osr, 1024);
+    reply = start_ofp_stats_reply(oh, 1024);
     if (is_valid_table(fsr->table_id)) {
+        struct cls_cursor cursor;
         struct cls_rule target;
+        struct rule *rule;
 
-        cbdata.ofconn = ofconn;
-        cbdata.out_port = fsr->out_port;
-        cls_rule_from_match(&fsr->match, 0, NXFF_OPENFLOW10, 0, &target);
-        classifier_for_each_match(&ofconn->ofproto->cls, &target,
-                                  flow_stats_cb, &cbdata);
+        ofputil_cls_rule_from_match(&fsr->match, 0, NXFF_OPENFLOW10, 0,
+                                    &target);
+        cls_cursor_init(&cursor, &ofconn->ofproto->cls, &target);
+        CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+            put_ofp_flow_stats(ofconn, rule, fsr->out_port, &reply);
+        }
     }
+    queue_tx(reply, ofconn, ofconn->reply_counter);
 
-    queue_tx(cbdata.msg, ofconn, ofconn->reply_counter);
     return 0;
 }
 
 static void
-nx_flow_stats_cb(struct cls_rule *rule_, void *cbdata_)
+put_nx_flow_stats(struct ofconn *ofconn, struct rule *rule,
+                  ovs_be16 out_port, struct ofpbuf **replyp)
 {
-    struct rule *rule = rule_from_cls_rule(rule_);
-    struct flow_stats_cbdata *cbdata = cbdata_;
     struct nx_flow_stats *nfs;
     uint64_t packet_count, byte_count;
     size_t act_len, start_len;
+    struct ofpbuf *reply;
 
-    if (rule_is_hidden(rule) || !rule_has_out_port(rule, cbdata->out_port)) {
+    if (rule_is_hidden(rule) || !rule_has_out_port(rule, out_port)) {
         return;
     }
 
-    query_stats(cbdata->ofconn->ofproto, rule, &packet_count, &byte_count);
+    query_stats(ofconn->ofproto, rule, &packet_count, &byte_count);
 
     act_len = sizeof *rule->actions * rule->n_actions;
 
-    start_len = cbdata->msg->size;
-    append_nxstats_reply(sizeof *nfs + NXM_MAX_LEN + act_len,
-                         cbdata->ofconn, &cbdata->msg);
-    nfs = ofpbuf_put_uninit(cbdata->msg, sizeof *nfs);
+    start_len = (*replyp)->size;
+    append_nxstats_reply(sizeof *nfs + NXM_MAX_LEN + act_len, ofconn, replyp);
+    reply = *replyp;
+
+    nfs = ofpbuf_put_uninit(reply, sizeof *nfs);
     nfs->table_id = 0;
     nfs->pad = 0;
     calc_flow_duration(rule->created, &nfs->duration_sec, &nfs->duration_nsec);
@@ -3479,64 +3496,66 @@ nx_flow_stats_cb(struct cls_rule *rule_, void *cbdata_)
     nfs->priority = htons(rule->cr.priority);
     nfs->idle_timeout = htons(rule->idle_timeout);
     nfs->hard_timeout = htons(rule->hard_timeout);
-    nfs->match_len = htons(nx_put_match(cbdata->msg, &rule->cr));
+    nfs->match_len = htons(nx_put_match(reply, &rule->cr));
     memset(nfs->pad2, 0, sizeof nfs->pad2);
     nfs->packet_count = htonll(packet_count);
     nfs->byte_count = htonll(byte_count);
     if (rule->n_actions > 0) {
-        ofpbuf_put(cbdata->msg, rule->actions, act_len);
+        ofpbuf_put(reply, rule->actions, act_len);
     }
-    nfs->length = htons(cbdata->msg->size - start_len);
+    nfs->length = htons(reply->size - start_len);
 }
 
 static int
-handle_nxst_flow(struct ofconn *ofconn, struct ofpbuf *b)
+handle_nxst_flow(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct nx_flow_stats_request *nfsr;
-    struct flow_stats_cbdata cbdata;
     struct cls_rule target;
+    struct ofpbuf *reply;
+    struct ofpbuf b;
     int error;
 
+    b.data = (void *) oh;
+    b.size = ntohs(oh->length);
+
     /* Dissect the message. */
-    nfsr = ofpbuf_try_pull(b, sizeof *nfsr);
+    nfsr = ofpbuf_try_pull(&b, sizeof *nfsr);
     if (!nfsr) {
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
     }
-    error = nx_pull_match(b, ntohs(nfsr->match_len), 0, &target);
+    error = nx_pull_match(&b, ntohs(nfsr->match_len), 0, &target);
     if (error) {
         return error;
     }
+    if (b.size) {
+        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
+    }
 
     COVERAGE_INC(ofproto_flows_req);
-    cbdata.msg = start_nxstats_reply(&nfsr->nsm, 1024);
+    reply = start_nxstats_reply(&nfsr->nsm, 1024);
     if (is_valid_table(nfsr->table_id)) {
-        cbdata.ofconn = ofconn;
-        cbdata.out_port = nfsr->out_port;
-        classifier_for_each_match(&ofconn->ofproto->cls, &target,
-                                  nx_flow_stats_cb, &cbdata);
+        struct cls_cursor cursor;
+        struct rule *rule;
+
+        cls_cursor_init(&cursor, &ofconn->ofproto->cls, &target);
+        CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+            put_nx_flow_stats(ofconn, rule, nfsr->out_port, &reply);
+        }
     }
-    queue_tx(cbdata.msg, ofconn, ofconn->reply_counter);
+    queue_tx(reply, ofconn, ofconn->reply_counter);
+
     return 0;
 }
 
-struct flow_stats_ds_cbdata {
-    struct ofproto *ofproto;
-    struct ds *results;
-};
-
 static void
-flow_stats_ds_cb(struct cls_rule *rule_, void *cbdata_)
+flow_stats_ds(struct ofproto *ofproto, struct rule *rule, struct ds *results)
 {
-    struct rule *rule = rule_from_cls_rule(rule_);
-    struct flow_stats_ds_cbdata *cbdata = cbdata_;
-    struct ds *results = cbdata->results;
     struct ofp_match match;
     uint64_t packet_count, byte_count;
     size_t act_len = sizeof *rule->actions * rule->n_actions;
 
-    query_stats(cbdata->ofproto, rule, &packet_count, &byte_count);
-    flow_to_match(&rule->cr.flow, rule->cr.wc.wildcards,
-                  NXFF_OPENFLOW10, &match);
+    query_stats(ofproto, rule, &packet_count, &byte_count);
+    ofputil_cls_rule_to_match(&rule->cr, NXFF_OPENFLOW10, &match);
 
     ds_put_format(results, "duration=%llds, ",
                   (time_msec() - rule->created) / 1000);
@@ -3557,44 +3576,13 @@ flow_stats_ds_cb(struct cls_rule *rule_, void *cbdata_)
 void
 ofproto_get_all_flows(struct ofproto *p, struct ds *results)
 {
-    struct ofp_match match;
-    struct cls_rule target;
-    struct flow_stats_ds_cbdata cbdata;
-
-    memset(&match, 0, sizeof match);
-    match.wildcards = htonl(OVSFW_ALL);
-
-    cbdata.ofproto = p;
-    cbdata.results = results;
-
-    cls_rule_from_match(&match, 0, NXFF_OPENFLOW10, 0, &target);
-    classifier_for_each_match(&p->cls, &target, flow_stats_ds_cb, &cbdata);
-}
-
-struct aggregate_stats_cbdata {
-    struct ofproto *ofproto;
-    ovs_be16 out_port;
-    uint64_t packet_count;
-    uint64_t byte_count;
-    uint32_t n_flows;
-};
-
-static void
-aggregate_stats_cb(struct cls_rule *rule_, void *cbdata_)
-{
-    struct rule *rule = rule_from_cls_rule(rule_);
-    struct aggregate_stats_cbdata *cbdata = cbdata_;
-    uint64_t packet_count, byte_count;
+    struct cls_cursor cursor;
+    struct rule *rule;
 
-    if (rule_is_hidden(rule) || !rule_has_out_port(rule, cbdata->out_port)) {
-        return;
+    cls_cursor_init(&cursor, &p->cls, NULL);
+    CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+        flow_stats_ds(p, rule, results);
     }
-
-    query_stats(cbdata->ofproto, rule, &packet_count, &byte_count);
-
-    cbdata->packet_count += packet_count;
-    cbdata->byte_count += byte_count;
-    cbdata->n_flows++;
 }
 
 static void
@@ -3602,44 +3590,50 @@ query_aggregate_stats(struct ofproto *ofproto, struct cls_rule *target,
                       ovs_be16 out_port, uint8_t table_id,
                       struct ofp_aggregate_stats_reply *oasr)
 {
-    struct aggregate_stats_cbdata cbdata;
+    uint64_t total_packets = 0;
+    uint64_t total_bytes = 0;
+    int n_flows = 0;
 
     COVERAGE_INC(ofproto_agg_request);
-    cbdata.packet_count = 0;
-    cbdata.byte_count = 0;
-    cbdata.n_flows = 0;
+
     if (is_valid_table(table_id)) {
-        cbdata.ofproto = ofproto;
-        cbdata.out_port = out_port;
+        struct cls_cursor cursor;
+        struct rule *rule;
+
+        cls_cursor_init(&cursor, &ofproto->cls, target);
+        CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+            if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)) {
+                uint64_t packet_count;
+                uint64_t byte_count;
+
+                query_stats(ofproto, rule, &packet_count, &byte_count);
 
-        classifier_for_each_match(&ofproto->cls, target,
-                                  aggregate_stats_cb, &cbdata);
+                total_packets += packet_count;
+                total_bytes += byte_count;
+                n_flows++;
+            }
+        }
     }
 
-    oasr->flow_count = htonl(cbdata.n_flows);
-    oasr->packet_count = htonll(cbdata.packet_count);
-    oasr->byte_count = htonll(cbdata.byte_count);
+    oasr->flow_count = htonl(n_flows);
+    oasr->packet_count = htonll(total_packets);
+    oasr->byte_count = htonll(total_bytes);
     memset(oasr->pad, 0, sizeof oasr->pad);
 }
 
 static int
 handle_aggregate_stats_request(struct ofconn *ofconn,
-                               const struct ofp_stats_request *osr,
-                               size_t arg_size)
+                               const struct ofp_header *oh)
 {
-    struct ofp_aggregate_stats_request *request;
+    const struct ofp_aggregate_stats_request *request = ofputil_stats_body(oh);
     struct ofp_aggregate_stats_reply *reply;
     struct cls_rule target;
     struct ofpbuf *msg;
 
-    if (arg_size != sizeof *request) {
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
-    }
-    request = (struct ofp_aggregate_stats_request *) osr->body;
-
-    cls_rule_from_match(&request->match, 0, NXFF_OPENFLOW10, 0, &target);
+    ofputil_cls_rule_from_match(&request->match, 0, NXFF_OPENFLOW10, 0,
+                                &target);
 
-    msg = start_ofp_stats_reply(osr, sizeof *reply);
+    msg = start_ofp_stats_reply(oh, sizeof *reply);
     reply = append_ofp_stats_reply(sizeof *reply, ofconn, &msg);
     query_aggregate_stats(ofconn->ofproto, &target, request->out_port,
                           request->table_id, reply);
@@ -3648,23 +3642,30 @@ handle_aggregate_stats_request(struct ofconn *ofconn,
 }
 
 static int
-handle_nxst_aggregate(struct ofconn *ofconn, struct ofpbuf *b)
+handle_nxst_aggregate(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct nx_aggregate_stats_request *request;
     struct ofp_aggregate_stats_reply *reply;
     struct cls_rule target;
+    struct ofpbuf b;
     struct ofpbuf *buf;
     int error;
 
+    b.data = (void *) oh;
+    b.size = ntohs(oh->length);
+
     /* Dissect the message. */
-    request = ofpbuf_try_pull(b, sizeof *request);
+    request = ofpbuf_try_pull(&b, sizeof *request);
     if (!request) {
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
     }
-    error = nx_pull_match(b, ntohs(request->match_len), 0, &target);
+    error = nx_pull_match(&b, ntohs(request->match_len), 0, &target);
     if (error) {
         return error;
     }
+    if (b.size) {
+        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
+    }
 
     /* Reply. */
     COVERAGE_INC(ofproto_flows_req);
@@ -3726,26 +3727,24 @@ handle_queue_stats_for_port(struct ofport *port, uint32_t queue_id,
 }
 
 static int
-handle_queue_stats_request(struct ofconn *ofconn,
-                           const struct ofp_stats_request *osr,
-                           size_t arg_size)
+handle_queue_stats_request(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct ofproto *ofproto = ofconn->ofproto;
-    struct ofp_queue_stats_request *qsr;
+    const struct ofp_queue_stats_request *qsr;
     struct queue_stats_cbdata cbdata;
     struct ofport *port;
     unsigned int port_no;
     uint32_t queue_id;
 
-    if (arg_size != sizeof *qsr) {
+    qsr = ofputil_stats_body(oh);
+    if (!qsr) {
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
     }
-    qsr = (struct ofp_queue_stats_request *) osr->body;
 
     COVERAGE_INC(ofproto_queue_req);
 
     cbdata.ofconn = ofconn;
-    cbdata.msg = start_ofp_stats_reply(osr, 128);
+    cbdata.msg = start_ofp_stats_reply(oh, 128);
 
     port_no = ntohs(qsr->port_no);
     queue_id = ntohl(qsr->queue_id);
@@ -3767,85 +3766,6 @@ handle_queue_stats_request(struct ofconn *ofconn,
     return 0;
 }
 
-static int
-handle_vendor_stats_request(struct ofconn *ofconn,
-                            struct ofp_stats_request *osr, size_t arg_size)
-{
-    struct nicira_stats_msg *nsm;
-    struct ofpbuf b;
-    ovs_be32 vendor;
-
-    if (arg_size < 4) {
-        VLOG_WARN_RL(&rl, "truncated vendor stats request body");
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
-    }
-
-    memcpy(&vendor, osr->body, sizeof vendor);
-    if (vendor != htonl(NX_VENDOR_ID)) {
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_VENDOR);
-    }
-
-    if (ntohs(osr->header.length) < sizeof(struct nicira_stats_msg)) {
-        VLOG_WARN_RL(&rl, "truncated Nicira stats request");
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
-    }
-
-    nsm = (struct nicira_stats_msg *) osr;
-    b.data = nsm;
-    b.size = ntohs(nsm->header.length);
-    switch (ntohl(nsm->subtype)) {
-    case NXST_FLOW:
-        return handle_nxst_flow(ofconn, &b);
-
-    case NXST_AGGREGATE:
-        return handle_nxst_aggregate(ofconn, &b);
-
-    default:
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_SUBTYPE);
-    }
-}
-
-static int
-handle_stats_request(struct ofconn *ofconn, struct ofp_header *oh)
-{
-    struct ofp_stats_request *osr;
-    size_t arg_size;
-    int error;
-
-    error = check_ofp_message_array(oh, OFPT_STATS_REQUEST, sizeof *osr,
-                                    1, &arg_size);
-    if (error) {
-        return error;
-    }
-    osr = (struct ofp_stats_request *) oh;
-
-    switch (ntohs(osr->type)) {
-    case OFPST_DESC:
-        return handle_desc_stats_request(ofconn, osr);
-
-    case OFPST_FLOW:
-        return handle_flow_stats_request(ofconn, osr, arg_size);
-
-    case OFPST_AGGREGATE:
-        return handle_aggregate_stats_request(ofconn, osr, arg_size);
-
-    case OFPST_TABLE:
-        return handle_table_stats_request(ofconn, osr);
-
-    case OFPST_PORT:
-        return handle_port_stats_request(ofconn, osr, arg_size);
-
-    case OFPST_QUEUE:
-        return handle_queue_stats_request(ofconn, osr, arg_size);
-
-    case OFPST_VENDOR:
-        return handle_vendor_stats_request(ofconn, osr, arg_size);
-
-    default:
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_STAT);
-    }
-}
-
 static long long int
 msec_from_nsec(uint64_t sec, uint32_t nsec)
 {
@@ -3977,7 +3897,6 @@ struct modify_flows_cbdata {
 
 static int modify_flow(struct ofproto *, const struct flow_mod *,
                        struct rule *);
-static void modify_flows_cb(struct cls_rule *, void *cbdata_);
 
 /* Implements OFPFC_MODIFY.  Returns 0 on success or an OpenFlow error code as
  * encoded by ofp_mkerr() on failure.
@@ -3987,19 +3906,24 @@ static void modify_flows_cb(struct cls_rule *, void *cbdata_);
 static int
 modify_flows_loose(struct ofconn *ofconn, struct flow_mod *fm)
 {
-    struct modify_flows_cbdata cbdata;
+    struct ofproto *p = ofconn->ofproto;
+    struct rule *match = NULL;
+    struct cls_cursor cursor;
+    struct rule *rule;
 
-    cbdata.ofproto = ofconn->ofproto;
-    cbdata.fm = fm;
-    cbdata.match = NULL;
+    cls_cursor_init(&cursor, &p->cls, &fm->cr);
+    CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+        if (!rule_is_hidden(rule)) {
+            match = rule;
+            modify_flow(p, fm, rule);
+        }
+    }
 
-    classifier_for_each_match(&ofconn->ofproto->cls, &fm->cr,
-                              modify_flows_cb, &cbdata);
-    if (cbdata.match) {
-        /* This credits the packet to whichever flow happened to happened to
-         * match last.  That's weird.  Maybe we should do a lookup for the
-         * flow that actually matches the packet?  Who knows. */
-        send_buffered_packet(ofconn, cbdata.match, fm->buffer_id);
+    if (match) {
+        /* This credits the packet to whichever flow happened to match last.
+         * That's weird.  Maybe we should do a lookup for the flow that
+         * actually matches the packet?  Who knows. */
+        send_buffered_packet(ofconn, match, fm->buffer_id);
         return 0;
     } else {
         return add_flow(ofconn, fm);
@@ -4024,19 +3948,6 @@ modify_flow_strict(struct ofconn *ofconn, struct flow_mod *fm)
     }
 }
 
-/* Callback for modify_flows_loose(). */
-static void
-modify_flows_cb(struct cls_rule *rule_, void *cbdata_)
-{
-    struct rule *rule = rule_from_cls_rule(rule_);
-    struct modify_flows_cbdata *cbdata = cbdata_;
-
-    if (!rule_is_hidden(rule)) {
-        cbdata->match = rule;
-        modify_flow(cbdata->ofproto, cbdata->fm, rule);
-    }
-}
-
 /* Implements core of OFPFC_MODIFY and OFPFC_MODIFY_STRICT where 'rule' has
  * been identified as a flow in 'p''s flow table to be modified, by changing
  * the rule's actions to match those in 'ofm' (which is followed by 'n_actions'
@@ -4067,24 +3978,19 @@ modify_flow(struct ofproto *p, const struct flow_mod *fm, struct rule *rule)
 \f
 /* OFPFC_DELETE implementation. */
 
-struct delete_flows_cbdata {
-    struct ofproto *ofproto;
-    ovs_be16 out_port;
-};
-
-static void delete_flows_cb(struct cls_rule *, void *cbdata_);
 static void delete_flow(struct ofproto *, struct rule *, ovs_be16 out_port);
 
 /* Implements OFPFC_DELETE. */
 static void
 delete_flows_loose(struct ofproto *p, const struct flow_mod *fm)
 {
-    struct delete_flows_cbdata cbdata;
+    struct rule *rule, *next_rule;
+    struct cls_cursor cursor;
 
-    cbdata.ofproto = p;
-    cbdata.out_port = htons(fm->out_port);
-
-    classifier_for_each_match(&p->cls, &fm->cr, delete_flows_cb, &cbdata);
+    cls_cursor_init(&cursor, &p->cls, &fm->cr);
+    CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
+        delete_flow(p, rule, htons(fm->out_port));
+    }
 }
 
 /* Implements OFPFC_DELETE_STRICT. */
@@ -4097,16 +4003,6 @@ delete_flow_strict(struct ofproto *p, struct flow_mod *fm)
     }
 }
 
-/* Callback for delete_flows_loose(). */
-static void
-delete_flows_cb(struct cls_rule *rule_, void *cbdata_)
-{
-    struct rule *rule = rule_from_cls_rule(rule_);
-    struct delete_flows_cbdata *cbdata = cbdata_;
-
-    delete_flow(cbdata->ofproto, rule, cbdata->out_port);
-}
-
 /* Implements core of OFPFC_DELETE and OFPFC_DELETE_STRICT where 'rule' has
  * been identified as a flow to delete from 'p''s flow table, by deleting the
  * flow and sending out a OFPT_FLOW_REMOVED message to any interested
@@ -4179,7 +4075,7 @@ flow_mod_core(struct ofconn *ofconn, struct flow_mod *fm)
 }
 
 static int
-handle_ofpt_flow_mod(struct ofconn *ofconn, struct ofp_header *oh)
+handle_ofpt_flow_mod(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct ofp_match orig_match;
     struct ofp_flow_mod *ofm;
@@ -4187,7 +4083,7 @@ handle_ofpt_flow_mod(struct ofconn *ofconn, struct ofp_header *oh)
     struct ofpbuf b;
     int error;
 
-    b.data = oh;
+    b.data = (void *) oh;
     b.size = ntohs(oh->length);
 
     /* Dissect the message. */
@@ -4220,8 +4116,8 @@ handle_ofpt_flow_mod(struct ofconn *ofconn, struct ofp_header *oh)
     }
 
     /* Translate the message. */
-    cls_rule_from_match(&ofm->match, ntohs(ofm->priority), ofconn->flow_format,
-                        ofm->cookie, &fm.cr);
+    ofputil_cls_rule_from_match(&ofm->match, ntohs(ofm->priority),
+                                ofconn->flow_format, ofm->cookie, &fm.cr);
     fm.cookie = ofm->cookie;
     fm.command = ntohs(ofm->command);
     fm.idle_timeout = ntohs(ofm->idle_timeout);
@@ -4235,14 +4131,14 @@ handle_ofpt_flow_mod(struct ofconn *ofconn, struct ofp_header *oh)
 }
 
 static int
-handle_nxt_flow_mod(struct ofconn *ofconn, struct ofp_header *oh)
+handle_nxt_flow_mod(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct nx_flow_mod *nfm;
     struct flow_mod fm;
     struct ofpbuf b;
     int error;
 
-    b.data = oh;
+    b.data = (void *) oh;
     b.size = ntohs(oh->length);
 
     /* Dissect the message. */
@@ -4274,34 +4170,23 @@ handle_nxt_flow_mod(struct ofconn *ofconn, struct ofp_header *oh)
 }
 
 static int
-handle_tun_id_from_cookie(struct ofconn *ofconn, struct nxt_tun_id_cookie *msg)
+handle_tun_id_from_cookie(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    int error;
-
-    error = check_ofp_message(&msg->header, OFPT_VENDOR, sizeof *msg);
-    if (error) {
-        return error;
-    }
+    const struct nxt_tun_id_cookie *msg
+        = (const struct nxt_tun_id_cookie *) oh;
 
     ofconn->flow_format = msg->set ? NXFF_TUN_ID_FROM_COOKIE : NXFF_OPENFLOW10;
     return 0;
 }
 
 static int
-handle_role_request(struct ofconn *ofconn, struct nicira_header *msg)
+handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    struct nx_role_request *nrr;
+    struct nx_role_request *nrr = (struct nx_role_request *) oh;
     struct nx_role_request *reply;
     struct ofpbuf *buf;
     uint32_t role;
 
-    if (ntohs(msg->header.length) != sizeof *nrr) {
-        VLOG_WARN_RL(&rl, "received role request of length %u (expected %zu)",
-                     ntohs(msg->header.length), sizeof *nrr);
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
-    }
-    nrr = (struct nx_role_request *) msg;
-
     if (ofconn->type != OFCONN_PRIMARY) {
         VLOG_WARN_RL(&rl, "ignoring role request on non-controller "
                      "connection");
@@ -4328,8 +4213,7 @@ handle_role_request(struct ofconn *ofconn, struct nicira_header *msg)
     }
     ofconn->role = role;
 
-    reply = make_nxmsg_xid(sizeof *reply, NXT_ROLE_REPLY, msg->header.xid,
-                           &buf);
+    reply = make_nxmsg_xid(sizeof *reply, NXT_ROLE_REPLY, oh->xid, &buf);
     reply->role = htonl(role);
     queue_tx(buf, ofconn, ofconn->reply_counter);
 
@@ -4337,16 +4221,11 @@ handle_role_request(struct ofconn *ofconn, struct nicira_header *msg)
 }
 
 static int
-handle_nxt_set_flow_format(struct ofconn *ofconn,
-                           struct nxt_set_flow_format *msg)
+handle_nxt_set_flow_format(struct ofconn *ofconn, const struct ofp_header *oh)
 {
+    const struct nxt_set_flow_format *msg
+        = (const struct nxt_set_flow_format *) oh;
     uint32_t format;
-    int error;
-
-    error = check_ofp_message(&msg->header, OFPT_VENDOR, sizeof *msg);
-    if (error) {
-        return error;
-    }
 
     format = ntohl(msg->format);
     if (format == NXFF_OPENFLOW10
@@ -4360,52 +4239,7 @@ handle_nxt_set_flow_format(struct ofconn *ofconn,
 }
 
 static int
-handle_vendor(struct ofconn *ofconn, void *msg)
-{
-    struct ofproto *p = ofconn->ofproto;
-    struct ofp_vendor_header *ovh = msg;
-    struct nicira_header *nh;
-
-    if (ntohs(ovh->header.length) < sizeof(struct ofp_vendor_header)) {
-        VLOG_WARN_RL(&rl, "received vendor message of length %u "
-                          "(expected at least %zu)",
-                   ntohs(ovh->header.length), sizeof(struct ofp_vendor_header));
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
-    }
-    if (ovh->vendor != htonl(NX_VENDOR_ID)) {
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_VENDOR);
-    }
-    if (ntohs(ovh->header.length) < sizeof(struct nicira_header)) {
-        VLOG_WARN_RL(&rl, "received Nicira vendor message of length %u "
-                          "(expected at least %zu)",
-                     ntohs(ovh->header.length), sizeof(struct nicira_header));
-        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
-    }
-
-    nh = msg;
-    switch (ntohl(nh->subtype)) {
-    case NXT_STATUS_REQUEST:
-        return switch_status_handle_request(p->switch_status, ofconn->rconn,
-                                            msg);
-
-    case NXT_TUN_ID_FROM_COOKIE:
-        return handle_tun_id_from_cookie(ofconn, msg);
-
-    case NXT_ROLE_REQUEST:
-        return handle_role_request(ofconn, msg);
-
-    case NXT_SET_FLOW_FORMAT:
-        return handle_nxt_set_flow_format(ofconn, msg);
-
-    case NXT_FLOW_MOD:
-        return handle_nxt_flow_mod(ofconn, &ovh->header);
-    }
-
-    return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_SUBTYPE);
-}
-
-static int
-handle_barrier_request(struct ofconn *ofconn, struct ofp_header *oh)
+handle_barrier_request(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct ofp_header *ob;
     struct ofpbuf *buf;
@@ -4417,71 +4251,135 @@ handle_barrier_request(struct ofconn *ofconn, struct ofp_header *oh)
     return 0;
 }
 
-static void
-handle_openflow(struct ofconn *ofconn, struct ofpbuf *ofp_msg)
+static int
+handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
 {
-    struct ofp_header *oh = ofp_msg->data;
+    const struct ofp_header *oh = msg->data;
+    const struct ofputil_msg_type *type;
     int error;
 
-    COVERAGE_INC(ofproto_recv_openflow);
-    switch (oh->type) {
-    case OFPT_ECHO_REQUEST:
-        error = handle_echo_request(ofconn, oh);
-        break;
-
-    case OFPT_ECHO_REPLY:
-        error = 0;
-        break;
+    error = ofputil_decode_msg_type(oh, &type);
+    if (error) {
+        return error;
+    }
 
-    case OFPT_FEATURES_REQUEST:
-        error = handle_features_request(ofconn, oh);
-        break;
+    switch (ofputil_msg_type_code(type)) {
+        /* OpenFlow requests. */
+    case OFPUTIL_OFPT_ECHO_REQUEST:
+        return handle_echo_request(ofconn, oh);
 
-    case OFPT_GET_CONFIG_REQUEST:
-        error = handle_get_config_request(ofconn, oh);
-        break;
+    case OFPUTIL_OFPT_FEATURES_REQUEST:
+        return handle_features_request(ofconn, oh);
 
-    case OFPT_SET_CONFIG:
-        error = handle_set_config(ofconn, ofp_msg->data);
-        break;
+    case OFPUTIL_OFPT_GET_CONFIG_REQUEST:
+        return handle_get_config_request(ofconn, oh);
 
-    case OFPT_PACKET_OUT:
-        error = handle_packet_out(ofconn, ofp_msg->data);
-        break;
+    case OFPUTIL_OFPT_SET_CONFIG:
+        return handle_set_config(ofconn, msg->data);
 
-    case OFPT_PORT_MOD:
-        error = handle_port_mod(ofconn, oh);
-        break;
+    case OFPUTIL_OFPT_PACKET_OUT:
+        return handle_packet_out(ofconn, oh);
 
-    case OFPT_FLOW_MOD:
-        error = handle_ofpt_flow_mod(ofconn, ofp_msg->data);
-        break;
+    case OFPUTIL_OFPT_PORT_MOD:
+        return handle_port_mod(ofconn, oh);
 
-    case OFPT_STATS_REQUEST:
-        error = handle_stats_request(ofconn, oh);
-        break;
+    case OFPUTIL_OFPT_FLOW_MOD:
+        return handle_ofpt_flow_mod(ofconn, oh);
 
-    case OFPT_VENDOR:
-        error = handle_vendor(ofconn, ofp_msg->data);
-        break;
+    case OFPUTIL_OFPT_BARRIER_REQUEST:
+        return handle_barrier_request(ofconn, oh);
 
-    case OFPT_BARRIER_REQUEST:
-        error = handle_barrier_request(ofconn, oh);
-        break;
+        /* OpenFlow replies. */
+    case OFPUTIL_OFPT_ECHO_REPLY:
+        return 0;
 
+        /* Nicira extension requests. */
+    case OFPUTIL_NXT_STATUS_REQUEST:
+        return switch_status_handle_request(
+            ofconn->ofproto->switch_status, ofconn->rconn, oh);
+
+    case OFPUTIL_NXT_TUN_ID_FROM_COOKIE:
+        return handle_tun_id_from_cookie(ofconn, oh);
+
+    case OFPUTIL_NXT_ROLE_REQUEST:
+        return handle_role_request(ofconn, oh);
+
+    case OFPUTIL_NXT_SET_FLOW_FORMAT:
+        return handle_nxt_set_flow_format(ofconn, oh);
+
+    case OFPUTIL_NXT_FLOW_MOD:
+        return handle_nxt_flow_mod(ofconn, oh);
+
+        /* OpenFlow statistics requests. */
+    case OFPUTIL_OFPST_DESC_REQUEST:
+        return handle_desc_stats_request(ofconn, oh);
+
+    case OFPUTIL_OFPST_FLOW_REQUEST:
+        return handle_flow_stats_request(ofconn, oh);
+
+    case OFPUTIL_OFPST_AGGREGATE_REQUEST:
+        return handle_aggregate_stats_request(ofconn, oh);
+
+    case OFPUTIL_OFPST_TABLE_REQUEST:
+        return handle_table_stats_request(ofconn, oh);
+
+    case OFPUTIL_OFPST_PORT_REQUEST:
+        return handle_port_stats_request(ofconn, oh);
+
+    case OFPUTIL_OFPST_QUEUE_REQUEST:
+        return handle_queue_stats_request(ofconn, oh);
+
+        /* Nicira extension statistics requests. */
+    case OFPUTIL_NXST_FLOW_REQUEST:
+        return handle_nxst_flow(ofconn, oh);
+
+    case OFPUTIL_NXST_AGGREGATE_REQUEST:
+        return handle_nxst_aggregate(ofconn, oh);
+
+    case OFPUTIL_INVALID:
+    case OFPUTIL_OFPT_HELLO:
+    case OFPUTIL_OFPT_ERROR:
+    case OFPUTIL_OFPT_FEATURES_REPLY:
+    case OFPUTIL_OFPT_GET_CONFIG_REPLY:
+    case OFPUTIL_OFPT_PACKET_IN:
+    case OFPUTIL_OFPT_FLOW_REMOVED:
+    case OFPUTIL_OFPT_PORT_STATUS:
+    case OFPUTIL_OFPT_BARRIER_REPLY:
+    case OFPUTIL_OFPT_QUEUE_GET_CONFIG_REQUEST:
+    case OFPUTIL_OFPT_QUEUE_GET_CONFIG_REPLY:
+    case OFPUTIL_OFPST_DESC_REPLY:
+    case OFPUTIL_OFPST_FLOW_REPLY:
+    case OFPUTIL_OFPST_QUEUE_REPLY:
+    case OFPUTIL_OFPST_PORT_REPLY:
+    case OFPUTIL_OFPST_TABLE_REPLY:
+    case OFPUTIL_OFPST_AGGREGATE_REPLY:
+    case OFPUTIL_NXT_STATUS_REPLY:
+    case OFPUTIL_NXT_ROLE_REPLY:
+    case OFPUTIL_NXT_FLOW_REMOVED:
+    case OFPUTIL_NXST_FLOW_REPLY:
+    case OFPUTIL_NXST_AGGREGATE_REPLY:
     default:
         if (VLOG_IS_WARN_ENABLED()) {
             char *s = ofp_to_string(oh, ntohs(oh->length), 2);
             VLOG_DBG_RL(&rl, "OpenFlow message ignored: %s", s);
             free(s);
         }
-        error = ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_TYPE);
-        break;
+        if (oh->type == OFPT_STATS_REQUEST || oh->type == OFPT_STATS_REPLY) {
+            return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_STAT);
+        } else {
+            return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_TYPE);
+        }
     }
+}
 
+static void
+handle_openflow(struct ofconn *ofconn, struct ofpbuf *ofp_msg)
+{
+    int error = handle_openflow__(ofconn, ofp_msg);
     if (error) {
         send_error_oh(ofconn, ofp_msg->data, error);
     }
+    COVERAGE_INC(ofproto_recv_openflow);
 }
 \f
 static void
@@ -4496,6 +4394,11 @@ handle_odp_miss_msg(struct ofproto *p, struct ofpbuf *packet)
     payload.size = msg->length - sizeof *msg;
     flow_extract(&payload, msg->arg, msg->port, &flow);
 
+    packet->l2 = payload.l2;
+    packet->l3 = payload.l3;
+    packet->l4 = payload.l4;
+    packet->l7 = payload.l7;
+
     /* Check with in-band control to see if this packet should be sent
      * to the local port regardless of the flow table. */
     if (in_band_msg_in_hook(p->in_band, &flow, &payload)) {
@@ -4588,14 +4491,9 @@ handle_odp_msg(struct ofproto *p, struct ofpbuf *packet)
 \f
 /* Flow expiration. */
 
-struct expire_cbdata {
-    struct ofproto *ofproto;
-    int dp_max_idle;
-};
-
 static int ofproto_dp_max_idle(const struct ofproto *);
 static void ofproto_update_used(struct ofproto *);
-static void rule_expire(struct cls_rule *, void *cbdata);
+static void rule_expire(struct ofproto *, struct rule *);
 static void ofproto_expire_facets(struct ofproto *, int dp_max_idle);
 
 /* This function is called periodically by ofproto_run().  Its job is to
@@ -4607,18 +4505,22 @@ static void ofproto_expire_facets(struct ofproto *, int dp_max_idle);
 static int
 ofproto_expire(struct ofproto *ofproto)
 {
-    struct expire_cbdata cbdata;
+    struct rule *rule, *next_rule;
+    struct cls_cursor cursor;
+    int dp_max_idle;
 
     /* Update 'used' for each flow in the datapath. */
     ofproto_update_used(ofproto);
 
     /* Expire facets that have been idle too long. */
-    cbdata.dp_max_idle = ofproto_dp_max_idle(ofproto);
-    ofproto_expire_facets(ofproto, cbdata.dp_max_idle);
+    dp_max_idle = ofproto_dp_max_idle(ofproto);
+    ofproto_expire_facets(ofproto, dp_max_idle);
 
     /* Expire OpenFlow flows whose idle_timeout or hard_timeout has passed. */
-    cbdata.ofproto = ofproto;
-    classifier_for_each(&ofproto->cls, rule_expire, &cbdata);
+    cls_cursor_init(&cursor, &ofproto->cls, NULL);
+    CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
+        rule_expire(ofproto, rule);
+    }
 
     /* Let the hook know that we're at a stable point: all outstanding data
      * in existing flows has been accounted to the account_cb.  Thus, the
@@ -4628,7 +4530,7 @@ ofproto_expire(struct ofproto *ofproto)
         ofproto->ofhooks->account_checkpoint_cb(ofproto->aux);
     }
 
-    return MIN(cbdata.dp_max_idle, 1000);
+    return MIN(dp_max_idle, 1000);
 }
 
 /* Update 'used' member of installed facets. */
@@ -4807,15 +4709,11 @@ ofproto_expire_facets(struct ofproto *ofproto, int dp_max_idle)
     }
 }
 
-/* If 'cls_rule' is an OpenFlow rule, that has expired according to OpenFlow
- * rules, then delete it entirely.
- *
- * (This is a callback function for classifier_for_each().) */
+/* If 'rule' is an OpenFlow rule, that has expired according to OpenFlow rules,
+ * then delete it entirely. */
 static void
-rule_expire(struct cls_rule *cls_rule, void *cbdata_)
+rule_expire(struct ofproto *ofproto, struct rule *rule)
 {
-    struct expire_cbdata *cbdata = cbdata_;
-    struct rule *rule = rule_from_cls_rule(cls_rule);
     struct facet *facet, *next_facet;
     long long int now;
     uint8_t reason;
@@ -4837,14 +4735,14 @@ rule_expire(struct cls_rule *cls_rule, void *cbdata_)
     /* Update stats.  (This is a no-op if the rule expired due to an idle
      * timeout, because that only happens when the rule has no facets left.) */
     LIST_FOR_EACH_SAFE (facet, next_facet, list_node, &rule->facets) {
-        facet_remove(cbdata->ofproto, facet);
+        facet_remove(ofproto, facet);
     }
 
     /* Get rid of the rule. */
     if (!rule_is_hidden(rule)) {
-        rule_send_removed(cbdata->ofproto, rule, reason);
+        rule_send_removed(ofproto, rule, reason);
     }
-    rule_remove(cbdata->ofproto, rule);
+    rule_remove(ofproto, rule);
 }
 \f
 static struct ofpbuf *
@@ -4855,8 +4753,7 @@ compose_ofp_flow_removed(struct ofconn *ofconn, const struct rule *rule,
     struct ofpbuf *buf;
 
     ofr = make_openflow(sizeof *ofr, OFPT_FLOW_REMOVED, &buf);
-    flow_to_match(&rule->cr.flow, rule->cr.wc.wildcards, ofconn->flow_format,
-                  &ofr->match);
+    ofputil_cls_rule_to_match(&rule->cr, ofconn->flow_format, &ofr->match);
     ofr->cookie = rule->flow_cookie;
     ofr->priority = htons(rule->cr.priority);
     ofr->reason = reason;